PyPI Cache Robustness Design
Date: 2026-02-02
Status: Approved
Branch: fix/pypi-proxy-timeout
Problem
The PyPI proxy's current proactive caching has reliability issues:
- Unbounded thread spawning for each dependency
- Silent failures (logged but not tracked or retried)
- No visibility into cache completeness
- Deps-of-deps often missing due to untracked failures
Solution
Database-backed task queue with managed worker pool, automatic retries, and visibility API.
Data Model
New table pypi_cache_tasks:

```sql
CREATE TABLE pypi_cache_tasks (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    -- What to cache
    package_name VARCHAR(255) NOT NULL,
    version_constraint VARCHAR(255),

    -- Origin tracking
    parent_task_id UUID REFERENCES pypi_cache_tasks(id) ON DELETE SET NULL,
    depth INTEGER NOT NULL DEFAULT 0,
    triggered_by_artifact VARCHAR(64),

    -- Status
    status VARCHAR(20) NOT NULL DEFAULT 'pending',
    attempts INTEGER NOT NULL DEFAULT 0,
    max_attempts INTEGER NOT NULL DEFAULT 3,

    -- Results
    cached_artifact_id VARCHAR(64),
    error_message TEXT,

    -- Timing
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE,
    next_retry_at TIMESTAMP WITH TIME ZONE
);

-- Indexes
CREATE INDEX idx_pypi_cache_tasks_status_retry ON pypi_cache_tasks(status, next_retry_at);
CREATE INDEX idx_pypi_cache_tasks_package_status ON pypi_cache_tasks(package_name, status);
CREATE INDEX idx_pypi_cache_tasks_parent ON pypi_cache_tasks(parent_task_id);

-- Constraints
ALTER TABLE pypi_cache_tasks ADD CONSTRAINT check_task_status
    CHECK (status IN ('pending', 'in_progress', 'completed', 'failed'));
```
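A minimal sketch of the corresponding PyPICacheTask model for backend/app/models.py (SQLAlchemy is assumed, as the query snippets later in this document suggest; in models.py the class would subclass the app's existing Base rather than declaring its own):

```python
# Sketch only: column definitions mirror the pypi_cache_tasks table above.
import uuid

from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, Text, func
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()  # in models.py, use the app's existing Base

class PyPICacheTask(Base):
    __tablename__ = "pypi_cache_tasks"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    # What to cache
    package_name = Column(String(255), nullable=False)
    version_constraint = Column(String(255))

    # Origin tracking
    parent_task_id = Column(UUID(as_uuid=True), ForeignKey("pypi_cache_tasks.id", ondelete="SET NULL"))
    depth = Column(Integer, nullable=False, default=0)
    triggered_by_artifact = Column(String(64))

    # Status
    status = Column(String(20), nullable=False, default="pending")
    attempts = Column(Integer, nullable=False, default=0)
    max_attempts = Column(Integer, nullable=False, default=3)

    # Results
    cached_artifact_id = Column(String(64))
    error_message = Column(Text)

    # Timing
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    started_at = Column(DateTime(timezone=True))
    completed_at = Column(DateTime(timezone=True))
    next_retry_at = Column(DateTime(timezone=True))
```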
Worker Architecture
Thread Pool (5 workers default)
```python
import threading
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

_cache_worker_pool: Optional[ThreadPoolExecutor] = None
_cache_worker_running: bool = False

def init_cache_worker_pool(max_workers: int = 5):
    global _cache_worker_pool, _cache_worker_running
    _cache_worker_pool = ThreadPoolExecutor(max_workers=max_workers, thread_name_prefix="pypi-cache-")
    _cache_worker_running = True
    threading.Thread(target=_cache_dispatcher_loop, daemon=True).start()
```
Dispatcher Loop
- Polls DB every 2 seconds when idle
- Fetches batch of 10 ready tasks
- Marks tasks in_progress before submitting to pool
- Orders by depth (shallow first), then FIFO (a sketch of the loop follows this list)
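A minimal sketch of the dispatcher loop, assuming the module-level state set up by init_cache_worker_pool above; SessionLocal is an assumed session factory and _process_cache_task is the worker function described in the next section:

```python
import time
from datetime import datetime

POLL_INTERVAL_SECONDS = 2
BATCH_SIZE = 10

def _cache_dispatcher_loop():
    """Poll the task table and hand ready work to the thread pool."""
    while _cache_worker_running:
        with SessionLocal() as db:  # assumed session factory
            tasks = (
                db.query(PyPICacheTask)
                .filter(
                    PyPICacheTask.status == "pending",
                    # Ready now: never scheduled for retry, or the retry time has passed
                    (PyPICacheTask.next_retry_at.is_(None))
                    | (PyPICacheTask.next_retry_at <= datetime.utcnow()),
                )
                .order_by(PyPICacheTask.depth.asc(), PyPICacheTask.created_at.asc())
                .limit(BATCH_SIZE)
                .all()
            )
            for task in tasks:
                # Mark in_progress before submitting so the next poll skips these rows
                task.status = "in_progress"
                task.started_at = datetime.utcnow()
            db.commit()
            task_ids = [task.id for task in tasks]

        for task_id in task_ids:
            _cache_worker_pool.submit(_process_cache_task, task_id)

        if not task_ids:
            time.sleep(POLL_INTERVAL_SECONDS)  # idle: wait before polling again
```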
Task Processing
- Dedup check - skip if package already cached
- Dedup check - skip if pending/in_progress task exists for same package
- Depth check - fail if >= 10 levels deep
- Fetch package index page
- Download best matching file (prefer wheels)
- Store artifact, extract dependencies
- Queue child tasks for each dependency
- Mark completed or handle failure (a condensed sketch follows this list)
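A condensed sketch of the processing function; the helpers (_find_cached_package, _fetch_package_index, _download_best_file, _store_artifact_and_extract_deps, _mark_task_completed, _handle_task_failure) are illustrative stand-ins for the real implementations:

```python
MAX_DEPTH = 10  # would come from PYPI_CACHE_MAX_DEPTH

def _process_cache_task(task_id):
    with SessionLocal() as db:  # assumed session factory
        task = db.get(PyPICacheTask, task_id)
        try:
            # Dedup: another worker may have cached this package already
            existing = _find_cached_package(db, task.package_name)
            if existing:
                _mark_task_completed(db, task, cached_artifact_id=existing.artifact_id)
                return

            # Depth guard against runaway transitive caching
            if task.depth >= MAX_DEPTH:
                raise RuntimeError(f"max dependency depth {MAX_DEPTH} exceeded")

            # Fetch the index page and download the best matching file (prefer wheels)
            index = _fetch_package_index(task.package_name)
            file_info = _download_best_file(index, task.version_constraint)

            # Store the artifact and extract its declared dependencies
            artifact_id, deps = _store_artifact_and_extract_deps(db, file_info)

            # Queue child tasks one level deeper
            for dep_name, dep_constraint in deps:
                _enqueue_cache_task(
                    db,
                    package_name=dep_name,
                    version_constraint=dep_constraint,
                    parent_task_id=task.id,
                    depth=task.depth + 1,
                    triggered_by_artifact=artifact_id,
                )

            _mark_task_completed(db, task, cached_artifact_id=artifact_id)
        except Exception as exc:
            _handle_task_failure(db, task, error=str(exc))
```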
Retry Logic
Exponential backoff with 3 attempts:
| Attempt | Backoff |
|---|---|
| 1 fails | 30 seconds |
| 2 fails | 60 seconds |
| 3 fails | Permanent failure |
```python
backoff_seconds = 30 * (2 ** (attempts - 1))
task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds)
```
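A minimal sketch of a failure handler applying this backoff against the columns defined above (the name _handle_task_failure is illustrative):

```python
from datetime import datetime, timedelta

def _handle_task_failure(db, task, error: str):
    """Record the error, then either schedule a retry or fail permanently."""
    task.attempts += 1
    task.error_message = error

    if task.attempts >= task.max_attempts:
        # Final failure: give up and surface the task via /pypi/cache/failed
        task.status = "failed"
        task.completed_at = datetime.utcnow()
    else:
        # 30s after the first failure, 60s after the second
        backoff_seconds = 30 * (2 ** (task.attempts - 1))
        task.status = "pending"
        task.next_retry_at = datetime.utcnow() + timedelta(seconds=backoff_seconds)

    db.commit()
```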
API Endpoints
| Endpoint | Method | Purpose |
|---|---|---|
| /pypi/cache/status | GET | Queue health summary |
| /pypi/cache/failed | GET | List failed tasks with errors |
| /pypi/cache/retry/{package} | POST | Retry single failed package |
| /pypi/cache/retry-all | POST | Retry all failed packages |
Response Examples
GET /pypi/cache/status

```json
{
  "pending": 12,
  "in_progress": 3,
  "completed": 847,
  "failed": 5
}
```
GET /pypi/cache/failed

```json
[
  {
    "package": "some-obscure-pkg",
    "error": "Timeout connecting to upstream",
    "attempts": 3,
    "failed_at": "2026-02-02T10:30:00Z"
  }
]
```
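A sketch of two of the four endpoints on the proxy's router, assuming FastAPI and an existing get_db session dependency (router and handler names are illustrative):

```python
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import func
from sqlalchemy.orm import Session

router = APIRouter()  # in practice, the existing pypi_proxy router

@router.get("/pypi/cache/status")
def cache_status(db: Session = Depends(get_db)):
    """Queue health summary: task counts grouped by status."""
    rows = (
        db.query(PyPICacheTask.status, func.count(PyPICacheTask.id))
        .group_by(PyPICacheTask.status)
        .all()
    )
    counts = {status: 0 for status in ("pending", "in_progress", "completed", "failed")}
    counts.update(dict(rows))
    return counts

@router.post("/pypi/cache/retry/{package}")
def retry_package(package: str, db: Session = Depends(get_db)):
    """Reset a failed task for one package back to pending."""
    task = (
        db.query(PyPICacheTask)
        .filter(PyPICacheTask.package_name == package, PyPICacheTask.status == "failed")
        .first()
    )
    if task is None:
        raise HTTPException(status_code=404, detail=f"No failed task for {package}")
    task.status = "pending"
    task.attempts = 0
    task.next_retry_at = None
    db.commit()
    return {"package": package, "status": "pending"}
```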
Integration Points
Replace Thread Spawning (pypi_proxy.py)
```python
# OLD: _start_background_dependency_caching(base_url, unique_deps)
# NEW:
for dep_name, dep_version in unique_deps:
    _enqueue_cache_task(
        db,
        package_name=dep_name,
        version_constraint=dep_version,
        parent_task_id=None,
        depth=0,
        triggered_by_artifact=sha256,
    )
```
App Startup (main.py)
@app.on_event("startup")
async def startup():
init_cache_worker_pool(max_workers=settings.PYPI_CACHE_WORKERS)
@app.on_event("shutdown")
async def shutdown():
shutdown_cache_worker_pool()
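shutdown_cache_worker_pool is referenced above but not defined in this document; a minimal sketch, mirroring init_cache_worker_pool:

```python
def shutdown_cache_worker_pool():
    """Stop the dispatcher loop and drain the worker pool."""
    global _cache_worker_pool, _cache_worker_running
    _cache_worker_running = False  # dispatcher loop exits on its next iteration
    if _cache_worker_pool is not None:
        _cache_worker_pool.shutdown(wait=True)
        _cache_worker_pool = None
```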
Configuration (config.py)
```python
PYPI_CACHE_WORKERS = int(os.getenv("ORCHARD_PYPI_CACHE_WORKERS", "5"))
PYPI_CACHE_MAX_DEPTH = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_DEPTH", "10"))
PYPI_CACHE_MAX_ATTEMPTS = int(os.getenv("ORCHARD_PYPI_CACHE_MAX_ATTEMPTS", "3"))
```
Files to Create/Modify
| File | Action |
|---|---|
| migrations/0XX_pypi_cache_tasks.sql | Create - new table |
| backend/app/models.py | Modify - add PyPICacheTask model |
| backend/app/pypi_cache_worker.py | Create - worker pool + processing |
| backend/app/pypi_proxy.py | Modify - replace threads, add API |
| backend/app/main.py | Modify - init worker on startup |
| backend/app/config.py | Modify - add config variables |
| backend/tests/test_pypi_cache_worker.py | Create - unit tests |
| backend/tests/integration/test_pypi_cache_api.py | Create - API tests |
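As an illustration of what backend/tests/test_pypi_cache_worker.py could cover, a pytest-style sketch of the retry behaviour; _handle_task_failure and the task_on_last_attempt fixture refer to the hypothetical helper sketched under Retry Logic, not existing code:

```python
from app.pypi_cache_worker import _handle_task_failure  # hypothetical module path per the files table

def test_backoff_doubles_per_attempt():
    """30s after the first failure, 60s after the second (formula from Retry Logic)."""
    assert 30 * (2 ** (1 - 1)) == 30
    assert 30 * (2 ** (2 - 1)) == 60

def test_third_failure_is_permanent(db_session, task_on_last_attempt):
    """A task on its final attempt is marked failed instead of being rescheduled."""
    _handle_task_failure(db_session, task_on_last_attempt, error="Timeout connecting to upstream")
    assert task_on_last_attempt.status == "failed"
    assert task_on_last_attempt.completed_at is not None
```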
Deduplication Strategy
At Task Creation Time
```python
def _enqueue_cache_task(db, package_name, ...):
    # Check for existing pending/in_progress task
    existing_task = db.query(PyPICacheTask).filter(
        PyPICacheTask.package_name == package_name,
        PyPICacheTask.status.in_(["pending", "in_progress"])
    ).first()
    if existing_task:
        return existing_task

    # Check if already cached
    if _find_cached_package(db, package_name):
        return None

    # Create new task
    ...
```
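The elided creation step might look roughly like this (a sketch continuing the function body above; the keyword parameters mirror the enqueue call shown under Integration Points):

```python
    # Sketch of the "create new task" branch (inside _enqueue_cache_task)
    task = PyPICacheTask(
        package_name=package_name,
        version_constraint=version_constraint,
        parent_task_id=parent_task_id,
        depth=depth,
        triggered_by_artifact=triggered_by_artifact,
        status="pending",
    )
    db.add(task)
    db.commit()
    return task
```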
At Processing Time (safety check)
```python
def _process_cache_task(task_id):
    # Double-check in case of a race with another worker
    existing = _find_cached_package(db, task.package_name)
    if existing:
        _mark_task_completed(db, task, cached_artifact_id=existing.artifact_id)
        return
```
Success Criteria
- No unbounded thread creation
- All dependency caching attempts tracked in database
- Failed tasks automatically retry with backoff
- API provides visibility into queue status
- Manual retry capability for failed packages
- Existing pip install workflow unchanged (transparent)
- Tests cover worker, retry, and API functionality