From 490b05438d3e89639e1e698d3362b5955e8070db Mon Sep 17 00:00:00 2001
From: Mondo Diaz
Date: Mon, 2 Feb 2026 11:06:51 -0600
Subject: [PATCH] Add design doc for PyPI cache robustness improvements

---
 ...2026-02-02-pypi-cache-robustness-design.md | 251 ++++++++++++++++++
 1 file changed, 251 insertions(+)
 create mode 100644 docs/plans/2026-02-02-pypi-cache-robustness-design.md

diff --git a/docs/plans/2026-02-02-pypi-cache-robustness-design.md b/docs/plans/2026-02-02-pypi-cache-robustness-design.md
new file mode 100644
index 0000000..ee60dbb
--- /dev/null
+++ b/docs/plans/2026-02-02-pypi-cache-robustness-design.md
@@ -0,0 +1,251 @@

# PyPI Cache Robustness Design

**Date:** 2026-02-02
**Status:** Approved
**Branch:** fix/pypi-proxy-timeout

## Problem

The PyPI proxy's proactive dependency caching has several reliability issues:

- Unbounded thread spawning: a new thread is created for every dependency, with no pool or upper bound
- Silent failures: errors are logged but never tracked or retried
- No visibility into cache completeness
- Transitive dependencies (deps-of-deps) are often missing because their failures go untracked

## Solution

A database-backed task queue with a managed worker pool, automatic retries, and a visibility API.

---

## Data Model

New table `pypi_cache_tasks`:

```sql
CREATE TABLE pypi_cache_tasks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- What to cache
    package_name VARCHAR(255) NOT NULL,
    version_constraint VARCHAR(255),

    -- Origin tracking
    parent_task_id UUID REFERENCES pypi_cache_tasks(id) ON DELETE SET NULL,
    depth INTEGER NOT NULL DEFAULT 0,
    triggered_by_artifact VARCHAR(64),

    -- Status
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3,

    -- Results
    cached_artifact_id VARCHAR(64),
    error_message TEXT,

    -- Timing
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    next_retry_at TIMESTAMP WITH TIME ZONE
);

-- Indexes
CREATE INDEX idx_pypi_cache_tasks_status_retry ON pypi_cache_tasks(status, next_retry_at);
CREATE INDEX idx_pypi_cache_tasks_package_status ON pypi_cache_tasks(package_name, status);
CREATE INDEX idx_pypi_cache_tasks_parent ON pypi_cache_tasks(parent_task_id);

-- Constraints
ALTER TABLE pypi_cache_tasks ADD CONSTRAINT check_task_status
    CHECK (status IN ('pending', 'in_progress', 'completed', 'failed'));
```

---

## Worker Architecture

### Thread Pool (5 workers by default)

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

_cache_worker_pool: Optional[ThreadPoolExecutor] = None
_cache_worker_running: bool = False

def init_cache_worker_pool(max_workers: int = 5):
    global _cache_worker_pool, _cache_worker_running
    _cache_worker_pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="pypi-cache-")
    _cache_worker_running = True
    # A single daemon dispatcher thread feeds the pool
    threading.Thread(target=_cache_dispatcher_loop, daemon=True).start()
```

### Dispatcher Loop

The dispatcher thread (see the sketch after the processing steps below):

- Polls the DB every 2 seconds when idle
- Fetches a batch of up to 10 ready tasks
- Marks tasks `in_progress` before submitting them to the pool
- Orders by depth (shallow first), then FIFO

### Task Processing

Each worker runs one task through these steps:

1. Dedup check: skip if the package is already cached
2. Dedup check: skip if a pending/in_progress task exists for the same package
3. Depth check: fail if the task is >= 10 levels deep
4. Fetch the package index page
5. Download the best matching file (prefer wheels)
6. Store the artifact and extract its dependencies
7. Queue a child task for each dependency
8. Mark the task completed, or record the failure and schedule a retry
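A minimal sketch of the dispatcher loop, under these assumptions: `SessionLocal` is a SQLAlchemy session factory (not specified in this design), `PyPICacheTask` is the ORM model mapped to the table above, and `_process_cache_task` implements the steps listed. Names are illustrative, not final.

```python
import time
from datetime import datetime

POLL_INTERVAL_SECONDS = 2
BATCH_SIZE = 10

def _cache_dispatcher_loop():
    """Claim batches of ready tasks and hand them to the worker pool."""
    while _cache_worker_running:
        with SessionLocal() as db:
            # Ready = pending and either never retried or past its backoff window.
            tasks = (
                db.query(PyPICacheTask)
                .filter(
                    PyPICacheTask.status == "pending",
                    (PyPICacheTask.next_retry_at.is_(None))
                    | (PyPICacheTask.next_retry_at <= datetime.utcnow()),
                )
                .order_by(PyPICacheTask.depth, PyPICacheTask.created_at)  # shallow first, then FIFO
                .limit(BATCH_SIZE)
                .all()
            )
            for task in tasks:
                # Claim before submitting so the next poll skips these rows.
                task.status = "in_progress"
                task.started_at = datetime.utcnow()
            task_ids = [task.id for task in tasks]
            db.commit()
        for task_id in task_ids:
            _cache_worker_pool.submit(_process_cache_task, task_id)
        if not task_ids:
            time.sleep(POLL_INTERVAL_SECONDS)  # idle: wait before polling again
```

With a single dispatcher thread, the plain select-then-update claim is safe; if the service ever runs multiple replicas, the claim step would need something like `SELECT ... FOR UPDATE SKIP LOCKED` to avoid double-claiming.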
---

## Retry Logic

Exponential backoff with up to 3 attempts:

| Attempt | Result |
|---------|--------|
| 1st failure | Retry after 30 seconds |
| 2nd failure | Retry after 60 seconds |
| 3rd failure | Permanent failure (until manually retried via the API) |

```python
backoff_seconds = 30 * (2 ** (attempts - 1))
task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds)
```

---

## API Endpoints

| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/pypi/cache/status` | GET | Queue health summary |
| `/pypi/cache/failed` | GET | List failed tasks with errors |
| `/pypi/cache/retry/{package}` | POST | Retry a single failed package |
| `/pypi/cache/retry-all` | POST | Retry all failed packages |

### Response Examples

**GET /pypi/cache/status**
```json
{
  "pending": 12,
  "in_progress": 3,
  "completed": 847,
  "failed": 5
}
```

**GET /pypi/cache/failed**
```json
[
  {
    "package": "some-obscure-pkg",
    "error": "Timeout connecting to upstream",
    "attempts": 3,
    "failed_at": "2026-02-02T10:30:00Z"
  }
]
```

---

## Integration Points

### Replace Thread Spawning (pypi_proxy.py)

```python
# OLD: _start_background_dependency_caching(base_url, unique_deps)

# NEW:
for dep_name, dep_version in unique_deps:
    _enqueue_cache_task(
        db,
        package_name=dep_name,
        version_constraint=dep_version,
        parent_task_id=None,
        depth=0,
        triggered_by_artifact=sha256,
    )
```

### App Startup (main.py)

```python
@app.on_event("startup")
async def startup():
    init_cache_worker_pool(max_workers=settings.PYPI_CACHE_WORKERS)

@app.on_event("shutdown")
async def shutdown():
    shutdown_cache_worker_pool()
```

### Configuration (config.py)

```python
PYPI_CACHE_WORKERS = int(os.getenv("ORCHARD_PYPI_CACHE_WORKERS", "5"))
PYPI_CACHE_MAX_DEPTH = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_DEPTH", "10"))
PYPI_CACHE_MAX_ATTEMPTS = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_ATTEMPTS", "3"))
```

---

## Files to Create/Modify

| File | Action |
|------|--------|
| `migrations/0XX_pypi_cache_tasks.sql` | Create - new table |
| `backend/app/models.py` | Modify - add PyPICacheTask model |
| `backend/app/pypi_cache_worker.py` | Create - worker pool + task processing |
| `backend/app/pypi_proxy.py` | Modify - replace thread spawning, add API |
| `backend/app/main.py` | Modify - init worker pool on startup |
| `backend/app/config.py` | Modify - add config variables |
| `backend/tests/test_pypi_cache_worker.py` | Create - unit tests |
| `backend/tests/integration/test_pypi_cache_api.py` | Create - API tests |

---

## Deduplication Strategy

### At Task Creation Time

```python
def _enqueue_cache_task(db, package_name, ...):
    # Skip if a pending/in_progress task already exists for this package
    existing_task = db.query(PyPICacheTask).filter(
        PyPICacheTask.package_name == package_name,
        PyPICacheTask.status.in_(["pending", "in_progress"])
    ).first()
    if existing_task:
        return existing_task

    # Skip if the package is already cached
    if _find_cached_package(db, package_name):
        return None

    # Create new task
    ...
```

### At Processing Time (safety check)

```python
def _process_cache_task(task_id):
    # Double-check in case of a race: another worker may have cached
    # the package after this task was enqueued
    existing = _find_cached_package(db, task.package_name)
    if existing:
        _mark_task_completed(db, task, cached_artifact_id=existing.artifact_id)
        return
```
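To show how the pieces compose, here is a sketch of the full worker function, expanding the safety check above. `_fetch_and_store_package` and `_handle_task_failure` are hypothetical helper names for steps 4-6 and 8, and `SessionLocal` is the assumed session factory from the dispatcher sketch; the remaining names come from this design.

```python
from datetime import datetime, timedelta

def _process_cache_task(task_id):
    """Worker entry point: run the processing steps for one claimed task."""
    with SessionLocal() as db:
        task = db.get(PyPICacheTask, task_id)
        try:
            # Processing-time safety check (above).
            existing = _find_cached_package(db, task.package_name)
            if existing:
                _mark_task_completed(db, task, cached_artifact_id=existing.artifact_id)
                return

            # Depth guard: fail immediately rather than retry a hopeless task.
            if task.depth >= settings.PYPI_CACHE_MAX_DEPTH:
                task.status = "failed"
                task.error_message = "max dependency depth exceeded"
                task.completed_at = datetime.utcnow()
                db.commit()
                return

            # Steps 4-6: fetch the index page, download the best match,
            # store the artifact, and extract its dependencies.
            artifact_id, deps = _fetch_and_store_package(db, task)

            # Step 7: queue child tasks one level deeper.
            for dep_name, dep_version in deps:
                _enqueue_cache_task(
                    db,
                    package_name=dep_name,
                    version_constraint=dep_version,
                    parent_task_id=task.id,
                    depth=task.depth + 1,
                    triggered_by_artifact=artifact_id,
                )

            _mark_task_completed(db, task, cached_artifact_id=artifact_id)  # step 8a
        except Exception as exc:
            _handle_task_failure(db, task, exc)  # step 8b

def _mark_task_completed(db, task, cached_artifact_id):
    task.status = "completed"
    task.cached_artifact_id = cached_artifact_id
    task.completed_at = datetime.utcnow()
    db.commit()

def _handle_task_failure(db, task, exc):
    """Retry with exponential backoff, or fail permanently (see Retry Logic)."""
    task.attempts += 1
    task.error_message = str(exc)
    if task.attempts >= task.max_attempts:
        task.status = "failed"  # permanent until retried via the API
        task.completed_at = datetime.utcnow()
    else:
        backoff_seconds = 30 * (2 ** (task.attempts - 1))  # 30s, then 60s
        task.status = "pending"  # dispatcher re-claims once next_retry_at passes
        task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds)
    db.commit()
```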
+``` + +### At Processing Time (safety check) + +```python +def _process_cache_task(task_id): + # Double-check in case of race + if _find_cached_package(db, task.package_name): + _mark_task_completed(db, task, cached_artifact_id=existing.artifact_id) + return +``` + +--- + +## Success Criteria + +- [ ] No unbounded thread creation +- [ ] All dependency caching attempts tracked in database +- [ ] Failed tasks automatically retry with backoff +- [ ] API provides visibility into queue status +- [ ] Manual retry capability for failed packages +- [ ] Existing pip install workflow unchanged (transparent) +- [ ] Tests cover worker, retry, and API functionality