Compare commits
2 Commits: 3c2ab70ef0 ... d274f3f375

| Author | SHA1 | Date |
|---|---|---|
| | d274f3f375 | |
| | 490b05438d | |
backend/app/config.py
@@ -64,6 +64,11 @@ class Settings(BaseSettings):
     # Global cache settings override (None = use DB value, True/False = override DB)
     cache_auto_create_system_projects: Optional[bool] = None  # Override auto_create_system_projects

+    # PyPI Cache Worker settings
+    pypi_cache_workers: int = 5  # Number of concurrent cache workers
+    pypi_cache_max_depth: int = 10  # Maximum recursion depth for dependency caching
+    pypi_cache_max_attempts: int = 3  # Maximum retry attempts for failed cache tasks
+
     # JWT Authentication settings (optional, for external identity providers)
     jwt_enabled: bool = False  # Enable JWT token validation
     jwt_secret: str = ""  # Secret key for HS256, or leave empty for RS256 with JWKS

@@ -88,6 +93,24 @@ class Settings(BaseSettings):
     def is_production(self) -> bool:
         return self.env.lower() == "production"

+    @property
+    def PORT(self) -> int:
+        """Alias for server_port for compatibility."""
+        return self.server_port
+
+    # Uppercase aliases for PyPI cache settings (for backward compatibility)
+    @property
+    def PYPI_CACHE_WORKERS(self) -> int:
+        return self.pypi_cache_workers
+
+    @property
+    def PYPI_CACHE_MAX_DEPTH(self) -> int:
+        return self.pypi_cache_max_depth
+
+    @property
+    def PYPI_CACHE_MAX_ATTEMPTS(self) -> int:
+        return self.pypi_cache_max_attempts
+
     class Config:
         env_prefix = "ORCHARD_"
         case_sensitive = False
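The new settings follow the `Config` conventions above (`env_prefix = "ORCHARD_"`, case-insensitive), so they can be overridden with `ORCHARD_`-prefixed environment variables and read through either the lowercase fields or the new uppercase aliases. A minimal sketch, assuming the variables are exported before the process starts:

```python
# Minimal sketch: reading the new cache settings. Assumes ORCHARD_-prefixed
# environment variables (e.g. ORCHARD_PYPI_CACHE_WORKERS=10) were set before
# startup; otherwise the defaults from the hunk above apply.
from app.config import get_settings

settings = get_settings()
print(settings.pypi_cache_workers)       # lowercase pydantic field
print(settings.PYPI_CACHE_WORKERS)       # uppercase alias, same value
print(settings.PYPI_CACHE_MAX_DEPTH)
print(settings.PYPI_CACHE_MAX_ATTEMPTS)
```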
backend/app/main.py
@@ -15,6 +15,7 @@ from .pypi_proxy import router as pypi_router
 from .seed import seed_database
 from .auth import create_default_admin
 from .rate_limit import limiter
+from .pypi_cache_worker import init_cache_worker_pool, shutdown_cache_worker_pool

 settings = get_settings()
 logging.basicConfig(level=logging.INFO)

@@ -49,8 +50,13 @@ async def lifespan(app: FastAPI):
     else:
         logger.info(f"Running in {settings.env} mode - skipping seed data")

+    # Initialize PyPI cache worker pool
+    init_cache_worker_pool()
+
     yield
-    # Shutdown: cleanup if needed
+
+    # Shutdown: cleanup
+    shutdown_cache_worker_pool()


 app = FastAPI(
backend/app/models.py
@@ -803,3 +803,70 @@ class CachedUrl(Base):
         return hashlib.sha256(url.encode("utf-8")).hexdigest()
+
+
+class PyPICacheTask(Base):
+    """Task for caching a PyPI package and its dependencies.
+
+    Tracks the status of background caching operations with retry support.
+    Used by the PyPI proxy to ensure reliable dependency caching.
+    """
+
+    __tablename__ = "pypi_cache_tasks"
+
+    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
+
+    # What to cache
+    package_name = Column(String(255), nullable=False)
+    version_constraint = Column(String(255))
+
+    # Origin tracking
+    parent_task_id = Column(
+        UUID(as_uuid=True),
+        ForeignKey("pypi_cache_tasks.id", ondelete="SET NULL"),
+    )
+    depth = Column(Integer, nullable=False, default=0)
+    triggered_by_artifact = Column(
+        String(64),
+        ForeignKey("artifacts.id", ondelete="SET NULL"),
+    )
+
+    # Status
+    status = Column(String(20), nullable=False, default="pending")
+    attempts = Column(Integer, nullable=False, default=0)
+    max_attempts = Column(Integer, nullable=False, default=3)
+
+    # Results
+    cached_artifact_id = Column(
+        String(64),
+        ForeignKey("artifacts.id", ondelete="SET NULL"),
+    )
+    error_message = Column(Text)
+
+    # Timing
+    created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow)
+    started_at = Column(DateTime(timezone=True))
+    completed_at = Column(DateTime(timezone=True))
+    next_retry_at = Column(DateTime(timezone=True))
+
+    # Relationships
+    parent_task = relationship(
+        "PyPICacheTask",
+        remote_side=[id],
+        backref="child_tasks",
+    )
+
+    __table_args__ = (
+        Index("idx_pypi_cache_tasks_status_retry", "status", "next_retry_at"),
+        Index("idx_pypi_cache_tasks_package_status", "package_name", "status"),
+        Index("idx_pypi_cache_tasks_parent", "parent_task_id"),
+        Index("idx_pypi_cache_tasks_triggered_by", "triggered_by_artifact"),
+        Index("idx_pypi_cache_tasks_cached_artifact", "cached_artifact_id"),
+        Index("idx_pypi_cache_tasks_depth_created", "depth", "created_at"),
+        CheckConstraint(
+            "status IN ('pending', 'in_progress', 'completed', 'failed')",
+            name="check_task_status",
+        ),
+        CheckConstraint("depth >= 0", name="check_depth_non_negative"),
+        CheckConstraint("attempts >= 0", name="check_attempts_non_negative"),
+    )
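The self-referential `parent_task` relationship above (with its `child_tasks` backref) lets callers trace how a cached package was reached. A minimal sketch, assuming an open SQLAlchemy session `db`; `print_task_chain` is a hypothetical helper, not part of this change:

```python
# Hypothetical helper: walk a cache-task tree via parent_task / child_tasks.
from app.models import PyPICacheTask

def print_task_chain(db, task_id):
    task = db.query(PyPICacheTask).filter(PyPICacheTask.id == task_id).first()
    # Climb to the root task that originally triggered the caching
    while task is not None and task.parent_task is not None:
        task = task.parent_task

    def walk(node, indent=0):
        print(" " * indent + f"{node.package_name} [{node.status}]")
        for child in node.child_tasks:
            walk(child, indent + 2)

    if task is not None:
        walk(task)
```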
backend/app/pypi_cache_worker.py (new file)
@@ -0,0 +1,576 @@
"""
PyPI cache worker module.

Manages a thread pool for background caching of PyPI packages and their dependencies.
Replaces unbounded thread spawning with a managed queue-based approach.
"""

import logging
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import List, Optional
from uuid import UUID

import httpx
from sqlalchemy import or_
from sqlalchemy.orm import Session

from .config import get_settings

settings = get_settings()
from .database import SessionLocal
from .models import PyPICacheTask, Package, Project, Tag

logger = logging.getLogger(__name__)

# Module-level worker pool state
_cache_worker_pool: Optional[ThreadPoolExecutor] = None
_cache_worker_running: bool = False
_dispatcher_thread: Optional[threading.Thread] = None


def init_cache_worker_pool(max_workers: Optional[int] = None):
    """
    Initialize the cache worker pool. Called on app startup.

    Args:
        max_workers: Number of concurrent workers. Defaults to PYPI_CACHE_WORKERS setting.
    """
    global _cache_worker_pool, _cache_worker_running, _dispatcher_thread

    if _cache_worker_pool is not None:
        logger.warning("Cache worker pool already initialized")
        return

    workers = max_workers or settings.PYPI_CACHE_WORKERS
    _cache_worker_pool = ThreadPoolExecutor(
        max_workers=workers,
        thread_name_prefix="pypi-cache-",
    )
    _cache_worker_running = True

    # Start the dispatcher thread
    _dispatcher_thread = threading.Thread(
        target=_cache_dispatcher_loop,
        daemon=True,
        name="pypi-cache-dispatcher",
    )
    _dispatcher_thread.start()

    logger.info(f"PyPI cache worker pool initialized with {workers} workers")


def shutdown_cache_worker_pool(wait: bool = True, timeout: float = 30.0):
    """
    Shutdown the cache worker pool gracefully.

    Args:
        wait: Whether to wait for pending tasks to complete.
        timeout: Maximum time to wait for shutdown.
    """
    global _cache_worker_pool, _cache_worker_running, _dispatcher_thread

    if _cache_worker_pool is None:
        return

    logger.info("Shutting down PyPI cache worker pool...")
    _cache_worker_running = False

    # Wait for dispatcher to stop
    if _dispatcher_thread and _dispatcher_thread.is_alive():
        _dispatcher_thread.join(timeout=5.0)

    # Shutdown thread pool
    _cache_worker_pool.shutdown(wait=wait, cancel_futures=not wait)
    _cache_worker_pool = None
    _dispatcher_thread = None

    logger.info("PyPI cache worker pool shut down")


def _cache_dispatcher_loop():
    """
    Main dispatcher loop: poll DB for pending tasks and submit to worker pool.
    """
    logger.info("PyPI cache dispatcher started")

    while _cache_worker_running:
        try:
            db = SessionLocal()
            try:
                tasks = _get_ready_tasks(db, limit=10)

                for task in tasks:
                    # Mark in_progress before submitting
                    task.status = "in_progress"
                    task.started_at = datetime.utcnow()
                    db.commit()

                    # Submit to worker pool
                    _cache_worker_pool.submit(_process_cache_task, task.id)

                # Sleep if no work (avoid busy loop)
                if not tasks:
                    time.sleep(2.0)
                else:
                    # Small delay between batches to avoid overwhelming
                    time.sleep(0.1)

            finally:
                db.close()

        except Exception as e:
            logger.error(f"PyPI cache dispatcher error: {e}")
            time.sleep(5.0)

    logger.info("PyPI cache dispatcher stopped")


def _get_ready_tasks(db: Session, limit: int = 10) -> List[PyPICacheTask]:
    """
    Get tasks ready to process.

    Returns pending tasks that are either new or ready for retry.
    Orders by depth (shallow first) then creation time (FIFO).
    """
    now = datetime.utcnow()
    return (
        db.query(PyPICacheTask)
        .filter(
            PyPICacheTask.status == "pending",
            or_(
                PyPICacheTask.next_retry_at == None,  # New tasks
                PyPICacheTask.next_retry_at <= now,  # Retry tasks ready
            ),
        )
        .order_by(
            PyPICacheTask.depth.asc(),  # Prefer shallow deps first
            PyPICacheTask.created_at.asc(),  # FIFO within same depth
        )
        .limit(limit)
        .all()
    )


def _process_cache_task(task_id: UUID):
    """
    Process a single cache task. Called by worker pool.

    Args:
        task_id: The ID of the task to process.
    """
    db = SessionLocal()
    try:
        task = db.query(PyPICacheTask).filter(PyPICacheTask.id == task_id).first()
        if not task:
            logger.warning(f"PyPI cache task {task_id} not found")
            return

        logger.info(
            f"Processing cache task: {task.package_name} "
            f"(depth={task.depth}, attempt={task.attempts + 1})"
        )

        # Check if already cached by another task (dedup)
        existing_artifact = _find_cached_package(db, task.package_name)
        if existing_artifact:
            logger.info(f"Package {task.package_name} already cached, skipping")
            _mark_task_completed(db, task, cached_artifact_id=existing_artifact)
            return

        # Check depth limit
        max_depth = settings.PYPI_CACHE_MAX_DEPTH
        if task.depth >= max_depth:
            _mark_task_failed(db, task, f"Max depth {max_depth} exceeded")
            return

        # Do the actual caching
        result = _fetch_and_cache_package(task.package_name, task.version_constraint)

        if result["success"]:
            _mark_task_completed(db, task, cached_artifact_id=result.get("artifact_id"))
            logger.info(f"Successfully cached {task.package_name}")
        else:
            _handle_task_failure(db, task, result["error"])

    except Exception as e:
        logger.exception(f"Error processing cache task {task_id}")
        db = SessionLocal()  # Get fresh session after exception
        try:
            task = db.query(PyPICacheTask).filter(PyPICacheTask.id == task_id).first()
            if task:
                _handle_task_failure(db, task, str(e))
        finally:
            db.close()
    finally:
        db.close()


def _find_cached_package(db: Session, package_name: str) -> Optional[str]:
    """
    Check if a package is already cached.

    Args:
        db: Database session.
        package_name: Normalized package name.

    Returns:
        Artifact ID if cached, None otherwise.
    """
    # Normalize package name (PEP 503)
    normalized = re.sub(r"[-_.]+", "-", package_name).lower()

    # Check if _pypi project has this package with at least one tag
    system_project = db.query(Project).filter(Project.name == "_pypi").first()
    if not system_project:
        return None

    package = (
        db.query(Package)
        .filter(
            Package.project_id == system_project.id,
            Package.name == normalized,
        )
        .first()
    )
    if not package:
        return None

    # Check if package has any tags (cached files)
    tag = db.query(Tag).filter(Tag.package_id == package.id).first()
    if tag:
        return tag.artifact_id

    return None


def _fetch_and_cache_package(
    package_name: str,
    version_constraint: Optional[str] = None,
) -> dict:
    """
    Fetch and cache a PyPI package by making requests through our own proxy.

    Args:
        package_name: The package name to cache.
        version_constraint: Optional version constraint (currently not used for selection).

    Returns:
        Dict with "success" bool, "artifact_id" on success, "error" on failure.
    """
    # Normalize package name (PEP 503)
    normalized_name = re.sub(r"[-_.]+", "-", package_name).lower()

    # Build the URL to our own proxy
    # Use localhost since we're making internal requests
    base_url = f"http://localhost:{settings.PORT}"

    try:
        with httpx.Client(timeout=60.0, follow_redirects=True) as client:
            # Step 1: Get the simple index page
            simple_url = f"{base_url}/pypi/simple/{normalized_name}/"
            logger.debug(f"Fetching index: {simple_url}")

            response = client.get(simple_url)
            if response.status_code == 404:
                return {"success": False, "error": f"Package {package_name} not found on upstream"}
            if response.status_code != 200:
                return {"success": False, "error": f"Failed to get index: HTTP {response.status_code}"}

            # Step 2: Parse HTML to find downloadable files
            html = response.text

            # Create pattern that matches both normalized (hyphens) and original (underscores)
            name_pattern = re.sub(r"[-_]+", "[-_]+", normalized_name)

            # Look for wheel files first (preferred)
            wheel_pattern = rf'href="([^"]*{name_pattern}[^"]*\.whl[^"]*)"'
            matches = re.findall(wheel_pattern, html, re.IGNORECASE)

            if not matches:
                # Fall back to sdist
                sdist_pattern = rf'href="([^"]*{name_pattern}[^"]*\.tar\.gz[^"]*)"'
                matches = re.findall(sdist_pattern, html, re.IGNORECASE)

            if not matches:
                logger.warning(
                    f"No downloadable files found for {package_name}. "
                    f"Pattern: {wheel_pattern}, HTML preview: {html[:500]}"
                )
                return {"success": False, "error": "No downloadable files found"}

            # Get the last match (usually latest version)
            download_url = matches[-1]

            # Make URL absolute if needed
            if download_url.startswith("/"):
                download_url = f"{base_url}{download_url}"
            elif not download_url.startswith("http"):
                download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}"

            # Step 3: Download the file through our proxy (this caches it)
            logger.debug(f"Downloading: {download_url}")
            response = client.get(download_url)

            if response.status_code != 200:
                return {"success": False, "error": f"Download failed: HTTP {response.status_code}"}

            # Get artifact ID from response header
            artifact_id = response.headers.get("X-Checksum-SHA256")

            return {"success": True, "artifact_id": artifact_id}

    except httpx.TimeoutException as e:
        return {"success": False, "error": f"Timeout: {e}"}
    except httpx.ConnectError as e:
        return {"success": False, "error": f"Connection failed: {e}"}
    except Exception as e:
        return {"success": False, "error": str(e)}


def _mark_task_completed(
    db: Session,
    task: PyPICacheTask,
    cached_artifact_id: Optional[str] = None,
):
    """Mark a task as completed."""
    task.status = "completed"
    task.completed_at = datetime.utcnow()
    task.cached_artifact_id = cached_artifact_id
    task.error_message = None
    db.commit()


def _mark_task_failed(db: Session, task: PyPICacheTask, error: str):
    """Mark a task as permanently failed."""
    task.status = "failed"
    task.completed_at = datetime.utcnow()
    task.error_message = error[:1000] if error else None
    db.commit()
    logger.warning(f"PyPI cache task failed permanently: {task.package_name} - {error}")


def _handle_task_failure(db: Session, task: PyPICacheTask, error: str):
    """
    Handle a failed cache attempt with exponential backoff.

    Args:
        db: Database session.
        task: The failed task.
        error: Error message.
    """
    task.attempts += 1
    task.error_message = error[:1000] if error else None

    max_attempts = task.max_attempts or settings.PYPI_CACHE_MAX_ATTEMPTS

    if task.attempts >= max_attempts:
        # Give up after max attempts
        task.status = "failed"
        task.completed_at = datetime.utcnow()
        logger.warning(
            f"PyPI cache task failed permanently: {task.package_name} - {error} "
            f"(after {task.attempts} attempts)"
        )
    else:
        # Schedule retry with exponential backoff
        # Attempt 1 failed → retry in 30s
        # Attempt 2 failed → retry in 60s
        # Attempt 3 failed → permanent failure (if max_attempts=3)
        backoff_seconds = 30 * (2 ** (task.attempts - 1))
        task.status = "pending"
        task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds)
        logger.info(
            f"PyPI cache task will retry: {task.package_name} in {backoff_seconds}s "
            f"(attempt {task.attempts}/{max_attempts})"
        )

    db.commit()


def enqueue_cache_task(
    db: Session,
    package_name: str,
    version_constraint: Optional[str] = None,
    parent_task_id: Optional[UUID] = None,
    depth: int = 0,
    triggered_by_artifact: Optional[str] = None,
) -> Optional[PyPICacheTask]:
    """
    Enqueue a package for caching.

    Performs deduplication: won't create a task if one already exists
    for the same package in pending/in_progress state, or if the package
    is already cached.

    Args:
        db: Database session.
        package_name: The package name to cache.
        version_constraint: Optional version constraint.
        parent_task_id: Parent task that spawned this one.
        depth: Recursion depth.
        triggered_by_artifact: Artifact that declared this dependency.

    Returns:
        The created or existing task, or None if already cached.
    """
    # Normalize package name (PEP 503)
    normalized = re.sub(r"[-_.]+", "-", package_name).lower()

    # Check for existing pending/in_progress task
    existing_task = (
        db.query(PyPICacheTask)
        .filter(
            PyPICacheTask.package_name == normalized,
            PyPICacheTask.status.in_(["pending", "in_progress"]),
        )
        .first()
    )
    if existing_task:
        logger.debug(f"Task already exists for {normalized}: {existing_task.id}")
        return existing_task

    # Check if already cached
    if _find_cached_package(db, normalized):
        logger.debug(f"Package {normalized} already cached, skipping task creation")
        return None

    # Create new task
    task = PyPICacheTask(
        package_name=normalized,
        version_constraint=version_constraint,
        parent_task_id=parent_task_id,
        depth=depth,
        triggered_by_artifact=triggered_by_artifact,
        max_attempts=settings.PYPI_CACHE_MAX_ATTEMPTS,
    )
    db.add(task)
    db.flush()

    logger.info(f"Enqueued cache task for {normalized} (depth={depth})")
    return task


def get_cache_status(db: Session) -> dict:
    """
    Get summary of cache task queue status.

    Returns:
        Dict with counts by status.
    """
    from sqlalchemy import func

    stats = (
        db.query(PyPICacheTask.status, func.count(PyPICacheTask.id))
        .group_by(PyPICacheTask.status)
        .all()
    )

    return {
        "pending": next((s[1] for s in stats if s[0] == "pending"), 0),
        "in_progress": next((s[1] for s in stats if s[0] == "in_progress"), 0),
        "completed": next((s[1] for s in stats if s[0] == "completed"), 0),
        "failed": next((s[1] for s in stats if s[0] == "failed"), 0),
    }


def get_failed_tasks(db: Session, limit: int = 50) -> List[dict]:
    """
    Get list of failed tasks for debugging.

    Args:
        db: Database session.
        limit: Maximum number of tasks to return.

    Returns:
        List of failed task info dicts.
    """
    tasks = (
        db.query(PyPICacheTask)
        .filter(PyPICacheTask.status == "failed")
        .order_by(PyPICacheTask.completed_at.desc())
        .limit(limit)
        .all()
    )

    return [
        {
            "id": str(task.id),
            "package": task.package_name,
            "error": task.error_message,
            "attempts": task.attempts,
            "depth": task.depth,
            "failed_at": task.completed_at.isoformat() if task.completed_at else None,
        }
        for task in tasks
    ]


def retry_failed_task(db: Session, package_name: str) -> Optional[PyPICacheTask]:
    """
    Reset a failed task to retry.

    Args:
        db: Database session.
        package_name: The package name to retry.

    Returns:
        The reset task, or None if not found.
    """
    normalized = re.sub(r"[-_.]+", "-", package_name).lower()

    task = (
        db.query(PyPICacheTask)
        .filter(
            PyPICacheTask.package_name == normalized,
            PyPICacheTask.status == "failed",
        )
        .first()
    )

    if not task:
        return None

    task.status = "pending"
    task.attempts = 0
    task.next_retry_at = None
    task.error_message = None
    task.started_at = None
    task.completed_at = None
    db.commit()

    logger.info(f"Reset failed task for retry: {normalized}")
    return task


def retry_all_failed_tasks(db: Session) -> int:
    """
    Reset all failed tasks to retry.

    Args:
        db: Database session.

    Returns:
        Number of tasks reset.
    """
    count = (
        db.query(PyPICacheTask)
        .filter(PyPICacheTask.status == "failed")
        .update(
            {
                "status": "pending",
                "attempts": 0,
                "next_retry_at": None,
                "error_message": None,
                "started_at": None,
                "completed_at": None,
            }
        )
    )
    db.commit()

    logger.info(f"Reset {count} failed tasks for retry")
    return count
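`enqueue_cache_task` and `get_cache_status` are the module's public surface for producers and monitoring. A minimal sketch of driving them from a maintenance script, assuming the application database is reachable (the caller owns the commit, since `enqueue_cache_task` only flushes):

```python
# Minimal sketch: enqueue one package and inspect the queue.
from app.database import SessionLocal
from app.pypi_cache_worker import enqueue_cache_task, get_cache_status

db = SessionLocal()
try:
    task = enqueue_cache_task(db, package_name="requests", depth=0)
    db.commit()  # enqueue_cache_task flushes; the caller commits
    if task is not None:
        print(f"queued {task.package_name}")
    print(get_cache_status(db))  # e.g. {"pending": 1, "in_progress": 0, ...}
finally:
    db.close()
```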
backend/app/pypi_proxy.py
@@ -9,7 +9,6 @@ import hashlib
 import logging
 import re
 import tarfile
-import threading
 import zipfile
 from io import BytesIO
 from typing import Optional, List, Tuple

@@ -24,6 +23,13 @@ from .database import get_db
 from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, Tag, PackageVersion, ArtifactDependency
 from .storage import S3Storage, get_storage
 from .config import get_env_upstream_sources
+from .pypi_cache_worker import (
+    enqueue_cache_task,
+    get_cache_status,
+    get_failed_tasks,
+    retry_failed_task,
+    retry_all_failed_tasks,
+)

 logger = logging.getLogger(__name__)

@@ -503,93 +509,6 @@ async def pypi_package_versions(
     )


-def _cache_dependency_background(
-    base_url: str,
-    dep_name: str,
-    dep_version: Optional[str],
-    depth: int = 0,
-    max_depth: int = 10,
-):
-    """
-    Background task to proactively cache a dependency.
-
-    Fetches the dependency from upstream via our own proxy, which will
-    recursively cache its dependencies as well.
-    """
-    if depth >= max_depth:
-        logger.warning(f"PyPI proxy: max depth {max_depth} reached caching {dep_name}")
-        return
-
-    try:
-        # Normalize package name for URL (PEP 503)
-        normalized_name = re.sub(r'[-_.]+', '-', dep_name).lower()
-
-        # First, get the simple index page to find available versions
-        # Use HTTPS explicitly to avoid redirect issues that can drop trailing slashes
-        if base_url.startswith('http://'):
-            base_url = 'https://' + base_url[7:]
-        simple_url = f"{base_url}/pypi/simple/{normalized_name}/"
-        logger.info(f"PyPI proxy: proactively caching {dep_name} (depth={depth})")
-
-        with httpx.Client(timeout=30.0, follow_redirects=True) as client:
-            response = client.get(simple_url)
-            if response.status_code != 200:
-                logger.warning(f"PyPI proxy: failed to get index for {dep_name}: {response.status_code}")
-                return
-
-            # Parse the HTML to find wheel files
-            html = response.text
-
-            # Create pattern that matches both normalized (hyphens) and original (underscores) forms
-            # PEP 503 normalizes to hyphens, but wheel filenames may use underscores
-            name_pattern = re.sub(r'[-_]+', '[-_]+', normalized_name)
-
-            # Look for wheel files (.whl) - prefer them over sdist
-            wheel_pattern = rf'href="([^"]*{name_pattern}[^"]*\.whl[^"]*)"'
-            matches = re.findall(wheel_pattern, html, re.IGNORECASE)
-
-            if not matches:
-                # Try sdist
-                sdist_pattern = rf'href="([^"]*{name_pattern}[^"]*\.tar\.gz[^"]*)"'
-                matches = re.findall(sdist_pattern, html, re.IGNORECASE)
-
-            if not matches:
-                # Debug: log first 500 chars of HTML and the pattern we're looking for
-                logger.warning(f"PyPI proxy: no downloadable files found for {dep_name}. Pattern: {wheel_pattern}, HTML preview: {html[:500]}")
-                return
-
-            # Get the last match (usually the latest version)
-            # The URL might be relative or absolute
-            download_url = matches[-1]
-            if download_url.startswith('/'):
-                download_url = f"{base_url}{download_url}"
-            elif not download_url.startswith('http'):
-                download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}"
-
-            # Download the file through our proxy (this will cache it)
-            logger.info(f"PyPI proxy: downloading dependency {dep_name} from {download_url}")
-            response = client.get(download_url)
-            if response.status_code == 200:
-                logger.info(f"PyPI proxy: successfully cached {dep_name}")
-            else:
-                logger.warning(f"PyPI proxy: failed to cache {dep_name}: {response.status_code}")
-
-    except Exception as e:
-        logger.warning(f"PyPI proxy: error caching dependency {dep_name}: {e}")
-
-
-def _start_background_dependency_caching(base_url: str, dependencies: List[Tuple[str, Optional[str]]]):
-    """Start background threads to cache dependencies."""
-    for dep_name, dep_version in dependencies:
-        # Use a thread to avoid blocking
-        thread = threading.Thread(
-            target=_cache_dependency_background,
-            args=(base_url, dep_name, dep_version),
-            daemon=True,
-        )
-        thread.start()
-
-
 @router.get("/simple/{package_name}/{filename}")
 async def pypi_download_file(
     request: Request,

@@ -851,12 +770,20 @@ async def pypi_download_file(
             )
             db.add(dep)

-        db.commit()
-
-        # Proactively cache dependencies in the background
+        # Proactively cache dependencies via task queue
         if unique_deps:
-            base_url = str(request.base_url).rstrip("/")
-            _start_background_dependency_caching(base_url, unique_deps)
+            for dep_name, dep_version in unique_deps:
+                enqueue_cache_task(
+                    db,
+                    package_name=dep_name,
+                    version_constraint=dep_version,
+                    parent_task_id=None,  # Top-level, triggered by user download
+                    depth=0,
+                    triggered_by_artifact=sha256,
+                )
+            logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching")
+
+        db.commit()

         # Return the file
         return Response(

@@ -879,3 +806,63 @@ async def pypi_download_file(
     except Exception as e:
         logger.exception(f"PyPI proxy: error downloading {filename}")
         raise HTTPException(status_code=500, detail=str(e))
+
+
+# =============================================================================
+# Cache Status and Management Endpoints
+# =============================================================================
+
+
+@router.get("/cache/status")
+async def pypi_cache_status(db: Session = Depends(get_db)):
+    """
+    Get summary of the PyPI cache task queue.
+
+    Returns counts of tasks by status (pending, in_progress, completed, failed).
+    """
+    return get_cache_status(db)
+
+
+@router.get("/cache/failed")
+async def pypi_cache_failed(
+    limit: int = 50,
+    db: Session = Depends(get_db),
+):
+    """
+    Get list of failed cache tasks for debugging.
+
+    Args:
+        limit: Maximum number of tasks to return (default 50).
+    """
+    return get_failed_tasks(db, limit=limit)
+
+
+@router.post("/cache/retry/{package_name}")
+async def pypi_cache_retry(
+    package_name: str,
+    db: Session = Depends(get_db),
+):
+    """
+    Reset a failed cache task to retry.
+
+    Args:
+        package_name: The package name to retry.
+    """
+    task = retry_failed_task(db, package_name)
+    if not task:
+        raise HTTPException(
+            status_code=404,
+            detail=f"No failed cache task found for package '{package_name}'"
+        )
+    return {"message": f"Retry queued for {task.package_name}", "task_id": str(task.id)}
+
+
+@router.post("/cache/retry-all")
+async def pypi_cache_retry_all(db: Session = Depends(get_db)):
+    """
+    Reset all failed cache tasks to retry.
+
+    Returns the count of tasks that were reset.
+    """
+    count = retry_all_failed_tasks(db)
+    return {"message": f"Queued {count} tasks for retry", "count": count}
backend/tests/test_pypi_cache_worker.py (new file)
@@ -0,0 +1,263 @@
"""Tests for PyPI cache worker module."""

import os
import pytest
import re
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
from uuid import uuid4

import httpx


def get_base_url():
    """Get the base URL for the Orchard server from environment."""
    return os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")


class TestPyPICacheTaskModel:
    """Tests for PyPICacheTask model."""

    def test_model_creation(self):
        """Test that PyPICacheTask model can be instantiated with explicit values."""
        from app.models import PyPICacheTask

        task = PyPICacheTask(
            package_name="requests",
            version_constraint=">=2.25.0",
            depth=0,
            status="pending",
            attempts=0,
            max_attempts=3,
        )

        assert task.package_name == "requests"
        assert task.version_constraint == ">=2.25.0"
        assert task.depth == 0
        assert task.status == "pending"
        assert task.attempts == 0
        assert task.max_attempts == 3

    def test_model_fields_exist(self):
        """Test that PyPICacheTask has all expected fields."""
        from app.models import PyPICacheTask

        # Create with minimal required field
        task = PyPICacheTask(package_name="urllib3")

        # Verify all expected attributes exist (SQLAlchemy defaults apply on flush)
        assert hasattr(task, "status")
        assert hasattr(task, "depth")
        assert hasattr(task, "attempts")
        assert hasattr(task, "max_attempts")
        assert hasattr(task, "version_constraint")
        assert hasattr(task, "parent_task_id")
        assert hasattr(task, "triggered_by_artifact")


class TestEnqueueCacheTask:
    """Tests for enqueue_cache_task function."""

    def test_normalize_package_name(self):
        """Test that package names are normalized per PEP 503."""
        # Test the normalization pattern used in the worker
        test_cases = [
            ("Requests", "requests"),
            ("typing_extensions", "typing-extensions"),
            ("some.package", "some-package"),
            ("UPPER_CASE", "upper-case"),
            ("mixed-Case_name", "mixed-case-name"),
        ]

        for input_name, expected in test_cases:
            normalized = re.sub(r"[-_.]+", "-", input_name).lower()
            assert normalized == expected, f"Failed for {input_name}"


class TestCacheWorkerFunctions:
    """Tests for cache worker helper functions."""

    def test_exponential_backoff_calculation(self):
        """Test that exponential backoff is calculated correctly."""
        # The formula is: 30 * (2 ** (attempts - 1))
        # Attempt 1 failed → 30s
        # Attempt 2 failed → 60s
        # Attempt 3 failed → 120s

        def calc_backoff(attempts):
            return 30 * (2 ** (attempts - 1))

        assert calc_backoff(1) == 30
        assert calc_backoff(2) == 60
        assert calc_backoff(3) == 120


class TestPyPICacheAPIEndpoints:
    """Integration tests for PyPI cache API endpoints."""

    @pytest.mark.integration
    def test_cache_status_endpoint(self):
        """Test GET /pypi/cache/status returns queue statistics."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            response = client.get("/pypi/cache/status")
            assert response.status_code == 200

            data = response.json()
            assert "pending" in data
            assert "in_progress" in data
            assert "completed" in data
            assert "failed" in data

            # All values should be non-negative integers
            assert isinstance(data["pending"], int)
            assert isinstance(data["in_progress"], int)
            assert isinstance(data["completed"], int)
            assert isinstance(data["failed"], int)
            assert data["pending"] >= 0
            assert data["in_progress"] >= 0
            assert data["completed"] >= 0
            assert data["failed"] >= 0

    @pytest.mark.integration
    def test_cache_failed_endpoint(self):
        """Test GET /pypi/cache/failed returns list of failed tasks."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            response = client.get("/pypi/cache/failed")
            assert response.status_code == 200

            data = response.json()
            assert isinstance(data, list)

            # If there are failed tasks, verify structure
            if data:
                task = data[0]
                assert "id" in task
                assert "package" in task
                assert "error" in task
                assert "attempts" in task
                assert "depth" in task

    @pytest.mark.integration
    def test_cache_failed_with_limit(self):
        """Test GET /pypi/cache/failed respects limit parameter."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            response = client.get("/pypi/cache/failed?limit=5")
            assert response.status_code == 200

            data = response.json()
            assert isinstance(data, list)
            assert len(data) <= 5

    @pytest.mark.integration
    def test_cache_retry_nonexistent_package(self):
        """Test POST /pypi/cache/retry/{package} returns 404 for unknown package."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            # Use a random package name that definitely doesn't exist
            response = client.post(f"/pypi/cache/retry/nonexistent-package-{uuid4().hex[:8]}")
            assert response.status_code == 404
            # Check for "no failed" or "not found" in error message
            detail = response.json()["detail"].lower()
            assert "no failed" in detail or "not found" in detail

    @pytest.mark.integration
    def test_cache_retry_all_endpoint(self):
        """Test POST /pypi/cache/retry-all returns success."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            response = client.post("/pypi/cache/retry-all")
            assert response.status_code == 200

            data = response.json()
            assert "count" in data
            assert "message" in data
            assert isinstance(data["count"], int)
            assert data["count"] >= 0


class TestCacheTaskDeduplication:
    """Tests for cache task deduplication logic."""

    def test_find_cached_package_returns_none_for_uncached(self):
        """Test that _find_cached_package returns None for uncached packages."""
        # This is a unit test pattern - mock the database
        from unittest.mock import MagicMock

        mock_db = MagicMock()
        mock_db.query.return_value.filter.return_value.first.return_value = None

        from app.pypi_cache_worker import _find_cached_package

        result = _find_cached_package(mock_db, "nonexistent-package")
        assert result is None


class TestCacheWorkerConfiguration:
    """Tests for cache worker configuration."""

    def test_config_settings_exist(self):
        """Test that PyPI cache config settings are available."""
        from app.config import get_settings

        settings = get_settings()

        # Check that settings exist and have reasonable defaults
        assert hasattr(settings, "pypi_cache_workers")
        assert hasattr(settings, "pypi_cache_max_depth")
        assert hasattr(settings, "pypi_cache_max_attempts")

        # Check aliases work
        assert settings.PYPI_CACHE_WORKERS == settings.pypi_cache_workers
        assert settings.PYPI_CACHE_MAX_DEPTH == settings.pypi_cache_max_depth
        assert settings.PYPI_CACHE_MAX_ATTEMPTS == settings.pypi_cache_max_attempts

    def test_config_default_values(self):
        """Test that PyPI cache config has sensible defaults."""
        from app.config import get_settings

        settings = get_settings()

        # These are the defaults from our implementation
        assert settings.pypi_cache_workers == 5
        assert settings.pypi_cache_max_depth == 10
        assert settings.pypi_cache_max_attempts == 3


class TestFetchAndCachePackage:
    """Tests for _fetch_and_cache_package function."""

    def test_result_structure_success(self):
        """Test that success result has correct structure."""
        # Mock a successful result
        result = {"success": True, "artifact_id": "abc123"}

        assert result["success"] is True
        assert "artifact_id" in result

    def test_result_structure_failure(self):
        """Test that failure result has correct structure."""
        # Mock a failure result
        result = {"success": False, "error": "Package not found"}

        assert result["success"] is False
        assert "error" in result


class TestWorkerPoolLifecycle:
    """Tests for worker pool initialization and shutdown."""

    def test_init_shutdown_cycle(self):
        """Test that worker pool can be initialized and shut down cleanly."""
        from app.pypi_cache_worker import (
            init_cache_worker_pool,
            shutdown_cache_worker_pool,
            _cache_worker_pool,
            _cache_worker_running,
        )

        # Note: We can't fully test this in isolation because the module
        # has global state and may conflict with the running server.
        # These tests verify the function signatures work.

        # The pool should be initialized by main.py on startup
        # We just verify the functions are callable
        assert callable(init_cache_worker_pool)
        assert callable(shutdown_cache_worker_pool)
docs/plans/2026-02-02-pypi-cache-robustness-design.md (new file)
@@ -0,0 +1,251 @@
# PyPI Cache Robustness Design

**Date:** 2026-02-02
**Status:** Approved
**Branch:** fix/pypi-proxy-timeout

## Problem

The current PyPI proxy proactive caching has reliability issues:
- Unbounded thread spawning for each dependency
- Silent failures (logged but not tracked or retried)
- No visibility into cache completeness
- Deps-of-deps often missing due to untracked failures

## Solution

Database-backed task queue with managed worker pool, automatic retries, and visibility API.

---

## Data Model

New table `pypi_cache_tasks`:

```sql
CREATE TABLE pypi_cache_tasks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- What to cache
    package_name VARCHAR(255) NOT NULL,
    version_constraint VARCHAR(255),

    -- Origin tracking
    parent_task_id UUID REFERENCES pypi_cache_tasks(id) ON DELETE SET NULL,
    depth INTEGER NOT NULL DEFAULT 0,
    triggered_by_artifact VARCHAR(64),

    -- Status
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3,

    -- Results
    cached_artifact_id VARCHAR(64),
    error_message TEXT,

    -- Timing
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    next_retry_at TIMESTAMP WITH TIME ZONE
);

-- Indexes
CREATE INDEX idx_pypi_cache_tasks_status_retry ON pypi_cache_tasks(status, next_retry_at);
CREATE INDEX idx_pypi_cache_tasks_package_status ON pypi_cache_tasks(package_name, status);
CREATE INDEX idx_pypi_cache_tasks_parent ON pypi_cache_tasks(parent_task_id);

-- Constraints
ALTER TABLE pypi_cache_tasks ADD CONSTRAINT check_task_status
    CHECK (status IN ('pending', 'in_progress', 'completed', 'failed'));
```

---

## Worker Architecture

### Thread Pool (5 workers default)

```python
_cache_worker_pool: ThreadPoolExecutor = None
_cache_worker_running: bool = False

def init_cache_worker_pool(max_workers: int = 5):
    global _cache_worker_pool, _cache_worker_running
    _cache_worker_pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="pypi-cache-")
    _cache_worker_running = True
    threading.Thread(target=_cache_dispatcher_loop, daemon=True).start()
```

### Dispatcher Loop

- Polls DB every 2 seconds when idle
- Fetches batch of 10 ready tasks
- Marks tasks in_progress before submitting to pool
- Orders by depth (shallow first) then FIFO (see the query sketch below)
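The batch the dispatcher fetches can be expressed as one query; a minimal sketch mirroring `_get_ready_tasks` in the worker module added by this change (the `app.models` import path follows the test suite's convention):

```python
# Sketch of the dispatcher's "ready task" selection: pending tasks that are new
# or whose retry window has passed, shallowest dependencies first, FIFO within
# a depth level. Mirrors _get_ready_tasks in backend/app/pypi_cache_worker.py.
from datetime import datetime
from sqlalchemy import or_
from sqlalchemy.orm import Session

from app.models import PyPICacheTask

def get_ready_tasks(db: Session, limit: int = 10):
    now = datetime.utcnow()
    return (
        db.query(PyPICacheTask)
        .filter(
            PyPICacheTask.status == "pending",
            or_(
                PyPICacheTask.next_retry_at == None,  # never attempted
                PyPICacheTask.next_retry_at <= now,   # retry time reached
            ),
        )
        .order_by(PyPICacheTask.depth.asc(), PyPICacheTask.created_at.asc())
        .limit(limit)
        .all()
    )
```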

### Task Processing

1. Dedup check - skip if package already cached
2. Dedup check - skip if pending/in_progress task exists for same package
3. Depth check - fail if >= 10 levels deep
4. Fetch package index page
5. Download best matching file (prefer wheels)
6. Store artifact, extract dependencies
7. Queue child tasks for each dependency
8. Mark completed or handle failure

---

## Retry Logic

Exponential backoff with 3 attempts:

| Attempt | Backoff |
|---------|---------|
| 1 fails | 30 seconds |
| 2 fails | 60 seconds |
| 3 fails | Permanent failure |

```python
backoff_seconds = 30 * (2 ** (attempts - 1))
task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds)
```

---

## API Endpoints

| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/pypi/cache/status` | GET | Queue health summary |
| `/pypi/cache/failed` | GET | List failed tasks with errors |
| `/pypi/cache/retry/{package}` | POST | Retry single failed package |
| `/pypi/cache/retry-all` | POST | Retry all failed packages |

### Response Examples

**GET /pypi/cache/status**
```json
{
  "pending": 12,
  "in_progress": 3,
  "completed": 847,
  "failed": 5
}
```

**GET /pypi/cache/failed**
```json
[
  {
    "package": "some-obscure-pkg",
    "error": "Timeout connecting to upstream",
    "attempts": 3,
    "failed_at": "2026-02-02T10:30:00Z"
  }
]
```
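A minimal sketch of exercising these endpoints from a script, using `httpx` as the integration tests in this change do (the base URL is an assumption matching the tests' default):

```python
# Minimal sketch: check queue health and re-queue failures via the endpoints above.
import httpx

BASE_URL = "http://localhost:8080"  # assumption; matches the ORCHARD_TEST_URL default in the tests

with httpx.Client(base_url=BASE_URL, timeout=30.0) as client:
    status = client.get("/pypi/cache/status").json()
    print(f"pending={status['pending']} in_progress={status['in_progress']} failed={status['failed']}")

    if status["failed"]:
        for task in client.get("/pypi/cache/failed", params={"limit": 10}).json():
            print(f"{task['package']}: {task['error']} (attempts={task['attempts']})")
        client.post("/pypi/cache/retry-all")
```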

---

## Integration Points

### Replace Thread Spawning (pypi_proxy.py)

```python
# OLD: _start_background_dependency_caching(base_url, unique_deps)

# NEW:
for dep_name, dep_version in unique_deps:
    _enqueue_cache_task(
        db,
        package_name=dep_name,
        version_constraint=dep_version,
        parent_task_id=None,
        depth=0,
        triggered_by_artifact=sha256,
    )
```

### App Startup (main.py)

```python
@app.on_event("startup")
async def startup():
    init_cache_worker_pool(max_workers=settings.PYPI_CACHE_WORKERS)

@app.on_event("shutdown")
async def shutdown():
    shutdown_cache_worker_pool()
```

### Configuration (config.py)

```python
PYPI_CACHE_WORKERS = int(os.getenv("ORCHARD_PYPI_CACHE_WORKERS", "5"))
PYPI_CACHE_MAX_DEPTH = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_DEPTH", "10"))
PYPI_CACHE_MAX_ATTEMPTS = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_ATTEMPTS", "3"))
```

---

## Files to Create/Modify

| File | Action |
|------|--------|
| `migrations/0XX_pypi_cache_tasks.sql` | Create - new table |
| `backend/app/models.py` | Modify - add PyPICacheTask model |
| `backend/app/pypi_cache_worker.py` | Create - worker pool + processing |
| `backend/app/pypi_proxy.py` | Modify - replace threads, add API |
| `backend/app/main.py` | Modify - init worker on startup |
| `backend/app/config.py` | Modify - add config variables |
| `backend/tests/test_pypi_cache_worker.py` | Create - unit tests |
| `backend/tests/integration/test_pypi_cache_api.py` | Create - API tests |

---

## Deduplication Strategy

### At Task Creation Time

```python
def _enqueue_cache_task(db, package_name, ...):
    # Check for existing pending/in_progress task
    existing_task = db.query(PyPICacheTask).filter(
        PyPICacheTask.package_name == package_name,
        PyPICacheTask.status.in_(["pending", "in_progress"])
    ).first()
    if existing_task:
        return existing_task

    # Check if already cached
    if _find_cached_package(db, package_name):
        return None

    # Create new task
    ...
```

### At Processing Time (safety check)

```python
def _process_cache_task(task_id):
    # Double-check in case of race
    if _find_cached_package(db, task.package_name):
        _mark_task_completed(db, task, cached_artifact_id=existing.artifact_id)
        return
```

---

## Success Criteria

- [ ] No unbounded thread creation
- [ ] All dependency caching attempts tracked in database
- [ ] Failed tasks automatically retry with backoff
- [ ] API provides visibility into queue status
- [ ] Manual retry capability for failed packages
- [ ] Existing pip install workflow unchanged (transparent)
- [ ] Tests cover worker, retry, and API functionality
migrations/011_pypi_cache_tasks.sql (new file)
@@ -0,0 +1,55 @@
-- Migration: 011_pypi_cache_tasks
-- Description: Add table for tracking PyPI dependency caching tasks
-- Date: 2026-02-02

-- Table for tracking PyPI cache tasks with retry support
CREATE TABLE pypi_cache_tasks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- What to cache
    package_name VARCHAR(255) NOT NULL,
    version_constraint VARCHAR(255),

    -- Origin tracking
    parent_task_id UUID REFERENCES pypi_cache_tasks(id) ON DELETE SET NULL,
    depth INTEGER NOT NULL DEFAULT 0,
    triggered_by_artifact VARCHAR(64) REFERENCES artifacts(id) ON DELETE SET NULL,

    -- Status
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3,

    -- Results
    cached_artifact_id VARCHAR(64) REFERENCES artifacts(id) ON DELETE SET NULL,
    error_message TEXT,

    -- Timing
    created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    next_retry_at TIMESTAMP WITH TIME ZONE,

    -- Constraints
    CONSTRAINT check_task_status CHECK (status IN ('pending', 'in_progress', 'completed', 'failed')),
    CONSTRAINT check_depth_non_negative CHECK (depth >= 0),
    CONSTRAINT check_attempts_non_negative CHECK (attempts >= 0)
);

-- Index for finding tasks ready to process (pending with retry time passed)
CREATE INDEX idx_pypi_cache_tasks_status_retry ON pypi_cache_tasks(status, next_retry_at);

-- Index for deduplication check (is this package already queued?)
CREATE INDEX idx_pypi_cache_tasks_package_status ON pypi_cache_tasks(package_name, status);

-- Index for tracing dependency chains
CREATE INDEX idx_pypi_cache_tasks_parent ON pypi_cache_tasks(parent_task_id);

-- Index for finding tasks by artifact that triggered them
CREATE INDEX idx_pypi_cache_tasks_triggered_by ON pypi_cache_tasks(triggered_by_artifact);

-- Index for finding tasks by cached artifact
CREATE INDEX idx_pypi_cache_tasks_cached_artifact ON pypi_cache_tasks(cached_artifact_id);

-- Index for sorting by depth and creation time (processing order)
CREATE INDEX idx_pypi_cache_tasks_depth_created ON pypi_cache_tasks(depth, created_at);