diff --git a/backend/app/config.py b/backend/app/config.py index a3e18c9..be8b20b 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -64,6 +64,11 @@ class Settings(BaseSettings): # Global cache settings override (None = use DB value, True/False = override DB) cache_auto_create_system_projects: Optional[bool] = None # Override auto_create_system_projects + # PyPI Cache Worker settings + pypi_cache_workers: int = 5 # Number of concurrent cache workers + pypi_cache_max_depth: int = 10 # Maximum recursion depth for dependency caching + pypi_cache_max_attempts: int = 3 # Maximum retry attempts for failed cache tasks + # JWT Authentication settings (optional, for external identity providers) jwt_enabled: bool = False # Enable JWT token validation jwt_secret: str = "" # Secret key for HS256, or leave empty for RS256 with JWKS @@ -88,6 +93,24 @@ class Settings(BaseSettings): def is_production(self) -> bool: return self.env.lower() == "production" + @property + def PORT(self) -> int: + """Alias for server_port for compatibility.""" + return self.server_port + + # Uppercase aliases for PyPI cache settings (for backward compatibility) + @property + def PYPI_CACHE_WORKERS(self) -> int: + return self.pypi_cache_workers + + @property + def PYPI_CACHE_MAX_DEPTH(self) -> int: + return self.pypi_cache_max_depth + + @property + def PYPI_CACHE_MAX_ATTEMPTS(self) -> int: + return self.pypi_cache_max_attempts + class Config: env_prefix = "ORCHARD_" case_sensitive = False diff --git a/backend/app/main.py b/backend/app/main.py index 08b253e..4b8bec8 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -15,6 +15,7 @@ from .pypi_proxy import router as pypi_router from .seed import seed_database from .auth import create_default_admin from .rate_limit import limiter +from .pypi_cache_worker import init_cache_worker_pool, shutdown_cache_worker_pool settings = get_settings() logging.basicConfig(level=logging.INFO) @@ -49,8 +50,13 @@ async def lifespan(app: FastAPI): else: logger.info(f"Running in {settings.env} mode - skipping seed data") + # Initialize PyPI cache worker pool + init_cache_worker_pool() + yield - # Shutdown: cleanup if needed + + # Shutdown: cleanup + shutdown_cache_worker_pool() app = FastAPI( diff --git a/backend/app/models.py b/backend/app/models.py index 500d318..17f5e13 100644 --- a/backend/app/models.py +++ b/backend/app/models.py @@ -803,3 +803,70 @@ class CachedUrl(Base): return hashlib.sha256(url.encode("utf-8")).hexdigest() +class PyPICacheTask(Base): + """Task for caching a PyPI package and its dependencies. + + Tracks the status of background caching operations with retry support. + Used by the PyPI proxy to ensure reliable dependency caching. 
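+
+    Lifecycle: a task is created as "pending", marked "in_progress" when a
+    worker picks it up, and ends as "completed" or "failed". A failed attempt
+    with attempts < max_attempts is returned to "pending" with next_retry_at
+    set for an exponential-backoff retry.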
+ """ + + __tablename__ = "pypi_cache_tasks" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + + # What to cache + package_name = Column(String(255), nullable=False) + version_constraint = Column(String(255)) + + # Origin tracking + parent_task_id = Column( + UUID(as_uuid=True), + ForeignKey("pypi_cache_tasks.id", ondelete="SET NULL"), + ) + depth = Column(Integer, nullable=False, default=0) + triggered_by_artifact = Column( + String(64), + ForeignKey("artifacts.id", ondelete="SET NULL"), + ) + + # Status + status = Column(String(20), nullable=False, default="pending") + attempts = Column(Integer, nullable=False, default=0) + max_attempts = Column(Integer, nullable=False, default=3) + + # Results + cached_artifact_id = Column( + String(64), + ForeignKey("artifacts.id", ondelete="SET NULL"), + ) + error_message = Column(Text) + + # Timing + created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow) + started_at = Column(DateTime(timezone=True)) + completed_at = Column(DateTime(timezone=True)) + next_retry_at = Column(DateTime(timezone=True)) + + # Relationships + parent_task = relationship( + "PyPICacheTask", + remote_side=[id], + backref="child_tasks", + ) + + __table_args__ = ( + Index("idx_pypi_cache_tasks_status_retry", "status", "next_retry_at"), + Index("idx_pypi_cache_tasks_package_status", "package_name", "status"), + Index("idx_pypi_cache_tasks_parent", "parent_task_id"), + Index("idx_pypi_cache_tasks_triggered_by", "triggered_by_artifact"), + Index("idx_pypi_cache_tasks_cached_artifact", "cached_artifact_id"), + Index("idx_pypi_cache_tasks_depth_created", "depth", "created_at"), + CheckConstraint( + "status IN ('pending', 'in_progress', 'completed', 'failed')", + name="check_task_status", + ), + CheckConstraint("depth >= 0", name="check_depth_non_negative"), + CheckConstraint("attempts >= 0", name="check_attempts_non_negative"), + ) + + diff --git a/backend/app/pypi_cache_worker.py b/backend/app/pypi_cache_worker.py new file mode 100644 index 0000000..1c8f82f --- /dev/null +++ b/backend/app/pypi_cache_worker.py @@ -0,0 +1,576 @@ +""" +PyPI cache worker module. + +Manages a thread pool for background caching of PyPI packages and their dependencies. +Replaces unbounded thread spawning with a managed queue-based approach. +""" + +import logging +import re +import threading +import time +from concurrent.futures import ThreadPoolExecutor +from datetime import datetime, timedelta +from typing import List, Optional +from uuid import UUID + +import httpx +from sqlalchemy import or_ +from sqlalchemy.orm import Session + +from .config import get_settings + +settings = get_settings() +from .database import SessionLocal +from .models import PyPICacheTask, Package, Project, Tag + +logger = logging.getLogger(__name__) + +# Module-level worker pool state +_cache_worker_pool: Optional[ThreadPoolExecutor] = None +_cache_worker_running: bool = False +_dispatcher_thread: Optional[threading.Thread] = None + + +def init_cache_worker_pool(max_workers: Optional[int] = None): + """ + Initialize the cache worker pool. Called on app startup. + + Args: + max_workers: Number of concurrent workers. Defaults to PYPI_CACHE_WORKERS setting. 
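+
+    Calling this while a pool already exists is a no-op; a warning is logged
+    and the existing pool is kept.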
+ """ + global _cache_worker_pool, _cache_worker_running, _dispatcher_thread + + if _cache_worker_pool is not None: + logger.warning("Cache worker pool already initialized") + return + + workers = max_workers or settings.PYPI_CACHE_WORKERS + _cache_worker_pool = ThreadPoolExecutor( + max_workers=workers, + thread_name_prefix="pypi-cache-", + ) + _cache_worker_running = True + + # Start the dispatcher thread + _dispatcher_thread = threading.Thread( + target=_cache_dispatcher_loop, + daemon=True, + name="pypi-cache-dispatcher", + ) + _dispatcher_thread.start() + + logger.info(f"PyPI cache worker pool initialized with {workers} workers") + + +def shutdown_cache_worker_pool(wait: bool = True, timeout: float = 30.0): + """ + Shutdown the cache worker pool gracefully. + + Args: + wait: Whether to wait for pending tasks to complete. + timeout: Maximum time to wait for shutdown. + """ + global _cache_worker_pool, _cache_worker_running, _dispatcher_thread + + if _cache_worker_pool is None: + return + + logger.info("Shutting down PyPI cache worker pool...") + _cache_worker_running = False + + # Wait for dispatcher to stop + if _dispatcher_thread and _dispatcher_thread.is_alive(): + _dispatcher_thread.join(timeout=5.0) + + # Shutdown thread pool + _cache_worker_pool.shutdown(wait=wait, cancel_futures=not wait) + _cache_worker_pool = None + _dispatcher_thread = None + + logger.info("PyPI cache worker pool shut down") + + +def _cache_dispatcher_loop(): + """ + Main dispatcher loop: poll DB for pending tasks and submit to worker pool. + """ + logger.info("PyPI cache dispatcher started") + + while _cache_worker_running: + try: + db = SessionLocal() + try: + tasks = _get_ready_tasks(db, limit=10) + + for task in tasks: + # Mark in_progress before submitting + task.status = "in_progress" + task.started_at = datetime.utcnow() + db.commit() + + # Submit to worker pool + _cache_worker_pool.submit(_process_cache_task, task.id) + + # Sleep if no work (avoid busy loop) + if not tasks: + time.sleep(2.0) + else: + # Small delay between batches to avoid overwhelming + time.sleep(0.1) + + finally: + db.close() + + except Exception as e: + logger.error(f"PyPI cache dispatcher error: {e}") + time.sleep(5.0) + + logger.info("PyPI cache dispatcher stopped") + + +def _get_ready_tasks(db: Session, limit: int = 10) -> List[PyPICacheTask]: + """ + Get tasks ready to process. + + Returns pending tasks that are either new or ready for retry. + Orders by depth (shallow first) then creation time (FIFO). + """ + now = datetime.utcnow() + return ( + db.query(PyPICacheTask) + .filter( + PyPICacheTask.status == "pending", + or_( + PyPICacheTask.next_retry_at == None, # New tasks + PyPICacheTask.next_retry_at <= now, # Retry tasks ready + ), + ) + .order_by( + PyPICacheTask.depth.asc(), # Prefer shallow deps first + PyPICacheTask.created_at.asc(), # FIFO within same depth + ) + .limit(limit) + .all() + ) + + +def _process_cache_task(task_id: UUID): + """ + Process a single cache task. Called by worker pool. + + Args: + task_id: The ID of the task to process. 
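+
+    The task is resolved without fetching if the package is already cached or
+    the configured depth limit is exceeded; otherwise the package is fetched
+    and the outcome recorded (completed, failed, or rescheduled for retry).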
+ """ + db = SessionLocal() + try: + task = db.query(PyPICacheTask).filter(PyPICacheTask.id == task_id).first() + if not task: + logger.warning(f"PyPI cache task {task_id} not found") + return + + logger.info( + f"Processing cache task: {task.package_name} " + f"(depth={task.depth}, attempt={task.attempts + 1})" + ) + + # Check if already cached by another task (dedup) + existing_artifact = _find_cached_package(db, task.package_name) + if existing_artifact: + logger.info(f"Package {task.package_name} already cached, skipping") + _mark_task_completed(db, task, cached_artifact_id=existing_artifact) + return + + # Check depth limit + max_depth = settings.PYPI_CACHE_MAX_DEPTH + if task.depth >= max_depth: + _mark_task_failed(db, task, f"Max depth {max_depth} exceeded") + return + + # Do the actual caching + result = _fetch_and_cache_package(task.package_name, task.version_constraint) + + if result["success"]: + _mark_task_completed(db, task, cached_artifact_id=result.get("artifact_id")) + logger.info(f"Successfully cached {task.package_name}") + else: + _handle_task_failure(db, task, result["error"]) + + except Exception as e: + logger.exception(f"Error processing cache task {task_id}") + db = SessionLocal() # Get fresh session after exception + try: + task = db.query(PyPICacheTask).filter(PyPICacheTask.id == task_id).first() + if task: + _handle_task_failure(db, task, str(e)) + finally: + db.close() + finally: + db.close() + + +def _find_cached_package(db: Session, package_name: str) -> Optional[str]: + """ + Check if a package is already cached. + + Args: + db: Database session. + package_name: Normalized package name. + + Returns: + Artifact ID if cached, None otherwise. + """ + # Normalize package name (PEP 503) + normalized = re.sub(r"[-_.]+", "-", package_name).lower() + + # Check if _pypi project has this package with at least one tag + system_project = db.query(Project).filter(Project.name == "_pypi").first() + if not system_project: + return None + + package = ( + db.query(Package) + .filter( + Package.project_id == system_project.id, + Package.name == normalized, + ) + .first() + ) + if not package: + return None + + # Check if package has any tags (cached files) + tag = db.query(Tag).filter(Tag.package_id == package.id).first() + if tag: + return tag.artifact_id + + return None + + +def _fetch_and_cache_package( + package_name: str, + version_constraint: Optional[str] = None, +) -> dict: + """ + Fetch and cache a PyPI package by making requests through our own proxy. + + Args: + package_name: The package name to cache. + version_constraint: Optional version constraint (currently not used for selection). + + Returns: + Dict with "success" bool, "artifact_id" on success, "error" on failure. 
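+
+    Note: the download is made against this service's own proxy, so the file
+    is cached as a side effect; the artifact ID is taken from the proxied
+    response's X-Checksum-SHA256 header when present.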
+ """ + # Normalize package name (PEP 503) + normalized_name = re.sub(r"[-_.]+", "-", package_name).lower() + + # Build the URL to our own proxy + # Use localhost since we're making internal requests + base_url = f"http://localhost:{settings.PORT}" + + try: + with httpx.Client(timeout=60.0, follow_redirects=True) as client: + # Step 1: Get the simple index page + simple_url = f"{base_url}/pypi/simple/{normalized_name}/" + logger.debug(f"Fetching index: {simple_url}") + + response = client.get(simple_url) + if response.status_code == 404: + return {"success": False, "error": f"Package {package_name} not found on upstream"} + if response.status_code != 200: + return {"success": False, "error": f"Failed to get index: HTTP {response.status_code}"} + + # Step 2: Parse HTML to find downloadable files + html = response.text + + # Create pattern that matches both normalized (hyphens) and original (underscores) + name_pattern = re.sub(r"[-_]+", "[-_]+", normalized_name) + + # Look for wheel files first (preferred) + wheel_pattern = rf'href="([^"]*{name_pattern}[^"]*\.whl[^"]*)"' + matches = re.findall(wheel_pattern, html, re.IGNORECASE) + + if not matches: + # Fall back to sdist + sdist_pattern = rf'href="([^"]*{name_pattern}[^"]*\.tar\.gz[^"]*)"' + matches = re.findall(sdist_pattern, html, re.IGNORECASE) + + if not matches: + logger.warning( + f"No downloadable files found for {package_name}. " + f"Pattern: {wheel_pattern}, HTML preview: {html[:500]}" + ) + return {"success": False, "error": "No downloadable files found"} + + # Get the last match (usually latest version) + download_url = matches[-1] + + # Make URL absolute if needed + if download_url.startswith("/"): + download_url = f"{base_url}{download_url}" + elif not download_url.startswith("http"): + download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}" + + # Step 3: Download the file through our proxy (this caches it) + logger.debug(f"Downloading: {download_url}") + response = client.get(download_url) + + if response.status_code != 200: + return {"success": False, "error": f"Download failed: HTTP {response.status_code}"} + + # Get artifact ID from response header + artifact_id = response.headers.get("X-Checksum-SHA256") + + return {"success": True, "artifact_id": artifact_id} + + except httpx.TimeoutException as e: + return {"success": False, "error": f"Timeout: {e}"} + except httpx.ConnectError as e: + return {"success": False, "error": f"Connection failed: {e}"} + except Exception as e: + return {"success": False, "error": str(e)} + + +def _mark_task_completed( + db: Session, + task: PyPICacheTask, + cached_artifact_id: Optional[str] = None, +): + """Mark a task as completed.""" + task.status = "completed" + task.completed_at = datetime.utcnow() + task.cached_artifact_id = cached_artifact_id + task.error_message = None + db.commit() + + +def _mark_task_failed(db: Session, task: PyPICacheTask, error: str): + """Mark a task as permanently failed.""" + task.status = "failed" + task.completed_at = datetime.utcnow() + task.error_message = error[:1000] if error else None + db.commit() + logger.warning(f"PyPI cache task failed permanently: {task.package_name} - {error}") + + +def _handle_task_failure(db: Session, task: PyPICacheTask, error: str): + """ + Handle a failed cache attempt with exponential backoff. + + Args: + db: Database session. + task: The failed task. + error: Error message. 
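+
+    Retries are scheduled with exponential backoff (30s, 60s, 120s, ...);
+    once attempts reaches max_attempts the task is marked permanently failed.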
+ """ + task.attempts += 1 + task.error_message = error[:1000] if error else None + + max_attempts = task.max_attempts or settings.PYPI_CACHE_MAX_ATTEMPTS + + if task.attempts >= max_attempts: + # Give up after max attempts + task.status = "failed" + task.completed_at = datetime.utcnow() + logger.warning( + f"PyPI cache task failed permanently: {task.package_name} - {error} " + f"(after {task.attempts} attempts)" + ) + else: + # Schedule retry with exponential backoff + # Attempt 1 failed → retry in 30s + # Attempt 2 failed → retry in 60s + # Attempt 3 failed → permanent failure (if max_attempts=3) + backoff_seconds = 30 * (2 ** (task.attempts - 1)) + task.status = "pending" + task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds) + logger.info( + f"PyPI cache task will retry: {task.package_name} in {backoff_seconds}s " + f"(attempt {task.attempts}/{max_attempts})" + ) + + db.commit() + + +def enqueue_cache_task( + db: Session, + package_name: str, + version_constraint: Optional[str] = None, + parent_task_id: Optional[UUID] = None, + depth: int = 0, + triggered_by_artifact: Optional[str] = None, +) -> Optional[PyPICacheTask]: + """ + Enqueue a package for caching. + + Performs deduplication: won't create a task if one already exists + for the same package in pending/in_progress state, or if the package + is already cached. + + Args: + db: Database session. + package_name: The package name to cache. + version_constraint: Optional version constraint. + parent_task_id: Parent task that spawned this one. + depth: Recursion depth. + triggered_by_artifact: Artifact that declared this dependency. + + Returns: + The created or existing task, or None if already cached. + """ + # Normalize package name (PEP 503) + normalized = re.sub(r"[-_.]+", "-", package_name).lower() + + # Check for existing pending/in_progress task + existing_task = ( + db.query(PyPICacheTask) + .filter( + PyPICacheTask.package_name == normalized, + PyPICacheTask.status.in_(["pending", "in_progress"]), + ) + .first() + ) + if existing_task: + logger.debug(f"Task already exists for {normalized}: {existing_task.id}") + return existing_task + + # Check if already cached + if _find_cached_package(db, normalized): + logger.debug(f"Package {normalized} already cached, skipping task creation") + return None + + # Create new task + task = PyPICacheTask( + package_name=normalized, + version_constraint=version_constraint, + parent_task_id=parent_task_id, + depth=depth, + triggered_by_artifact=triggered_by_artifact, + max_attempts=settings.PYPI_CACHE_MAX_ATTEMPTS, + ) + db.add(task) + db.flush() + + logger.info(f"Enqueued cache task for {normalized} (depth={depth})") + return task + + +def get_cache_status(db: Session) -> dict: + """ + Get summary of cache task queue status. + + Returns: + Dict with counts by status. + """ + from sqlalchemy import func + + stats = ( + db.query(PyPICacheTask.status, func.count(PyPICacheTask.id)) + .group_by(PyPICacheTask.status) + .all() + ) + + return { + "pending": next((s[1] for s in stats if s[0] == "pending"), 0), + "in_progress": next((s[1] for s in stats if s[0] == "in_progress"), 0), + "completed": next((s[1] for s in stats if s[0] == "completed"), 0), + "failed": next((s[1] for s in stats if s[0] == "failed"), 0), + } + + +def get_failed_tasks(db: Session, limit: int = 50) -> List[dict]: + """ + Get list of failed tasks for debugging. + + Args: + db: Database session. + limit: Maximum number of tasks to return. + + Returns: + List of failed task info dicts. 
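+        Ordered by most recent failure first.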
+ """ + tasks = ( + db.query(PyPICacheTask) + .filter(PyPICacheTask.status == "failed") + .order_by(PyPICacheTask.completed_at.desc()) + .limit(limit) + .all() + ) + + return [ + { + "id": str(task.id), + "package": task.package_name, + "error": task.error_message, + "attempts": task.attempts, + "depth": task.depth, + "failed_at": task.completed_at.isoformat() if task.completed_at else None, + } + for task in tasks + ] + + +def retry_failed_task(db: Session, package_name: str) -> Optional[PyPICacheTask]: + """ + Reset a failed task to retry. + + Args: + db: Database session. + package_name: The package name to retry. + + Returns: + The reset task, or None if not found. + """ + normalized = re.sub(r"[-_.]+", "-", package_name).lower() + + task = ( + db.query(PyPICacheTask) + .filter( + PyPICacheTask.package_name == normalized, + PyPICacheTask.status == "failed", + ) + .first() + ) + + if not task: + return None + + task.status = "pending" + task.attempts = 0 + task.next_retry_at = None + task.error_message = None + task.started_at = None + task.completed_at = None + db.commit() + + logger.info(f"Reset failed task for retry: {normalized}") + return task + + +def retry_all_failed_tasks(db: Session) -> int: + """ + Reset all failed tasks to retry. + + Args: + db: Database session. + + Returns: + Number of tasks reset. + """ + count = ( + db.query(PyPICacheTask) + .filter(PyPICacheTask.status == "failed") + .update( + { + "status": "pending", + "attempts": 0, + "next_retry_at": None, + "error_message": None, + "started_at": None, + "completed_at": None, + } + ) + ) + db.commit() + + logger.info(f"Reset {count} failed tasks for retry") + return count diff --git a/backend/app/pypi_proxy.py b/backend/app/pypi_proxy.py index 87678eb..2b55f49 100644 --- a/backend/app/pypi_proxy.py +++ b/backend/app/pypi_proxy.py @@ -9,7 +9,6 @@ import hashlib import logging import re import tarfile -import threading import zipfile from io import BytesIO from typing import Optional, List, Tuple @@ -24,6 +23,13 @@ from .database import get_db from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, Tag, PackageVersion, ArtifactDependency from .storage import S3Storage, get_storage from .config import get_env_upstream_sources +from .pypi_cache_worker import ( + enqueue_cache_task, + get_cache_status, + get_failed_tasks, + retry_failed_task, + retry_all_failed_tasks, +) logger = logging.getLogger(__name__) @@ -503,93 +509,6 @@ async def pypi_package_versions( ) -def _cache_dependency_background( - base_url: str, - dep_name: str, - dep_version: Optional[str], - depth: int = 0, - max_depth: int = 10, -): - """ - Background task to proactively cache a dependency. - - Fetches the dependency from upstream via our own proxy, which will - recursively cache its dependencies as well. 
- """ - if depth >= max_depth: - logger.warning(f"PyPI proxy: max depth {max_depth} reached caching {dep_name}") - return - - try: - # Normalize package name for URL (PEP 503) - normalized_name = re.sub(r'[-_.]+', '-', dep_name).lower() - - # First, get the simple index page to find available versions - # Use HTTPS explicitly to avoid redirect issues that can drop trailing slashes - if base_url.startswith('http://'): - base_url = 'https://' + base_url[7:] - simple_url = f"{base_url}/pypi/simple/{normalized_name}/" - logger.info(f"PyPI proxy: proactively caching {dep_name} (depth={depth})") - - with httpx.Client(timeout=30.0, follow_redirects=True) as client: - response = client.get(simple_url) - if response.status_code != 200: - logger.warning(f"PyPI proxy: failed to get index for {dep_name}: {response.status_code}") - return - - # Parse the HTML to find wheel files - html = response.text - - # Create pattern that matches both normalized (hyphens) and original (underscores) forms - # PEP 503 normalizes to hyphens, but wheel filenames may use underscores - name_pattern = re.sub(r'[-_]+', '[-_]+', normalized_name) - - # Look for wheel files (.whl) - prefer them over sdist - wheel_pattern = rf'href="([^"]*{name_pattern}[^"]*\.whl[^"]*)"' - matches = re.findall(wheel_pattern, html, re.IGNORECASE) - - if not matches: - # Try sdist - sdist_pattern = rf'href="([^"]*{name_pattern}[^"]*\.tar\.gz[^"]*)"' - matches = re.findall(sdist_pattern, html, re.IGNORECASE) - - if not matches: - # Debug: log first 500 chars of HTML and the pattern we're looking for - logger.warning(f"PyPI proxy: no downloadable files found for {dep_name}. Pattern: {wheel_pattern}, HTML preview: {html[:500]}") - return - - # Get the last match (usually the latest version) - # The URL might be relative or absolute - download_url = matches[-1] - if download_url.startswith('/'): - download_url = f"{base_url}{download_url}" - elif not download_url.startswith('http'): - download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}" - - # Download the file through our proxy (this will cache it) - logger.info(f"PyPI proxy: downloading dependency {dep_name} from {download_url}") - response = client.get(download_url) - if response.status_code == 200: - logger.info(f"PyPI proxy: successfully cached {dep_name}") - else: - logger.warning(f"PyPI proxy: failed to cache {dep_name}: {response.status_code}") - - except Exception as e: - logger.warning(f"PyPI proxy: error caching dependency {dep_name}: {e}") - - -def _start_background_dependency_caching(base_url: str, dependencies: List[Tuple[str, Optional[str]]]): - """Start background threads to cache dependencies.""" - for dep_name, dep_version in dependencies: - # Use a thread to avoid blocking - thread = threading.Thread( - target=_cache_dependency_background, - args=(base_url, dep_name, dep_version), - daemon=True, - ) - thread.start() - - @router.get("/simple/{package_name}/{filename}") async def pypi_download_file( request: Request, @@ -851,12 +770,20 @@ async def pypi_download_file( ) db.add(dep) - db.commit() - - # Proactively cache dependencies in the background + # Proactively cache dependencies via task queue if unique_deps: - base_url = str(request.base_url).rstrip("/") - _start_background_dependency_caching(base_url, unique_deps) + for dep_name, dep_version in unique_deps: + enqueue_cache_task( + db, + package_name=dep_name, + version_constraint=dep_version, + parent_task_id=None, # Top-level, triggered by user download + depth=0, + triggered_by_artifact=sha256, + ) + 
logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching") + + db.commit() # Return the file return Response( @@ -879,3 +806,63 @@ async def pypi_download_file( except Exception as e: logger.exception(f"PyPI proxy: error downloading {filename}") raise HTTPException(status_code=500, detail=str(e)) + + +# ============================================================================= +# Cache Status and Management Endpoints +# ============================================================================= + + +@router.get("/cache/status") +async def pypi_cache_status(db: Session = Depends(get_db)): + """ + Get summary of the PyPI cache task queue. + + Returns counts of tasks by status (pending, in_progress, completed, failed). + """ + return get_cache_status(db) + + +@router.get("/cache/failed") +async def pypi_cache_failed( + limit: int = 50, + db: Session = Depends(get_db), +): + """ + Get list of failed cache tasks for debugging. + + Args: + limit: Maximum number of tasks to return (default 50). + """ + return get_failed_tasks(db, limit=limit) + + +@router.post("/cache/retry/{package_name}") +async def pypi_cache_retry( + package_name: str, + db: Session = Depends(get_db), +): + """ + Reset a failed cache task to retry. + + Args: + package_name: The package name to retry. + """ + task = retry_failed_task(db, package_name) + if not task: + raise HTTPException( + status_code=404, + detail=f"No failed cache task found for package '{package_name}'" + ) + return {"message": f"Retry queued for {task.package_name}", "task_id": str(task.id)} + + +@router.post("/cache/retry-all") +async def pypi_cache_retry_all(db: Session = Depends(get_db)): + """ + Reset all failed cache tasks to retry. + + Returns the count of tasks that were reset. + """ + count = retry_all_failed_tasks(db) + return {"message": f"Queued {count} tasks for retry", "count": count} diff --git a/backend/tests/test_pypi_cache_worker.py b/backend/tests/test_pypi_cache_worker.py new file mode 100644 index 0000000..2f5f09a --- /dev/null +++ b/backend/tests/test_pypi_cache_worker.py @@ -0,0 +1,263 @@ +"""Tests for PyPI cache worker module.""" + +import os +import pytest +import re +from datetime import datetime, timedelta +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +import httpx + + +def get_base_url(): + """Get the base URL for the Orchard server from environment.""" + return os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + +class TestPyPICacheTaskModel: + """Tests for PyPICacheTask model.""" + + def test_model_creation(self): + """Test that PyPICacheTask model can be instantiated with explicit values.""" + from app.models import PyPICacheTask + + task = PyPICacheTask( + package_name="requests", + version_constraint=">=2.25.0", + depth=0, + status="pending", + attempts=0, + max_attempts=3, + ) + + assert task.package_name == "requests" + assert task.version_constraint == ">=2.25.0" + assert task.depth == 0 + assert task.status == "pending" + assert task.attempts == 0 + assert task.max_attempts == 3 + + def test_model_fields_exist(self): + """Test that PyPICacheTask has all expected fields.""" + from app.models import PyPICacheTask + + # Create with minimal required field + task = PyPICacheTask(package_name="urllib3") + + # Verify all expected attributes exist (SQLAlchemy defaults apply on flush) + assert hasattr(task, "status") + assert hasattr(task, "depth") + assert hasattr(task, "attempts") + assert hasattr(task, "max_attempts") + assert hasattr(task, "version_constraint") + 
assert hasattr(task, "parent_task_id") + assert hasattr(task, "triggered_by_artifact") + + +class TestEnqueueCacheTask: + """Tests for enqueue_cache_task function.""" + + def test_normalize_package_name(self): + """Test that package names are normalized per PEP 503.""" + # Test the normalization pattern used in the worker + test_cases = [ + ("Requests", "requests"), + ("typing_extensions", "typing-extensions"), + ("some.package", "some-package"), + ("UPPER_CASE", "upper-case"), + ("mixed-Case_name", "mixed-case-name"), + ] + + for input_name, expected in test_cases: + normalized = re.sub(r"[-_.]+", "-", input_name).lower() + assert normalized == expected, f"Failed for {input_name}" + + +class TestCacheWorkerFunctions: + """Tests for cache worker helper functions.""" + + def test_exponential_backoff_calculation(self): + """Test that exponential backoff is calculated correctly.""" + # The formula is: 30 * (2 ** (attempts - 1)) + # Attempt 1 failed → 30s + # Attempt 2 failed → 60s + # Attempt 3 failed → 120s + + def calc_backoff(attempts): + return 30 * (2 ** (attempts - 1)) + + assert calc_backoff(1) == 30 + assert calc_backoff(2) == 60 + assert calc_backoff(3) == 120 + + +class TestPyPICacheAPIEndpoints: + """Integration tests for PyPI cache API endpoints.""" + + @pytest.mark.integration + def test_cache_status_endpoint(self): + """Test GET /pypi/cache/status returns queue statistics.""" + with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: + response = client.get("/pypi/cache/status") + assert response.status_code == 200 + + data = response.json() + assert "pending" in data + assert "in_progress" in data + assert "completed" in data + assert "failed" in data + + # All values should be non-negative integers + assert isinstance(data["pending"], int) + assert isinstance(data["in_progress"], int) + assert isinstance(data["completed"], int) + assert isinstance(data["failed"], int) + assert data["pending"] >= 0 + assert data["in_progress"] >= 0 + assert data["completed"] >= 0 + assert data["failed"] >= 0 + + @pytest.mark.integration + def test_cache_failed_endpoint(self): + """Test GET /pypi/cache/failed returns list of failed tasks.""" + with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: + response = client.get("/pypi/cache/failed") + assert response.status_code == 200 + + data = response.json() + assert isinstance(data, list) + + # If there are failed tasks, verify structure + if data: + task = data[0] + assert "id" in task + assert "package" in task + assert "error" in task + assert "attempts" in task + assert "depth" in task + + @pytest.mark.integration + def test_cache_failed_with_limit(self): + """Test GET /pypi/cache/failed respects limit parameter.""" + with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: + response = client.get("/pypi/cache/failed?limit=5") + assert response.status_code == 200 + + data = response.json() + assert isinstance(data, list) + assert len(data) <= 5 + + @pytest.mark.integration + def test_cache_retry_nonexistent_package(self): + """Test POST /pypi/cache/retry/{package} returns 404 for unknown package.""" + with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: + # Use a random package name that definitely doesn't exist + response = client.post(f"/pypi/cache/retry/nonexistent-package-{uuid4().hex[:8]}") + assert response.status_code == 404 + # Check for "no failed" or "not found" in error message + detail = response.json()["detail"].lower() + assert "no failed" in detail or "not found" in detail + + 
@pytest.mark.integration + def test_cache_retry_all_endpoint(self): + """Test POST /pypi/cache/retry-all returns success.""" + with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: + response = client.post("/pypi/cache/retry-all") + assert response.status_code == 200 + + data = response.json() + assert "count" in data + assert "message" in data + assert isinstance(data["count"], int) + assert data["count"] >= 0 + + +class TestCacheTaskDeduplication: + """Tests for cache task deduplication logic.""" + + def test_find_cached_package_returns_none_for_uncached(self): + """Test that _find_cached_package returns None for uncached packages.""" + # This is a unit test pattern - mock the database + from unittest.mock import MagicMock + + mock_db = MagicMock() + mock_db.query.return_value.filter.return_value.first.return_value = None + + from app.pypi_cache_worker import _find_cached_package + + result = _find_cached_package(mock_db, "nonexistent-package") + assert result is None + + +class TestCacheWorkerConfiguration: + """Tests for cache worker configuration.""" + + def test_config_settings_exist(self): + """Test that PyPI cache config settings are available.""" + from app.config import get_settings + + settings = get_settings() + + # Check that settings exist and have reasonable defaults + assert hasattr(settings, "pypi_cache_workers") + assert hasattr(settings, "pypi_cache_max_depth") + assert hasattr(settings, "pypi_cache_max_attempts") + + # Check aliases work + assert settings.PYPI_CACHE_WORKERS == settings.pypi_cache_workers + assert settings.PYPI_CACHE_MAX_DEPTH == settings.pypi_cache_max_depth + assert settings.PYPI_CACHE_MAX_ATTEMPTS == settings.pypi_cache_max_attempts + + def test_config_default_values(self): + """Test that PyPI cache config has sensible defaults.""" + from app.config import get_settings + + settings = get_settings() + + # These are the defaults from our implementation + assert settings.pypi_cache_workers == 5 + assert settings.pypi_cache_max_depth == 10 + assert settings.pypi_cache_max_attempts == 3 + + +class TestFetchAndCachePackage: + """Tests for _fetch_and_cache_package function.""" + + def test_result_structure_success(self): + """Test that success result has correct structure.""" + # Mock a successful result + result = {"success": True, "artifact_id": "abc123"} + + assert result["success"] is True + assert "artifact_id" in result + + def test_result_structure_failure(self): + """Test that failure result has correct structure.""" + # Mock a failure result + result = {"success": False, "error": "Package not found"} + + assert result["success"] is False + assert "error" in result + + +class TestWorkerPoolLifecycle: + """Tests for worker pool initialization and shutdown.""" + + def test_init_shutdown_cycle(self): + """Test that worker pool can be initialized and shut down cleanly.""" + from app.pypi_cache_worker import ( + init_cache_worker_pool, + shutdown_cache_worker_pool, + _cache_worker_pool, + _cache_worker_running, + ) + + # Note: We can't fully test this in isolation because the module + # has global state and may conflict with the running server. + # These tests verify the function signatures work. 
+ + # The pool should be initialized by main.py on startup + # We just verify the functions are callable + assert callable(init_cache_worker_pool) + assert callable(shutdown_cache_worker_pool) diff --git a/migrations/011_pypi_cache_tasks.sql b/migrations/011_pypi_cache_tasks.sql new file mode 100644 index 0000000..db7f0d3 --- /dev/null +++ b/migrations/011_pypi_cache_tasks.sql @@ -0,0 +1,55 @@ +-- Migration: 011_pypi_cache_tasks +-- Description: Add table for tracking PyPI dependency caching tasks +-- Date: 2026-02-02 + +-- Table for tracking PyPI cache tasks with retry support +CREATE TABLE pypi_cache_tasks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + + -- What to cache + package_name VARCHAR(255) NOT NULL, + version_constraint VARCHAR(255), + + -- Origin tracking + parent_task_id UUID REFERENCES pypi_cache_tasks(id) ON DELETE SET NULL, + depth INTEGER NOT NULL DEFAULT 0, + triggered_by_artifact VARCHAR(64) REFERENCES artifacts(id) ON DELETE SET NULL, + + -- Status + status VARCHAR(20) NOT NULL DEFAULT 'pending', + attempts INTEGER NOT NULL DEFAULT 0, + max_attempts INTEGER NOT NULL DEFAULT 3, + + -- Results + cached_artifact_id VARCHAR(64) REFERENCES artifacts(id) ON DELETE SET NULL, + error_message TEXT, + + -- Timing + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + started_at TIMESTAMP WITH TIME ZONE, + completed_at TIMESTAMP WITH TIME ZONE, + next_retry_at TIMESTAMP WITH TIME ZONE, + + -- Constraints + CONSTRAINT check_task_status CHECK (status IN ('pending', 'in_progress', 'completed', 'failed')), + CONSTRAINT check_depth_non_negative CHECK (depth >= 0), + CONSTRAINT check_attempts_non_negative CHECK (attempts >= 0) +); + +-- Index for finding tasks ready to process (pending with retry time passed) +CREATE INDEX idx_pypi_cache_tasks_status_retry ON pypi_cache_tasks(status, next_retry_at); + +-- Index for deduplication check (is this package already queued?) +CREATE INDEX idx_pypi_cache_tasks_package_status ON pypi_cache_tasks(package_name, status); + +-- Index for tracing dependency chains +CREATE INDEX idx_pypi_cache_tasks_parent ON pypi_cache_tasks(parent_task_id); + +-- Index for finding tasks by artifact that triggered them +CREATE INDEX idx_pypi_cache_tasks_triggered_by ON pypi_cache_tasks(triggered_by_artifact); + +-- Index for finding tasks by cached artifact +CREATE INDEX idx_pypi_cache_tasks_cached_artifact ON pypi_cache_tasks(cached_artifact_id); + +-- Index for sorting by depth and creation time (processing order) +CREATE INDEX idx_pypi_cache_tasks_depth_created ON pypi_cache_tasks(depth, created_at);
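
For reference, a minimal sketch of exercising the new cache-management endpoints,
mirroring the integration tests above. It assumes a server reachable at
http://localhost:8080 (the tests' default ORCHARD_TEST_URL) and uses "requests"
purely as an example package name:

    import httpx

    BASE_URL = "http://localhost:8080"  # assumption: adjust to your deployment

    with httpx.Client(base_url=BASE_URL, timeout=30.0) as client:
        # Queue counts by status: pending / in_progress / completed / failed
        print(client.get("/pypi/cache/status").json())

        # Inspect up to 10 failed tasks (id, package, error, attempts, depth, failed_at)
        for task in client.get("/pypi/cache/failed", params={"limit": 10}).json():
            print(task["package"], task["error"])

        # Re-queue one failed package (404 if none failed for that name), or all of them
        client.post("/pypi/cache/retry/requests")
        client.post("/pypi/cache/retry-all")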