# PyPI Cache Robustness Design

**Date:** 2026-02-02
**Status:** Approved
**Branch:** fix/pypi-proxy-timeout

## Problem

The current PyPI proxy proactive caching has reliability issues:

- Unbounded thread spawning for each dependency
- Silent failures (logged but not tracked or retried)
- No visibility into cache completeness
- Transitive dependencies (deps-of-deps) often missing because failures go untracked

## Solution

Database-backed task queue with managed worker pool, automatic retries, and visibility API.

---

## Data Model

New table `pypi_cache_tasks`:

```sql
CREATE TABLE pypi_cache_tasks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- What to cache
    package_name VARCHAR(255) NOT NULL,
    version_constraint VARCHAR(255),

    -- Origin tracking
    parent_task_id UUID REFERENCES pypi_cache_tasks(id) ON DELETE SET NULL,
    depth INTEGER NOT NULL DEFAULT 0,
    triggered_by_artifact VARCHAR(64),

    -- Status
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3,

    -- Results
    cached_artifact_id VARCHAR(64),
    error_message TEXT,

    -- Timing
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    next_retry_at TIMESTAMP WITH TIME ZONE
);

-- Indexes
CREATE INDEX idx_pypi_cache_tasks_status_retry ON pypi_cache_tasks(status, next_retry_at);
CREATE INDEX idx_pypi_cache_tasks_package_status ON pypi_cache_tasks(package_name, status);
CREATE INDEX idx_pypi_cache_tasks_parent ON pypi_cache_tasks(parent_task_id);

-- Constraints
ALTER TABLE pypi_cache_tasks ADD CONSTRAINT check_task_status
    CHECK (status IN ('pending', 'in_progress', 'completed', 'failed'));
```

---

## Worker Architecture

### Thread Pool (5 workers default)

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

_cache_worker_pool: Optional[ThreadPoolExecutor] = None
_cache_worker_running: bool = False

def init_cache_worker_pool(max_workers: int = 5):
    global _cache_worker_pool, _cache_worker_running
    _cache_worker_pool = ThreadPoolExecutor(
        max_workers=max_workers,
        thread_name_prefix="pypi-cache-",
    )
    _cache_worker_running = True
    # The dispatcher runs on its own daemon thread and feeds tasks to the pool.
    threading.Thread(target=_cache_dispatcher_loop, daemon=True).start()
```

### Dispatcher Loop

- Polls DB every 2 seconds when idle
- Fetches batch of 10 ready tasks
- Marks tasks `in_progress` before submitting to pool
- Orders by depth (shallow first) then FIFO

### Task Processing

1. Dedup check - skip if package already cached
2. Dedup check - skip if pending/in_progress task exists for same package
3. Depth check - fail if >= 10 levels deep
4. Fetch package index page
5. Download best matching file (prefer wheels)
6. Store artifact, extract dependencies
7. Queue child tasks for each dependency
8. Mark completed or handle failure

---

## Retry Logic

Exponential backoff with 3 attempts:

| Attempt | Backoff |
|---------|---------|
| 1 fails | 30 seconds |
| 2 fails | 60 seconds |
| 3 fails | Permanent failure |

```python
from datetime import datetime, timedelta, timezone

# attempts = number of attempts made so far, including the one that just failed
backoff_seconds = 30 * (2 ** (attempts - 1))
# Timezone-aware UTC, matching the TIMESTAMP WITH TIME ZONE column
task.next_retry_at = datetime.now(timezone.utc) + timedelta(seconds=backoff_seconds)
```

---

## API Endpoints

| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/pypi/cache/status` | GET | Queue health summary |
| `/pypi/cache/failed` | GET | List failed tasks with errors |
| `/pypi/cache/retry/{package}` | POST | Retry single failed package |
| `/pypi/cache/retry-all` | POST | Retry all failed packages |

### Response Examples

**GET /pypi/cache/status**

```json
{
  "pending": 12,
  "in_progress": 3,
  "completed": 847,
  "failed": 5
}
```

**GET /pypi/cache/failed**

```json
[
  {
    "package": "some-obscure-pkg",
    "error": "Timeout connecting to upstream",
    "attempts": 3,
    "failed_at": "2026-02-02T10:30:00Z"
  }
]
```

---

## Integration Points

### Replace Thread Spawning (pypi_proxy.py)

```python
# OLD: _start_background_dependency_caching(base_url, unique_deps)
# NEW:
for dep_name, dep_version in unique_deps:
    _enqueue_cache_task(
        db,
        package_name=dep_name,
        version_constraint=dep_version,
        parent_task_id=None,
        depth=0,
        triggered_by_artifact=sha256,
    )
```

### App Startup (main.py)

```python
@app.on_event("startup")
async def startup():
    init_cache_worker_pool(max_workers=settings.PYPI_CACHE_WORKERS)

@app.on_event("shutdown")
async def shutdown():
    shutdown_cache_worker_pool()
```

### Configuration (config.py)

```python
PYPI_CACHE_WORKERS = int(os.getenv("ORCHARD_PYPI_CACHE_WORKERS", "5"))
PYPI_CACHE_MAX_DEPTH = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_DEPTH", "10"))
PYPI_CACHE_MAX_ATTEMPTS = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_ATTEMPTS", "3"))
```

---

## Files to Create/Modify

| File | Action |
|------|--------|
| `migrations/0XX_pypi_cache_tasks.sql` | Create - new table |
| `backend/app/models.py` | Modify - add PyPICacheTask model |
| `backend/app/pypi_cache_worker.py` | Create - worker pool + processing |
| `backend/app/pypi_proxy.py` | Modify - replace threads, add API |
| `backend/app/main.py` | Modify - init worker on startup |
| `backend/app/config.py` | Modify - add config variables |
| `backend/tests/test_pypi_cache_worker.py` | Create - unit tests |
| `backend/tests/integration/test_pypi_cache_api.py` | Create - API tests |

---

## Deduplication Strategy

### At Task Creation Time

```python
def _enqueue_cache_task(db, package_name, version_constraint=None,
                        parent_task_id=None, depth=0, triggered_by_artifact=None):
    # Check for existing pending/in_progress task
    existing_task = db.query(PyPICacheTask).filter(
        PyPICacheTask.package_name == package_name,
        PyPICacheTask.status.in_(["pending", "in_progress"])
    ).first()
    if existing_task:
        return existing_task

    # Check if already cached
    if _find_cached_package(db, package_name):
        return None

    # Create new task
    ...
```

### At Processing Time (safety check)

```python
def _process_cache_task(task_id):
    # db and task are loaded from task_id (omitted here)
    # Double-check in case of race
    existing = _find_cached_package(db, task.package_name)
    if existing:
        _mark_task_completed(db, task, cached_artifact_id=existing.artifact_id)
        return
```

---

## Success Criteria

- [ ] No unbounded thread creation
- [ ] All dependency caching attempts tracked in database
- [ ] Failed tasks automatically retry with backoff
- [ ] API provides visibility into queue status
- [ ] Manual retry capability for failed packages
- [ ] Existing pip install workflow unchanged (transparent)
- [ ] Tests cover worker, retry, and API functionality
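
---

The retry schedule in the Retry Logic section can be checked with a small standalone sketch. It is illustrative only: the helper names `next_retry_delay` and `next_status` are hypothetical, and the choice to return a task to `pending` while it waits for `next_retry_at` is an assumption (the design only states the four allowed statuses and the backoff table).

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

BASE_BACKOFF_SECONDS = 30  # first retry delay, taken from the Retry Logic table


def next_retry_delay(attempts: int, max_attempts: int = 3) -> Optional[timedelta]:
    """Return the delay before the next attempt, or None on permanent failure.

    `attempts` counts the attempts made so far, including the one that just failed.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    if attempts >= max_attempts:
        return None  # no retries left
    return timedelta(seconds=BASE_BACKOFF_SECONDS * (2 ** (attempts - 1)))


def next_status(
    attempts: int, now: datetime, max_attempts: int = 3
) -> Tuple[str, Optional[datetime]]:
    """Hypothetical helper: map a failed attempt to (status, next_retry_at).

    Assumption: a task awaiting retry goes back to 'pending'.
    """
    delay = next_retry_delay(attempts, max_attempts)
    if delay is None:
        return "failed", None
    return "pending", now + delay


if __name__ == "__main__":
    now = datetime.now(timezone.utc)
    for failed_attempts in (1, 2, 3):
        print(failed_attempts, next_status(failed_attempts, now))
```

With the default of 3 attempts this reproduces the table: 30 seconds after the first failure, 60 seconds after the second, and permanent failure after the third.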