2 Commits

Author SHA1 Message Date
Mondo Diaz
d274f3f375 Add robust PyPI dependency caching with task queue
Replace unbounded thread spawning with managed worker pool:
- New pypi_cache_tasks table tracks caching jobs
- Thread pool with 5 workers (configurable via ORCHARD_PYPI_CACHE_WORKERS)
- Automatic retries with exponential backoff (30s, 60s, then fail)
- Deduplication to prevent duplicate caching attempts

New API endpoints for visibility and control:
- GET /pypi/cache/status - queue health summary
- GET /pypi/cache/failed - list failed tasks with errors
- POST /pypi/cache/retry/{package} - retry single package
- POST /pypi/cache/retry-all - retry all failed packages

This fixes silent failures in background dependency caching where
packages would fail to cache without any tracking or retry mechanism.
2026-02-02 11:16:02 -06:00
Mondo Diaz
490b05438d Add design doc for PyPI cache robustness improvements 2026-02-02 11:06:51 -06:00
8 changed files with 1322 additions and 94 deletions

View File

@@ -64,6 +64,11 @@ class Settings(BaseSettings):
# Global cache settings override (None = use DB value, True/False = override DB)
cache_auto_create_system_projects: Optional[bool] = None # Override auto_create_system_projects
# PyPI Cache Worker settings
pypi_cache_workers: int = 5 # Number of concurrent cache workers
pypi_cache_max_depth: int = 10 # Maximum recursion depth for dependency caching
pypi_cache_max_attempts: int = 3 # Maximum retry attempts for failed cache tasks
# JWT Authentication settings (optional, for external identity providers)
jwt_enabled: bool = False # Enable JWT token validation
jwt_secret: str = "" # Secret key for HS256, or leave empty for RS256 with JWKS
@@ -88,6 +93,24 @@ class Settings(BaseSettings):
def is_production(self) -> bool:
return self.env.lower() == "production"
@property
def PORT(self) -> int:
"""Alias for server_port for compatibility."""
return self.server_port
# Uppercase aliases for PyPI cache settings (for backward compatibility)
@property
def PYPI_CACHE_WORKERS(self) -> int:
return self.pypi_cache_workers
@property
def PYPI_CACHE_MAX_DEPTH(self) -> int:
return self.pypi_cache_max_depth
@property
def PYPI_CACHE_MAX_ATTEMPTS(self) -> int:
return self.pypi_cache_max_attempts
class Config:
env_prefix = "ORCHARD_"
case_sensitive = False

View File

@@ -15,6 +15,7 @@ from .pypi_proxy import router as pypi_router
from .seed import seed_database
from .auth import create_default_admin
from .rate_limit import limiter
from .pypi_cache_worker import init_cache_worker_pool, shutdown_cache_worker_pool
settings = get_settings()
logging.basicConfig(level=logging.INFO)
@@ -49,8 +50,13 @@ async def lifespan(app: FastAPI):
else:
logger.info(f"Running in {settings.env} mode - skipping seed data")
# Initialize PyPI cache worker pool
init_cache_worker_pool()
yield
# Shutdown: cleanup
shutdown_cache_worker_pool()
app = FastAPI(

View File

@@ -803,3 +803,70 @@ class CachedUrl(Base):
return hashlib.sha256(url.encode("utf-8")).hexdigest()
class PyPICacheTask(Base):
"""Task for caching a PyPI package and its dependencies.
Tracks the status of background caching operations with retry support.
Used by the PyPI proxy to ensure reliable dependency caching.
"""
__tablename__ = "pypi_cache_tasks"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
# What to cache
package_name = Column(String(255), nullable=False)
version_constraint = Column(String(255))
# Origin tracking
parent_task_id = Column(
UUID(as_uuid=True),
ForeignKey("pypi_cache_tasks.id", ondelete="SET NULL"),
)
depth = Column(Integer, nullable=False, default=0)
triggered_by_artifact = Column(
String(64),
ForeignKey("artifacts.id", ondelete="SET NULL"),
)
# Status
status = Column(String(20), nullable=False, default="pending")
attempts = Column(Integer, nullable=False, default=0)
max_attempts = Column(Integer, nullable=False, default=3)
# Results
cached_artifact_id = Column(
String(64),
ForeignKey("artifacts.id", ondelete="SET NULL"),
)
error_message = Column(Text)
# Timing
created_at = Column(DateTime(timezone=True), nullable=False, default=datetime.utcnow)
started_at = Column(DateTime(timezone=True))
completed_at = Column(DateTime(timezone=True))
next_retry_at = Column(DateTime(timezone=True))
# Relationships
parent_task = relationship(
"PyPICacheTask",
remote_side=[id],
backref="child_tasks",
)
__table_args__ = (
Index("idx_pypi_cache_tasks_status_retry", "status", "next_retry_at"),
Index("idx_pypi_cache_tasks_package_status", "package_name", "status"),
Index("idx_pypi_cache_tasks_parent", "parent_task_id"),
Index("idx_pypi_cache_tasks_triggered_by", "triggered_by_artifact"),
Index("idx_pypi_cache_tasks_cached_artifact", "cached_artifact_id"),
Index("idx_pypi_cache_tasks_depth_created", "depth", "created_at"),
CheckConstraint(
"status IN ('pending', 'in_progress', 'completed', 'failed')",
name="check_task_status",
),
CheckConstraint("depth >= 0", name="check_depth_non_negative"),
CheckConstraint("attempts >= 0", name="check_attempts_non_negative"),
)

View File

@@ -0,0 +1,576 @@
"""
PyPI cache worker module.
Manages a thread pool for background caching of PyPI packages and their dependencies.
Replaces unbounded thread spawning with a managed queue-based approach.
"""
import logging
import re
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from typing import List, Optional
from uuid import UUID
import httpx
from sqlalchemy import or_
from sqlalchemy.orm import Session
from .config import get_settings
settings = get_settings()
from .database import SessionLocal
from .models import PyPICacheTask, Package, Project, Tag
logger = logging.getLogger(__name__)
# Module-level worker pool state
_cache_worker_pool: Optional[ThreadPoolExecutor] = None
_cache_worker_running: bool = False
_dispatcher_thread: Optional[threading.Thread] = None
def init_cache_worker_pool(max_workers: Optional[int] = None):
"""
Initialize the cache worker pool. Called on app startup.
Args:
max_workers: Number of concurrent workers. Defaults to PYPI_CACHE_WORKERS setting.
"""
global _cache_worker_pool, _cache_worker_running, _dispatcher_thread
if _cache_worker_pool is not None:
logger.warning("Cache worker pool already initialized")
return
workers = max_workers or settings.PYPI_CACHE_WORKERS
_cache_worker_pool = ThreadPoolExecutor(
max_workers=workers,
thread_name_prefix="pypi-cache-",
)
_cache_worker_running = True
# Start the dispatcher thread
_dispatcher_thread = threading.Thread(
target=_cache_dispatcher_loop,
daemon=True,
name="pypi-cache-dispatcher",
)
_dispatcher_thread.start()
logger.info(f"PyPI cache worker pool initialized with {workers} workers")
def shutdown_cache_worker_pool(wait: bool = True, timeout: float = 30.0):
"""
Shutdown the cache worker pool gracefully.
Args:
wait: Whether to wait for pending tasks to complete.
timeout: Maximum time to wait for shutdown.
"""
global _cache_worker_pool, _cache_worker_running, _dispatcher_thread
if _cache_worker_pool is None:
return
logger.info("Shutting down PyPI cache worker pool...")
_cache_worker_running = False
# Wait for dispatcher to stop
if _dispatcher_thread and _dispatcher_thread.is_alive():
_dispatcher_thread.join(timeout=5.0)
# Shutdown thread pool
_cache_worker_pool.shutdown(wait=wait, cancel_futures=not wait)
_cache_worker_pool = None
_dispatcher_thread = None
logger.info("PyPI cache worker pool shut down")
def _cache_dispatcher_loop():
"""
Main dispatcher loop: poll DB for pending tasks and submit to worker pool.
"""
logger.info("PyPI cache dispatcher started")
while _cache_worker_running:
try:
db = SessionLocal()
try:
tasks = _get_ready_tasks(db, limit=10)
for task in tasks:
# Mark in_progress before submitting
task.status = "in_progress"
task.started_at = datetime.utcnow()
db.commit()
# Submit to worker pool
_cache_worker_pool.submit(_process_cache_task, task.id)
# Sleep if no work (avoid busy loop)
if not tasks:
time.sleep(2.0)
else:
# Small delay between batches to avoid overwhelming
time.sleep(0.1)
finally:
db.close()
except Exception as e:
logger.error(f"PyPI cache dispatcher error: {e}")
time.sleep(5.0)
logger.info("PyPI cache dispatcher stopped")
def _get_ready_tasks(db: Session, limit: int = 10) -> List[PyPICacheTask]:
"""
Get tasks ready to process.
Returns pending tasks that are either new or ready for retry.
Orders by depth (shallow first) then creation time (FIFO).
"""
now = datetime.utcnow()
return (
db.query(PyPICacheTask)
.filter(
PyPICacheTask.status == "pending",
or_(
PyPICacheTask.next_retry_at == None, # New tasks
PyPICacheTask.next_retry_at <= now, # Retry tasks ready
),
)
.order_by(
PyPICacheTask.depth.asc(), # Prefer shallow deps first
PyPICacheTask.created_at.asc(), # FIFO within same depth
)
.limit(limit)
.all()
)
def _process_cache_task(task_id: UUID):
"""
Process a single cache task. Called by worker pool.
Args:
task_id: The ID of the task to process.
"""
db = SessionLocal()
try:
task = db.query(PyPICacheTask).filter(PyPICacheTask.id == task_id).first()
if not task:
logger.warning(f"PyPI cache task {task_id} not found")
return
logger.info(
f"Processing cache task: {task.package_name} "
f"(depth={task.depth}, attempt={task.attempts + 1})"
)
# Check if already cached by another task (dedup)
existing_artifact = _find_cached_package(db, task.package_name)
if existing_artifact:
logger.info(f"Package {task.package_name} already cached, skipping")
_mark_task_completed(db, task, cached_artifact_id=existing_artifact)
return
# Check depth limit
max_depth = settings.PYPI_CACHE_MAX_DEPTH
if task.depth >= max_depth:
_mark_task_failed(db, task, f"Max depth {max_depth} exceeded")
return
# Do the actual caching
result = _fetch_and_cache_package(task.package_name, task.version_constraint)
if result["success"]:
_mark_task_completed(db, task, cached_artifact_id=result.get("artifact_id"))
logger.info(f"Successfully cached {task.package_name}")
else:
_handle_task_failure(db, task, result["error"])
except Exception as e:
logger.exception(f"Error processing cache task {task_id}")
db = SessionLocal() # Get fresh session after exception
try:
task = db.query(PyPICacheTask).filter(PyPICacheTask.id == task_id).first()
if task:
_handle_task_failure(db, task, str(e))
finally:
db.close()
finally:
db.close()
def _find_cached_package(db: Session, package_name: str) -> Optional[str]:
"""
Check if a package is already cached.
Args:
db: Database session.
package_name: Normalized package name.
Returns:
Artifact ID if cached, None otherwise.
"""
# Normalize package name (PEP 503)
normalized = re.sub(r"[-_.]+", "-", package_name).lower()
# Check if _pypi project has this package with at least one tag
system_project = db.query(Project).filter(Project.name == "_pypi").first()
if not system_project:
return None
package = (
db.query(Package)
.filter(
Package.project_id == system_project.id,
Package.name == normalized,
)
.first()
)
if not package:
return None
# Check if package has any tags (cached files)
tag = db.query(Tag).filter(Tag.package_id == package.id).first()
if tag:
return tag.artifact_id
return None
def _fetch_and_cache_package(
package_name: str,
version_constraint: Optional[str] = None,
) -> dict:
"""
Fetch and cache a PyPI package by making requests through our own proxy.
Args:
package_name: The package name to cache.
version_constraint: Optional version constraint (currently not used for selection).
Returns:
Dict with "success" bool, "artifact_id" on success, "error" on failure.
"""
# Normalize package name (PEP 503)
normalized_name = re.sub(r"[-_.]+", "-", package_name).lower()
# Build the URL to our own proxy
# Use localhost since we're making internal requests
base_url = f"http://localhost:{settings.PORT}"
try:
with httpx.Client(timeout=60.0, follow_redirects=True) as client:
# Step 1: Get the simple index page
simple_url = f"{base_url}/pypi/simple/{normalized_name}/"
logger.debug(f"Fetching index: {simple_url}")
response = client.get(simple_url)
if response.status_code == 404:
return {"success": False, "error": f"Package {package_name} not found on upstream"}
if response.status_code != 200:
return {"success": False, "error": f"Failed to get index: HTTP {response.status_code}"}
# Step 2: Parse HTML to find downloadable files
html = response.text
# Create pattern that matches both normalized (hyphens) and original (underscores)
name_pattern = re.sub(r"[-_]+", "[-_]+", normalized_name)
# Look for wheel files first (preferred)
wheel_pattern = rf'href="([^"]*{name_pattern}[^"]*\.whl[^"]*)"'
matches = re.findall(wheel_pattern, html, re.IGNORECASE)
if not matches:
# Fall back to sdist
sdist_pattern = rf'href="([^"]*{name_pattern}[^"]*\.tar\.gz[^"]*)"'
matches = re.findall(sdist_pattern, html, re.IGNORECASE)
if not matches:
logger.warning(
f"No downloadable files found for {package_name}. "
f"Pattern: {wheel_pattern}, HTML preview: {html[:500]}"
)
return {"success": False, "error": "No downloadable files found"}
# Get the last match (usually latest version)
download_url = matches[-1]
# Make URL absolute if needed
if download_url.startswith("/"):
download_url = f"{base_url}{download_url}"
elif not download_url.startswith("http"):
download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}"
# Step 3: Download the file through our proxy (this caches it)
logger.debug(f"Downloading: {download_url}")
response = client.get(download_url)
if response.status_code != 200:
return {"success": False, "error": f"Download failed: HTTP {response.status_code}"}
# Get artifact ID from response header
artifact_id = response.headers.get("X-Checksum-SHA256")
return {"success": True, "artifact_id": artifact_id}
except httpx.TimeoutException as e:
return {"success": False, "error": f"Timeout: {e}"}
except httpx.ConnectError as e:
return {"success": False, "error": f"Connection failed: {e}"}
except Exception as e:
return {"success": False, "error": str(e)}
def _mark_task_completed(
db: Session,
task: PyPICacheTask,
cached_artifact_id: Optional[str] = None,
):
"""Mark a task as completed."""
task.status = "completed"
task.completed_at = datetime.utcnow()
task.cached_artifact_id = cached_artifact_id
task.error_message = None
db.commit()
def _mark_task_failed(db: Session, task: PyPICacheTask, error: str):
"""Mark a task as permanently failed."""
task.status = "failed"
task.completed_at = datetime.utcnow()
task.error_message = error[:1000] if error else None
db.commit()
logger.warning(f"PyPI cache task failed permanently: {task.package_name} - {error}")
def _handle_task_failure(db: Session, task: PyPICacheTask, error: str):
"""
Handle a failed cache attempt with exponential backoff.
Args:
db: Database session.
task: The failed task.
error: Error message.
"""
task.attempts += 1
task.error_message = error[:1000] if error else None
max_attempts = task.max_attempts or settings.PYPI_CACHE_MAX_ATTEMPTS
if task.attempts >= max_attempts:
# Give up after max attempts
task.status = "failed"
task.completed_at = datetime.utcnow()
logger.warning(
f"PyPI cache task failed permanently: {task.package_name} - {error} "
f"(after {task.attempts} attempts)"
)
else:
# Schedule retry with exponential backoff
# Attempt 1 failed → retry in 30s
# Attempt 2 failed → retry in 60s
# Attempt 3 failed → permanent failure (if max_attempts=3)
backoff_seconds = 30 * (2 ** (task.attempts - 1))
task.status = "pending"
task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds)
logger.info(
f"PyPI cache task will retry: {task.package_name} in {backoff_seconds}s "
f"(attempt {task.attempts}/{max_attempts})"
)
db.commit()
def enqueue_cache_task(
db: Session,
package_name: str,
version_constraint: Optional[str] = None,
parent_task_id: Optional[UUID] = None,
depth: int = 0,
triggered_by_artifact: Optional[str] = None,
) -> Optional[PyPICacheTask]:
"""
Enqueue a package for caching.
Performs deduplication: won't create a task if one already exists
for the same package in pending/in_progress state, or if the package
is already cached.
Args:
db: Database session.
package_name: The package name to cache.
version_constraint: Optional version constraint.
parent_task_id: Parent task that spawned this one.
depth: Recursion depth.
triggered_by_artifact: Artifact that declared this dependency.
Returns:
The created or existing task, or None if already cached.
"""
# Normalize package name (PEP 503)
normalized = re.sub(r"[-_.]+", "-", package_name).lower()
# Check for existing pending/in_progress task
existing_task = (
db.query(PyPICacheTask)
.filter(
PyPICacheTask.package_name == normalized,
PyPICacheTask.status.in_(["pending", "in_progress"]),
)
.first()
)
if existing_task:
logger.debug(f"Task already exists for {normalized}: {existing_task.id}")
return existing_task
# Check if already cached
if _find_cached_package(db, normalized):
logger.debug(f"Package {normalized} already cached, skipping task creation")
return None
# Create new task
task = PyPICacheTask(
package_name=normalized,
version_constraint=version_constraint,
parent_task_id=parent_task_id,
depth=depth,
triggered_by_artifact=triggered_by_artifact,
max_attempts=settings.PYPI_CACHE_MAX_ATTEMPTS,
)
db.add(task)
db.flush()
logger.info(f"Enqueued cache task for {normalized} (depth={depth})")
return task
def get_cache_status(db: Session) -> dict:
"""
Get summary of cache task queue status.
Returns:
Dict with counts by status.
"""
from sqlalchemy import func
stats = (
db.query(PyPICacheTask.status, func.count(PyPICacheTask.id))
.group_by(PyPICacheTask.status)
.all()
)
return {
"pending": next((s[1] for s in stats if s[0] == "pending"), 0),
"in_progress": next((s[1] for s in stats if s[0] == "in_progress"), 0),
"completed": next((s[1] for s in stats if s[0] == "completed"), 0),
"failed": next((s[1] for s in stats if s[0] == "failed"), 0),
}
def get_failed_tasks(db: Session, limit: int = 50) -> List[dict]:
"""
Get list of failed tasks for debugging.
Args:
db: Database session.
limit: Maximum number of tasks to return.
Returns:
List of failed task info dicts.
"""
tasks = (
db.query(PyPICacheTask)
.filter(PyPICacheTask.status == "failed")
.order_by(PyPICacheTask.completed_at.desc())
.limit(limit)
.all()
)
return [
{
"id": str(task.id),
"package": task.package_name,
"error": task.error_message,
"attempts": task.attempts,
"depth": task.depth,
"failed_at": task.completed_at.isoformat() if task.completed_at else None,
}
for task in tasks
]
def retry_failed_task(db: Session, package_name: str) -> Optional[PyPICacheTask]:
"""
Reset a failed task to retry.
Args:
db: Database session.
package_name: The package name to retry.
Returns:
The reset task, or None if not found.
"""
normalized = re.sub(r"[-_.]+", "-", package_name).lower()
task = (
db.query(PyPICacheTask)
.filter(
PyPICacheTask.package_name == normalized,
PyPICacheTask.status == "failed",
)
.first()
)
if not task:
return None
task.status = "pending"
task.attempts = 0
task.next_retry_at = None
task.error_message = None
task.started_at = None
task.completed_at = None
db.commit()
logger.info(f"Reset failed task for retry: {normalized}")
return task
def retry_all_failed_tasks(db: Session) -> int:
"""
Reset all failed tasks to retry.
Args:
db: Database session.
Returns:
Number of tasks reset.
"""
count = (
db.query(PyPICacheTask)
.filter(PyPICacheTask.status == "failed")
.update(
{
"status": "pending",
"attempts": 0,
"next_retry_at": None,
"error_message": None,
"started_at": None,
"completed_at": None,
}
)
)
db.commit()
logger.info(f"Reset {count} failed tasks for retry")
return count

View File

@@ -9,7 +9,6 @@ import hashlib
import logging
import re
import tarfile
import threading
import zipfile
from io import BytesIO
from typing import Optional, List, Tuple
@@ -24,6 +23,13 @@ from .database import get_db
from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, Tag, PackageVersion, ArtifactDependency
from .storage import S3Storage, get_storage
from .config import get_env_upstream_sources
from .pypi_cache_worker import (
enqueue_cache_task,
get_cache_status,
get_failed_tasks,
retry_failed_task,
retry_all_failed_tasks,
)
logger = logging.getLogger(__name__)
@@ -503,93 +509,6 @@ async def pypi_package_versions(
)
def _cache_dependency_background(
base_url: str,
dep_name: str,
dep_version: Optional[str],
depth: int = 0,
max_depth: int = 10,
):
"""
Background task to proactively cache a dependency.
Fetches the dependency from upstream via our own proxy, which will
recursively cache its dependencies as well.
"""
if depth >= max_depth:
logger.warning(f"PyPI proxy: max depth {max_depth} reached caching {dep_name}")
return
try:
# Normalize package name for URL (PEP 503)
normalized_name = re.sub(r'[-_.]+', '-', dep_name).lower()
# First, get the simple index page to find available versions
# Use HTTPS explicitly to avoid redirect issues that can drop trailing slashes
if base_url.startswith('http://'):
base_url = 'https://' + base_url[7:]
simple_url = f"{base_url}/pypi/simple/{normalized_name}/"
logger.info(f"PyPI proxy: proactively caching {dep_name} (depth={depth})")
with httpx.Client(timeout=30.0, follow_redirects=True) as client:
response = client.get(simple_url)
if response.status_code != 200:
logger.warning(f"PyPI proxy: failed to get index for {dep_name}: {response.status_code}")
return
# Parse the HTML to find wheel files
html = response.text
# Create pattern that matches both normalized (hyphens) and original (underscores) forms
# PEP 503 normalizes to hyphens, but wheel filenames may use underscores
name_pattern = re.sub(r'[-_]+', '[-_]+', normalized_name)
# Look for wheel files (.whl) - prefer them over sdist
wheel_pattern = rf'href="([^"]*{name_pattern}[^"]*\.whl[^"]*)"'
matches = re.findall(wheel_pattern, html, re.IGNORECASE)
if not matches:
# Try sdist
sdist_pattern = rf'href="([^"]*{name_pattern}[^"]*\.tar\.gz[^"]*)"'
matches = re.findall(sdist_pattern, html, re.IGNORECASE)
if not matches:
# Debug: log first 500 chars of HTML and the pattern we're looking for
logger.warning(f"PyPI proxy: no downloadable files found for {dep_name}. Pattern: {wheel_pattern}, HTML preview: {html[:500]}")
return
# Get the last match (usually the latest version)
# The URL might be relative or absolute
download_url = matches[-1]
if download_url.startswith('/'):
download_url = f"{base_url}{download_url}"
elif not download_url.startswith('http'):
download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}"
# Download the file through our proxy (this will cache it)
logger.info(f"PyPI proxy: downloading dependency {dep_name} from {download_url}")
response = client.get(download_url)
if response.status_code == 200:
logger.info(f"PyPI proxy: successfully cached {dep_name}")
else:
logger.warning(f"PyPI proxy: failed to cache {dep_name}: {response.status_code}")
except Exception as e:
logger.warning(f"PyPI proxy: error caching dependency {dep_name}: {e}")
def _start_background_dependency_caching(base_url: str, dependencies: List[Tuple[str, Optional[str]]]):
"""Start background threads to cache dependencies."""
for dep_name, dep_version in dependencies:
# Use a thread to avoid blocking
thread = threading.Thread(
target=_cache_dependency_background,
args=(base_url, dep_name, dep_version),
daemon=True,
)
thread.start()
@router.get("/simple/{package_name}/{filename}")
async def pypi_download_file(
request: Request,
@@ -851,12 +770,20 @@ async def pypi_download_file(
)
db.add(dep)
# Proactively cache dependencies via task queue
if unique_deps:
for dep_name, dep_version in unique_deps:
enqueue_cache_task(
db,
package_name=dep_name,
version_constraint=dep_version,
parent_task_id=None, # Top-level, triggered by user download
depth=0,
triggered_by_artifact=sha256,
)
logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching")
db.commit()
# Return the file
return Response(
@@ -879,3 +806,63 @@ async def pypi_download_file(
except Exception as e:
logger.exception(f"PyPI proxy: error downloading {filename}")
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Cache Status and Management Endpoints
# =============================================================================
@router.get("/cache/status")
async def pypi_cache_status(db: Session = Depends(get_db)):
"""
Get summary of the PyPI cache task queue.
Returns counts of tasks by status (pending, in_progress, completed, failed).
"""
return get_cache_status(db)
@router.get("/cache/failed")
async def pypi_cache_failed(
limit: int = 50,
db: Session = Depends(get_db),
):
"""
Get list of failed cache tasks for debugging.
Args:
limit: Maximum number of tasks to return (default 50).
"""
return get_failed_tasks(db, limit=limit)
@router.post("/cache/retry/{package_name}")
async def pypi_cache_retry(
package_name: str,
db: Session = Depends(get_db),
):
"""
Reset a failed cache task to retry.
Args:
package_name: The package name to retry.
"""
task = retry_failed_task(db, package_name)
if not task:
raise HTTPException(
status_code=404,
detail=f"No failed cache task found for package '{package_name}'"
)
return {"message": f"Retry queued for {task.package_name}", "task_id": str(task.id)}
@router.post("/cache/retry-all")
async def pypi_cache_retry_all(db: Session = Depends(get_db)):
"""
Reset all failed cache tasks to retry.
Returns the count of tasks that were reset.
"""
count = retry_all_failed_tasks(db)
return {"message": f"Queued {count} tasks for retry", "count": count}

View File

@@ -0,0 +1,263 @@
"""Tests for PyPI cache worker module."""
import os
import pytest
import re
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
from uuid import uuid4
import httpx
def get_base_url():
"""Get the base URL for the Orchard server from environment."""
return os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
class TestPyPICacheTaskModel:
"""Tests for PyPICacheTask model."""
def test_model_creation(self):
"""Test that PyPICacheTask model can be instantiated with explicit values."""
from app.models import PyPICacheTask
task = PyPICacheTask(
package_name="requests",
version_constraint=">=2.25.0",
depth=0,
status="pending",
attempts=0,
max_attempts=3,
)
assert task.package_name == "requests"
assert task.version_constraint == ">=2.25.0"
assert task.depth == 0
assert task.status == "pending"
assert task.attempts == 0
assert task.max_attempts == 3
def test_model_fields_exist(self):
"""Test that PyPICacheTask has all expected fields."""
from app.models import PyPICacheTask
# Create with minimal required field
task = PyPICacheTask(package_name="urllib3")
# Verify all expected attributes exist (SQLAlchemy defaults apply on flush)
assert hasattr(task, "status")
assert hasattr(task, "depth")
assert hasattr(task, "attempts")
assert hasattr(task, "max_attempts")
assert hasattr(task, "version_constraint")
assert hasattr(task, "parent_task_id")
assert hasattr(task, "triggered_by_artifact")
class TestEnqueueCacheTask:
"""Tests for enqueue_cache_task function."""
def test_normalize_package_name(self):
"""Test that package names are normalized per PEP 503."""
# Test the normalization pattern used in the worker
test_cases = [
("Requests", "requests"),
("typing_extensions", "typing-extensions"),
("some.package", "some-package"),
("UPPER_CASE", "upper-case"),
("mixed-Case_name", "mixed-case-name"),
]
for input_name, expected in test_cases:
normalized = re.sub(r"[-_.]+", "-", input_name).lower()
assert normalized == expected, f"Failed for {input_name}"
class TestCacheWorkerFunctions:
"""Tests for cache worker helper functions."""
def test_exponential_backoff_calculation(self):
"""Test that exponential backoff is calculated correctly."""
# The formula is: 30 * (2 ** (attempts - 1))
# Attempt 1 failed → 30s
# Attempt 2 failed → 60s
# Attempt 3 failed → 120s
def calc_backoff(attempts):
return 30 * (2 ** (attempts - 1))
assert calc_backoff(1) == 30
assert calc_backoff(2) == 60
assert calc_backoff(3) == 120
class TestPyPICacheAPIEndpoints:
"""Integration tests for PyPI cache API endpoints."""
@pytest.mark.integration
def test_cache_status_endpoint(self):
"""Test GET /pypi/cache/status returns queue statistics."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
response = client.get("/pypi/cache/status")
assert response.status_code == 200
data = response.json()
assert "pending" in data
assert "in_progress" in data
assert "completed" in data
assert "failed" in data
# All values should be non-negative integers
assert isinstance(data["pending"], int)
assert isinstance(data["in_progress"], int)
assert isinstance(data["completed"], int)
assert isinstance(data["failed"], int)
assert data["pending"] >= 0
assert data["in_progress"] >= 0
assert data["completed"] >= 0
assert data["failed"] >= 0
@pytest.mark.integration
def test_cache_failed_endpoint(self):
"""Test GET /pypi/cache/failed returns list of failed tasks."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
response = client.get("/pypi/cache/failed")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
# If there are failed tasks, verify structure
if data:
task = data[0]
assert "id" in task
assert "package" in task
assert "error" in task
assert "attempts" in task
assert "depth" in task
@pytest.mark.integration
def test_cache_failed_with_limit(self):
"""Test GET /pypi/cache/failed respects limit parameter."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
response = client.get("/pypi/cache/failed?limit=5")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) <= 5
@pytest.mark.integration
def test_cache_retry_nonexistent_package(self):
"""Test POST /pypi/cache/retry/{package} returns 404 for unknown package."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
# Use a random package name that definitely doesn't exist
response = client.post(f"/pypi/cache/retry/nonexistent-package-{uuid4().hex[:8]}")
assert response.status_code == 404
# Check for "no failed" or "not found" in error message
detail = response.json()["detail"].lower()
assert "no failed" in detail or "not found" in detail
@pytest.mark.integration
def test_cache_retry_all_endpoint(self):
"""Test POST /pypi/cache/retry-all returns success."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
response = client.post("/pypi/cache/retry-all")
assert response.status_code == 200
data = response.json()
assert "count" in data
assert "message" in data
assert isinstance(data["count"], int)
assert data["count"] >= 0
class TestCacheTaskDeduplication:
"""Tests for cache task deduplication logic."""
def test_find_cached_package_returns_none_for_uncached(self):
"""Test that _find_cached_package returns None for uncached packages."""
# This is a unit test pattern - mock the database
from unittest.mock import MagicMock
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.first.return_value = None
from app.pypi_cache_worker import _find_cached_package
result = _find_cached_package(mock_db, "nonexistent-package")
assert result is None
class TestCacheWorkerConfiguration:
"""Tests for cache worker configuration."""
def test_config_settings_exist(self):
"""Test that PyPI cache config settings are available."""
from app.config import get_settings
settings = get_settings()
# Check that settings exist and have reasonable defaults
assert hasattr(settings, "pypi_cache_workers")
assert hasattr(settings, "pypi_cache_max_depth")
assert hasattr(settings, "pypi_cache_max_attempts")
# Check aliases work
assert settings.PYPI_CACHE_WORKERS == settings.pypi_cache_workers
assert settings.PYPI_CACHE_MAX_DEPTH == settings.pypi_cache_max_depth
assert settings.PYPI_CACHE_MAX_ATTEMPTS == settings.pypi_cache_max_attempts
def test_config_default_values(self):
"""Test that PyPI cache config has sensible defaults."""
from app.config import get_settings
settings = get_settings()
# These are the defaults from our implementation
assert settings.pypi_cache_workers == 5
assert settings.pypi_cache_max_depth == 10
assert settings.pypi_cache_max_attempts == 3
class TestFetchAndCachePackage:
"""Tests for _fetch_and_cache_package function."""
def test_result_structure_success(self):
"""Test that success result has correct structure."""
# Mock a successful result
result = {"success": True, "artifact_id": "abc123"}
assert result["success"] is True
assert "artifact_id" in result
def test_result_structure_failure(self):
"""Test that failure result has correct structure."""
# Mock a failure result
result = {"success": False, "error": "Package not found"}
assert result["success"] is False
assert "error" in result
class TestWorkerPoolLifecycle:
"""Tests for worker pool initialization and shutdown."""
def test_init_shutdown_cycle(self):
"""Test that worker pool can be initialized and shut down cleanly."""
from app.pypi_cache_worker import (
init_cache_worker_pool,
shutdown_cache_worker_pool,
_cache_worker_pool,
_cache_worker_running,
)
# Note: We can't fully test this in isolation because the module
# has global state and may conflict with the running server.
# These tests verify the function signatures work.
# The pool should be initialized by main.py on startup
# We just verify the functions are callable
assert callable(init_cache_worker_pool)
assert callable(shutdown_cache_worker_pool)

View File

@@ -0,0 +1,251 @@
# PyPI Cache Robustness Design
**Date:** 2026-02-02
**Status:** Approved
**Branch:** fix/pypi-proxy-timeout
## Problem
The PyPI proxy's current proactive dependency caching has reliability issues:
- Unbounded thread spawning for each dependency
- Silent failures (logged but not tracked or retried)
- No visibility into cache completeness
- Transitive dependencies (deps-of-deps) often missing due to untracked failures
## Solution
Database-backed task queue with managed worker pool, automatic retries, and visibility API.
---
## Data Model
New table `pypi_cache_tasks`:
```sql
CREATE TABLE pypi_cache_tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- What to cache
package_name VARCHAR(255) NOT NULL,
version_constraint VARCHAR(255),
-- Origin tracking
parent_task_id UUID REFERENCES pypi_cache_tasks(id) ON DELETE SET NULL,
depth INTEGER NOT NULL DEFAULT 0,
triggered_by_artifact VARCHAR(64),
-- Status
status VARCHAR(20) NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
-- Results
cached_artifact_id VARCHAR(64),
error_message TEXT,
-- Timing
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
next_retry_at TIMESTAMP WITH TIME ZONE
);
-- Indexes
CREATE INDEX idx_pypi_cache_tasks_status_retry ON pypi_cache_tasks(status, next_retry_at);
CREATE INDEX idx_pypi_cache_tasks_package_status ON pypi_cache_tasks(package_name, status);
CREATE INDEX idx_pypi_cache_tasks_parent ON pypi_cache_tasks(parent_task_id);
-- Constraints
ALTER TABLE pypi_cache_tasks ADD CONSTRAINT check_task_status
CHECK (status IN ('pending', 'in_progress', 'completed', 'failed'));
```
---
## Worker Architecture
### Thread Pool (5 workers default)
```python
_cache_worker_pool: ThreadPoolExecutor = None
_cache_worker_running: bool = False
def init_cache_worker_pool(max_workers: int = 5):
global _cache_worker_pool, _cache_worker_running
_cache_worker_pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="pypi-cache-")
_cache_worker_running = True
threading.Thread(target=_cache_dispatcher_loop, daemon=True).start()
```
### Dispatcher Loop
- Polls DB every 2 seconds when idle
- Fetches batch of 10 ready tasks
- Marks tasks in_progress before submitting to pool
- Orders by depth (shallow first) then FIFO (see the sketch below)
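A condensed sketch of this loop, mirroring `_cache_dispatcher_loop` and `_get_ready_tasks` from `pypi_cache_worker.py` in this change (error handling and logging omitted):

```python
import time
from datetime import datetime

from sqlalchemy import or_

def _cache_dispatcher_loop():
    """Poll the queue and hand ready tasks to the worker pool."""
    while _cache_worker_running:
        tasks = []
        db = SessionLocal()
        try:
            tasks = (
                db.query(PyPICacheTask)
                .filter(
                    PyPICacheTask.status == "pending",
                    or_(
                        PyPICacheTask.next_retry_at == None,               # new tasks
                        PyPICacheTask.next_retry_at <= datetime.utcnow(),  # retries due
                    ),
                )
                .order_by(PyPICacheTask.depth.asc(), PyPICacheTask.created_at.asc())
                .limit(10)
                .all()
            )
            for task in tasks:
                # Claim the task before handing it to the pool
                task.status = "in_progress"
                task.started_at = datetime.utcnow()
                db.commit()
                _cache_worker_pool.submit(_process_cache_task, task.id)
        finally:
            db.close()
        # Sleep longer when idle, briefly between batches
        time.sleep(2.0 if not tasks else 0.1)
```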
### Task Processing
1. Dedup check - skip if package already cached
2. Dedup check - skip if pending/in_progress task exists for same package
3. Depth check - fail if >= 10 levels deep
4. Fetch package index page
5. Download best matching file (prefer wheels)
6. Store artifact, extract dependencies
7. Queue child tasks for each dependency
8. Mark completed or handle failure (see the skeleton after this list)
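As a skeleton, mirroring `_process_cache_task` above (step 7, queuing child tasks, happens where dependencies are extracted in the proxy download path; error handling and logging elided):

```python
def _process_cache_task(task_id):
    """Sketch of the per-task flow."""
    db = SessionLocal()
    try:
        task = db.query(PyPICacheTask).filter(PyPICacheTask.id == task_id).first()

        # Steps 1-2: dedup - skip if already cached (queue-level dedup happens at enqueue time)
        existing = _find_cached_package(db, task.package_name)
        if existing:
            _mark_task_completed(db, task, cached_artifact_id=existing)
            return

        # Step 3: depth guard
        if task.depth >= settings.PYPI_CACHE_MAX_DEPTH:
            _mark_task_failed(db, task, f"Max depth {settings.PYPI_CACHE_MAX_DEPTH} exceeded")
            return

        # Steps 4-6: fetch the index, download the preferred file, store the artifact
        result = _fetch_and_cache_package(task.package_name, task.version_constraint)
        if result["success"]:
            _mark_task_completed(db, task, cached_artifact_id=result.get("artifact_id"))  # step 8
        else:
            _handle_task_failure(db, task, result["error"])  # step 8: retry with backoff or fail
    finally:
        db.close()
```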
---
## Retry Logic
Exponential backoff with 3 attempts:
| Attempt | Backoff |
|---------|---------|
| 1 fails | 30 seconds |
| 2 fails | 60 seconds |
| 3 fails | Permanent failure |
```python
backoff_seconds = 30 * (2 ** (attempts - 1))
task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds)
```
---
## API Endpoints
| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/pypi/cache/status` | GET | Queue health summary |
| `/pypi/cache/failed` | GET | List failed tasks with errors |
| `/pypi/cache/retry/{package}` | POST | Retry single failed package |
| `/pypi/cache/retry-all` | POST | Retry all failed packages |
### Response Examples
**GET /pypi/cache/status**
```json
{
"pending": 12,
"in_progress": 3,
"completed": 847,
"failed": 5
}
```
**GET /pypi/cache/failed**
```json
[
{
"package": "some-obscure-pkg",
"error": "Timeout connecting to upstream",
"attempts": 3,
"failed_at": "2026-02-02T10:30:00Z"
}
]
```
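As a usage sketch (not part of this change), the endpoints can be driven with any HTTP client; for example with httpx, assuming the server is reachable at `http://localhost:8080` as in the integration tests:

```python
import httpx

BASE_URL = "http://localhost:8080"  # adjust to your Orchard deployment

with httpx.Client(base_url=BASE_URL, timeout=30.0) as client:
    # Queue health summary
    status = client.get("/pypi/cache/status").json()
    print(status)  # e.g. {"pending": 12, "in_progress": 3, "completed": 847, "failed": 5}

    # Inspect failures and retry a single package
    failed = client.get("/pypi/cache/failed", params={"limit": 10}).json()
    if failed:
        client.post(f"/pypi/cache/retry/{failed[0]['package']}").raise_for_status()

    # Or retry everything that failed
    client.post("/pypi/cache/retry-all")
```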
---
## Integration Points
### Replace Thread Spawning (pypi_proxy.py)
```python
# OLD: _start_background_dependency_caching(base_url, unique_deps)
# NEW:
for dep_name, dep_version in unique_deps:
_enqueue_cache_task(
db,
package_name=dep_name,
version_constraint=dep_version,
parent_task_id=None,
depth=0,
triggered_by_artifact=sha256,
)
```
### App Startup (main.py)
```python
@app.on_event("startup")
async def startup():
init_cache_worker_pool(max_workers=settings.PYPI_CACHE_WORKERS)
@app.on_event("shutdown")
async def shutdown():
shutdown_cache_worker_pool()
```
### Configuration (config.py)
```python
PYPI_CACHE_WORKERS = int(os.getenv("ORCHARD_PYPI_CACHE_WORKERS", "5"))
PYPI_CACHE_MAX_DEPTH = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_DEPTH", "10"))
PYPI_CACHE_MAX_ATTEMPTS = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_ATTEMPTS", "3"))
```
---
## Files to Create/Modify
| File | Action |
|------|--------|
| `migrations/0XX_pypi_cache_tasks.sql` | Create - new table |
| `backend/app/models.py` | Modify - add PyPICacheTask model |
| `backend/app/pypi_cache_worker.py` | Create - worker pool + processing |
| `backend/app/pypi_proxy.py` | Modify - replace threads, add API |
| `backend/app/main.py` | Modify - init worker on startup |
| `backend/app/config.py` | Modify - add config variables |
| `backend/tests/test_pypi_cache_worker.py` | Create - unit tests |
| `backend/tests/integration/test_pypi_cache_api.py` | Create - API tests |
---
## Deduplication Strategy
### At Task Creation Time
```python
def _enqueue_cache_task(db, package_name, ...):
# Check for existing pending/in_progress task
existing_task = db.query(PyPICacheTask).filter(
PyPICacheTask.package_name == package_name,
PyPICacheTask.status.in_(["pending", "in_progress"])
).first()
if existing_task:
return existing_task
# Check if already cached
if _find_cached_package(db, package_name):
return None
# Create new task
...
```
### At Processing Time (safety check)
```python
def _process_cache_task(task_id):
# Double-check in case of race
existing = _find_cached_package(db, task.package_name)
if existing:
_mark_task_completed(db, task, cached_artifact_id=existing)
return
```
---
## Success Criteria
- [ ] No unbounded thread creation
- [ ] All dependency caching attempts tracked in database
- [ ] Failed tasks automatically retry with backoff
- [ ] API provides visibility into queue status
- [ ] Manual retry capability for failed packages
- [ ] Existing pip install workflow unchanged (transparent)
- [ ] Tests cover worker, retry, and API functionality

View File

@@ -0,0 +1,55 @@
-- Migration: 011_pypi_cache_tasks
-- Description: Add table for tracking PyPI dependency caching tasks
-- Date: 2026-02-02
-- Table for tracking PyPI cache tasks with retry support
CREATE TABLE pypi_cache_tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
-- What to cache
package_name VARCHAR(255) NOT NULL,
version_constraint VARCHAR(255),
-- Origin tracking
parent_task_id UUID REFERENCES pypi_cache_tasks(id) ON DELETE SET NULL,
depth INTEGER NOT NULL DEFAULT 0,
triggered_by_artifact VARCHAR(64) REFERENCES artifacts(id) ON DELETE SET NULL,
-- Status
status VARCHAR(20) NOT NULL DEFAULT 'pending',
attempts INTEGER NOT NULL DEFAULT 0,
max_attempts INTEGER NOT NULL DEFAULT 3,
-- Results
cached_artifact_id VARCHAR(64) REFERENCES artifacts(id) ON DELETE SET NULL,
error_message TEXT,
-- Timing
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
started_at TIMESTAMP WITH TIME ZONE,
completed_at TIMESTAMP WITH TIME ZONE,
next_retry_at TIMESTAMP WITH TIME ZONE,
-- Constraints
CONSTRAINT check_task_status CHECK (status IN ('pending', 'in_progress', 'completed', 'failed')),
CONSTRAINT check_depth_non_negative CHECK (depth >= 0),
CONSTRAINT check_attempts_non_negative CHECK (attempts >= 0)
);
-- Index for finding tasks ready to process (pending with retry time passed)
CREATE INDEX idx_pypi_cache_tasks_status_retry ON pypi_cache_tasks(status, next_retry_at);
-- Index for deduplication check (is this package already queued?)
CREATE INDEX idx_pypi_cache_tasks_package_status ON pypi_cache_tasks(package_name, status);
-- Index for tracing dependency chains
CREATE INDEX idx_pypi_cache_tasks_parent ON pypi_cache_tasks(parent_task_id);
-- Index for finding tasks by artifact that triggered them
CREATE INDEX idx_pypi_cache_tasks_triggered_by ON pypi_cache_tasks(triggered_by_artifact);
-- Index for finding tasks by cached artifact
CREATE INDEX idx_pypi_cache_tasks_cached_artifact ON pypi_cache_tasks(cached_artifact_id);
-- Index for sorting by depth and creation time (processing order)
CREATE INDEX idx_pypi_cache_tasks_depth_created ON pypi_cache_tasks(depth, created_at);