Remove proactive PyPI dependency caching feature

The background task queue that proactively cached package dependencies was
causing server instability and unnecessary cache growth. The PyPI proxy now
caches packages only on demand, when users request them.

Removed:
- PyPI cache worker (background task queue and worker pool)
- PyPICacheTask model and related database schema
- Cache management API endpoints (/pypi/cache/*)
- Background Jobs admin dashboard
- Dependency extraction and queueing logic

Kept:
- On-demand package caching (packages are still cached when users request them)
- Async httpx for non-blocking downloads (prevents health check failures; see the sketch below)
- URL-based cache lookups for deduplication
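
For reference, a minimal sketch of the retained on-demand path. The UrlCache
class and fetch_package_file helper are illustrative stand-ins, not the
proxy's actual API; the timeout values mirror the PROXY_CONNECT_TIMEOUT and
PROXY_READ_TIMEOUT constants in the proxy module.

    import hashlib
    import httpx

    class UrlCache:
        """Illustrative URL-keyed store: one cached blob per upstream URL."""

        def __init__(self):
            self._blobs: dict[str, bytes] = {}

        def get(self, url: str) -> bytes | None:
            return self._blobs.get(url)

        def put(self, url: str, content: bytes) -> str:
            self._blobs[url] = content
            # The real proxy keys artifacts by sha256; returned here for parity.
            return hashlib.sha256(content).hexdigest()

    async def fetch_package_file(cache: UrlCache, upstream_url: str) -> bytes:
        # Cache hit: lookups are keyed by upstream URL, so repeat requests
        # for the same file deduplicate.
        cached = cache.get(upstream_url)
        if cached is not None:
            return cached
        # Cache miss: async download keeps the event loop (and health
        # checks) responsive during large transfers.
        timeout = httpx.Timeout(30.0, read=60.0)
        async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
            resp = await client.get(upstream_url)
            resp.raise_for_status()
            cache.put(upstream_url, resp.content)
            return resp.content

In practice, nothing is cached until a client requests a package through the
proxy, e.g. pip install --index-url https://<registry-host>/pypi/simple/
requests (host illustrative).
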
Author: Mondo Diaz
Date:   2026-02-02 16:17:33 -06:00
Parent: cf7bdccb3a
Commit: 081cc6df83

11 changed files with 4 additions and 2392 deletions

@@ -9,172 +9,25 @@ import hashlib
import logging
import os
import re
import tarfile
import tempfile
import zipfile
from io import BytesIO
from typing import Optional, List, Tuple
from typing import Optional
from urllib.parse import urljoin, urlparse, quote, unquote
import httpx
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Request, Response
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import StreamingResponse, HTMLResponse
from sqlalchemy.orm import Session
from .auth import require_admin
from .database import get_db
from .models import User, UpstreamSource, CachedUrl, Artifact, Project, Package, Tag, PackageVersion, ArtifactDependency
from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, Tag, PackageVersion
from .storage import S3Storage, get_storage
from .config import get_env_upstream_sources
from .pypi_cache_worker import (
enqueue_cache_task,
get_cache_status,
get_failed_tasks,
get_active_tasks,
get_recent_activity,
retry_failed_task,
retry_all_failed_tasks,
cancel_cache_task,
)
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/pypi", tags=["pypi-proxy"])
def _parse_requires_dist(requires_dist: str) -> Tuple[Optional[str], Optional[str]]:
"""Parse a Requires-Dist line into (package_name, version_constraint).
Examples:
"requests (>=2.25.0)" -> ("requests", ">=2.25.0")
"typing-extensions; python_version < '3.8'" -> ("typing-extensions", None)
"numpy>=1.21.0" -> ("numpy", ">=1.21.0")
"certifi" -> ("certifi", None)
Returns:
Tuple of (normalized_package_name, version_constraint or None),
or (None, None) if the line cannot be parsed.
"""
# Remove any environment markers (after semicolon)
if ';' in requires_dist:
requires_dist = requires_dist.split(';')[0].strip()
# Match patterns like "package (>=1.0)" or "package>=1.0" or "package"
# Pattern breakdown: package name, optional whitespace, optional version in parens or directly
match = re.match(
r'^([a-zA-Z0-9][-a-zA-Z0-9._]*)\s*(?:\(([^)]+)\)|([<>=!~][^\s;]+))?',
requires_dist.strip()
)
if not match:
return None, None
package_name = match.group(1)
# Version can be in parentheses (group 2) or directly after name (group 3)
version_constraint = match.group(2) or match.group(3)
# Normalize package name (PEP 503)
normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()
# Clean up version constraint
if version_constraint:
version_constraint = version_constraint.strip()
return normalized_name, version_constraint
def _extract_requires_from_metadata(metadata_content: str) -> List[Tuple[str, Optional[str]]]:
"""Extract all Requires-Dist entries from METADATA/PKG-INFO content.
Args:
metadata_content: The content of a METADATA or PKG-INFO file
Returns:
List of (package_name, version_constraint) tuples
"""
dependencies = []
for line in metadata_content.split('\n'):
if line.startswith('Requires-Dist:'):
# Extract the value after "Requires-Dist:"
value = line[len('Requires-Dist:'):].strip()
pkg_name, version = _parse_requires_dist(value)
if pkg_name:
dependencies.append((pkg_name, version))
return dependencies
def _extract_metadata_from_wheel(content: bytes) -> Optional[str]:
"""Extract METADATA file content from a wheel (zip) file.
Wheel files have structure: {package}-{version}.dist-info/METADATA
Args:
content: The wheel file content as bytes
Returns:
METADATA file content as string, or None if not found
"""
try:
with zipfile.ZipFile(BytesIO(content)) as zf:
# Find the .dist-info directory
for name in zf.namelist():
if name.endswith('.dist-info/METADATA'):
return zf.read(name).decode('utf-8', errors='replace')
except Exception as e:
logger.warning(f"Failed to extract metadata from wheel: {e}")
return None
def _extract_metadata_from_sdist(content: bytes, filename: str) -> Optional[str]:
"""Extract PKG-INFO file content from a source distribution (.tar.gz).
Source distributions have structure: {package}-{version}/PKG-INFO
Args:
content: The tarball content as bytes
filename: The original filename (used to determine package name)
Returns:
PKG-INFO file content as string, or None if not found
"""
try:
with tarfile.open(fileobj=BytesIO(content), mode='r:gz') as tf:
# Find PKG-INFO in the root directory of the archive
for member in tf.getmembers():
if member.name.endswith('/PKG-INFO') and member.name.count('/') == 1:
f = tf.extractfile(member)
if f:
return f.read().decode('utf-8', errors='replace')
except Exception as e:
logger.warning(f"Failed to extract metadata from sdist {filename}: {e}")
return None
def _extract_dependencies(content: bytes, filename: str) -> List[Tuple[str, Optional[str]]]:
"""Extract dependencies from a PyPI package file.
Supports wheel (.whl) and source distribution (.tar.gz) formats.
Args:
content: The package file content as bytes
filename: The original filename
Returns:
List of (package_name, version_constraint) tuples
"""
metadata = None
if filename.endswith('.whl'):
metadata = _extract_metadata_from_wheel(content)
elif filename.endswith('.tar.gz'):
metadata = _extract_metadata_from_sdist(content, filename)
if metadata:
return _extract_requires_from_metadata(metadata)
return []
# Timeout configuration for proxy requests
PROXY_CONNECT_TIMEOUT = 30.0
PROXY_READ_TIMEOUT = 60.0
@@ -521,7 +374,6 @@ async def pypi_download_file(
package_name: str,
filename: str,
upstream: Optional[str] = None,
cache_depth: int = Query(default=0, ge=0, le=100, alias="cache-depth"),
db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage),
):
@@ -532,7 +384,6 @@ async def pypi_download_file(
package_name: The package name
filename: The filename to download
upstream: URL-encoded upstream URL to fetch from
cache_depth: Current cache recursion depth (used by cache worker for nested deps)
"""
if not upstream:
raise HTTPException(
@@ -656,7 +507,7 @@ async def pypi_download_file(
sha256 = result.sha256
size = result.size
# Read content for metadata extraction and response
# Read content for response
with open(tmp_path, 'rb') as f:
content = f.read()
@@ -766,50 +617,6 @@ async def pypi_download_file(
)
db.add(cached_url_record)
# Extract and store dependencies
dependencies = _extract_dependencies(content, filename)
unique_deps = []
if dependencies:
# Deduplicate dependencies by package name (keep first occurrence)
seen_packages = set()
for dep_name, dep_version in dependencies:
if dep_name not in seen_packages:
seen_packages.add(dep_name)
unique_deps.append((dep_name, dep_version))
logger.info(f"PyPI proxy: extracted {len(unique_deps)} dependencies from {filename} (deduped from {len(dependencies)})")
for dep_name, dep_version in unique_deps:
# Check if this dependency already exists for this artifact
existing_dep = db.query(ArtifactDependency).filter(
ArtifactDependency.artifact_id == sha256,
ArtifactDependency.dependency_project == "_pypi",
ArtifactDependency.dependency_package == dep_name,
).first()
if not existing_dep:
dep = ArtifactDependency(
artifact_id=sha256,
dependency_project="_pypi",
dependency_package=dep_name,
version_constraint=dep_version if dep_version else "*",
)
db.add(dep)
# Proactively cache dependencies via task queue
# Dependencies are queued at cache_depth + 1 to track recursion
if unique_deps:
next_depth = cache_depth + 1
for dep_name, dep_version in unique_deps:
enqueue_cache_task(
db,
package_name=dep_name,
version_constraint=dep_version,
parent_task_id=None, # Top-level, triggered by user download
depth=next_depth,
triggered_by_artifact=sha256,
)
logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching (depth={next_depth})")
db.commit()
# Return the file
@@ -833,119 +640,3 @@ async def pypi_download_file(
except Exception as e:
logger.exception(f"PyPI proxy: error downloading {filename}")
raise HTTPException(status_code=500, detail=str(e))
# =============================================================================
# Cache Status and Management Endpoints
# =============================================================================
@router.get("/cache/status")
async def pypi_cache_status(
db: Session = Depends(get_db),
_current_user: User = Depends(require_admin),
):
"""
Get summary of the PyPI cache task queue.
Returns counts of tasks by status (pending, in_progress, completed, failed).
Requires admin privileges.
"""
return get_cache_status(db)
@router.get("/cache/failed")
async def pypi_cache_failed(
limit: int = Query(default=50, ge=1, le=500),
db: Session = Depends(get_db),
_current_user: User = Depends(require_admin),
):
"""
Get list of failed cache tasks for debugging.
Args:
limit: Maximum number of tasks to return (default 50, max 500).
Requires admin privileges.
"""
return get_failed_tasks(db, limit=limit)
@router.get("/cache/active")
async def pypi_cache_active(
limit: int = Query(default=50, ge=1, le=500),
db: Session = Depends(get_db),
_current_user: User = Depends(require_admin),
):
"""
Get list of currently active (in_progress) cache tasks.
Shows what the cache workers are currently processing.
Args:
limit: Maximum number of tasks to return (default 50, max 500).
Requires admin privileges.
"""
return get_active_tasks(db, limit=limit)
@router.post("/cache/retry/{package_name}")
async def pypi_cache_retry(
package_name: str,
db: Session = Depends(get_db),
_current_user: User = Depends(require_admin),
):
"""
Reset a failed cache task to retry.
Args:
package_name: The package name to retry.
Requires admin privileges.
"""
task = retry_failed_task(db, package_name)
if not task:
raise HTTPException(
status_code=404,
detail=f"No failed cache task found for package '{package_name}'"
)
return {"message": f"Retry queued for {task.package_name}", "task_id": str(task.id)}
@router.post("/cache/retry-all")
async def pypi_cache_retry_all(
db: Session = Depends(get_db),
_current_user: User = Depends(require_admin),
):
"""
Reset all failed cache tasks to retry.
Returns the count of tasks that were reset.
Requires admin privileges.
"""
count = retry_all_failed_tasks(db)
return {"message": f"Queued {count} tasks for retry", "count": count}
@router.post("/cache/cancel/{package_name}")
async def pypi_cache_cancel(
package_name: str,
db: Session = Depends(get_db),
_current_user: User = Depends(require_admin),
):
"""
Cancel an in-progress or pending cache task.
Args:
package_name: The package name to cancel.
Requires admin privileges.
"""
task = cancel_cache_task(db, package_name)
if not task:
raise HTTPException(
status_code=404,
detail=f"No active cache task found for package '{package_name}'"
)
return {"message": f"Cancelled task for {task.package_name}", "task_id": str(task.id)}