Remove proactive PyPI dependency caching feature
The background task queue for proactively caching package dependencies was causing server instability and unnecessary cache growth. The PyPI proxy now caches packages only on demand, when users request them.

Removed:
- PyPI cache worker (background task queue and worker pool)
- PyPICacheTask model and related database schema
- Cache management API endpoints (/pypi/cache/*)
- Background Jobs admin dashboard
- Dependency extraction and queueing logic

Kept:
- On-demand package caching (still works when users request packages)
- Async httpx for non-blocking downloads (prevents health check failures)
- URL-based cache lookups for deduplication
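For context, the kept behavior is small enough to sketch. The following is a minimal illustration, not the actual implementation: `fetch_package` and the dict-backed `_cache` are hypothetical stand-ins for the real endpoint, the `CachedUrl` model, and the `S3Storage` backend that appear in the diff below.

```python
# Sketch of the surviving on-demand flow (assumed names throughout).
import hashlib

import httpx

_cache: dict[str, bytes] = {}  # stand-in for CachedUrl rows + S3-backed storage


async def fetch_package(upstream_url: str) -> tuple[bytes, str]:
    """Return (content, sha256), caching by upstream URL for deduplication."""
    if upstream_url in _cache:  # URL-based lookup: repeat requests are cache hits
        content = _cache[upstream_url]
        return content, hashlib.sha256(content).hexdigest()

    # An async client keeps the event loop free while the download runs,
    # which is why httpx stays async: health checks keep responding.
    timeout = httpx.Timeout(60.0, connect=30.0)  # mirrors PROXY_*_TIMEOUT below
    async with httpx.AsyncClient(timeout=timeout, follow_redirects=True) as client:
        resp = await client.get(upstream_url)
        resp.raise_for_status()
        content = resp.content

    _cache[upstream_url] = content  # cached on demand only; nothing is queued
    return content, hashlib.sha256(content).hexdigest()
```

The trade-off is deliberate: the cache only grows with packages users actually request, at the cost of a first-request download for each dependency the client resolves.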
@@ -9,172 +9,25 @@ import hashlib
 import logging
 import os
-import re
-import tarfile
 import tempfile
-import zipfile
-from io import BytesIO
-from typing import Optional, List, Tuple
+from typing import Optional
 from urllib.parse import urljoin, urlparse, quote, unquote
 
 import httpx
-from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Request, Response
+from fastapi import APIRouter, Depends, HTTPException, Request, Response
 from fastapi.responses import StreamingResponse, HTMLResponse
 from sqlalchemy.orm import Session
 
 from .auth import require_admin
 from .database import get_db
-from .models import User, UpstreamSource, CachedUrl, Artifact, Project, Package, Tag, PackageVersion, ArtifactDependency
+from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, Tag, PackageVersion
 from .storage import S3Storage, get_storage
 from .config import get_env_upstream_sources
-from .pypi_cache_worker import (
-    enqueue_cache_task,
-    get_cache_status,
-    get_failed_tasks,
-    get_active_tasks,
-    get_recent_activity,
-    retry_failed_task,
-    retry_all_failed_tasks,
-    cancel_cache_task,
-)
 
 logger = logging.getLogger(__name__)
 
 router = APIRouter(prefix="/pypi", tags=["pypi-proxy"])
 
 
-def _parse_requires_dist(requires_dist: str) -> Tuple[str, Optional[str]]:
-    """Parse a Requires-Dist line into (package_name, version_constraint).
-
-    Examples:
-        "requests (>=2.25.0)" -> ("requests", ">=2.25.0")
-        "typing-extensions; python_version < '3.8'" -> ("typing-extensions", None)
-        "numpy>=1.21.0" -> ("numpy", ">=1.21.0")
-        "certifi" -> ("certifi", None)
-
-    Returns:
-        Tuple of (normalized_package_name, version_constraint or None)
-    """
-    # Remove any environment markers (after semicolon)
-    if ';' in requires_dist:
-        requires_dist = requires_dist.split(';')[0].strip()
-
-    # Match patterns like "package (>=1.0)" or "package>=1.0" or "package"
-    # Pattern breakdown: package name, optional whitespace, optional version in parens or directly
-    match = re.match(
-        r'^([a-zA-Z0-9][-a-zA-Z0-9._]*)\s*(?:\(([^)]+)\)|([<>=!~][^\s;]+))?',
-        requires_dist.strip()
-    )
-
-    if not match:
-        return None, None
-
-    package_name = match.group(1)
-    # Version can be in parentheses (group 2) or directly after name (group 3)
-    version_constraint = match.group(2) or match.group(3)
-
-    # Normalize package name (PEP 503)
-    normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()
-
-    # Clean up version constraint
-    if version_constraint:
-        version_constraint = version_constraint.strip()
-
-    return normalized_name, version_constraint
-
-
-def _extract_requires_from_metadata(metadata_content: str) -> List[Tuple[str, Optional[str]]]:
-    """Extract all Requires-Dist entries from METADATA/PKG-INFO content.
-
-    Args:
-        metadata_content: The content of a METADATA or PKG-INFO file
-
-    Returns:
-        List of (package_name, version_constraint) tuples
-    """
-    dependencies = []
-
-    for line in metadata_content.split('\n'):
-        if line.startswith('Requires-Dist:'):
-            # Extract the value after "Requires-Dist:"
-            value = line[len('Requires-Dist:'):].strip()
-            pkg_name, version = _parse_requires_dist(value)
-            if pkg_name:
-                dependencies.append((pkg_name, version))
-
-    return dependencies
-
-
-def _extract_metadata_from_wheel(content: bytes) -> Optional[str]:
-    """Extract METADATA file content from a wheel (zip) file.
-
-    Wheel files have structure: {package}-{version}.dist-info/METADATA
-
-    Args:
-        content: The wheel file content as bytes
-
-    Returns:
-        METADATA file content as string, or None if not found
-    """
-    try:
-        with zipfile.ZipFile(BytesIO(content)) as zf:
-            # Find the .dist-info directory
-            for name in zf.namelist():
-                if name.endswith('.dist-info/METADATA'):
-                    return zf.read(name).decode('utf-8', errors='replace')
-    except Exception as e:
-        logger.warning(f"Failed to extract metadata from wheel: {e}")
-    return None
-
-
-def _extract_metadata_from_sdist(content: bytes, filename: str) -> Optional[str]:
-    """Extract PKG-INFO file content from a source distribution (.tar.gz).
-
-    Source distributions have structure: {package}-{version}/PKG-INFO
-
-    Args:
-        content: The tarball content as bytes
-        filename: The original filename (used to determine package name)
-
-    Returns:
-        PKG-INFO file content as string, or None if not found
-    """
-    try:
-        with tarfile.open(fileobj=BytesIO(content), mode='r:gz') as tf:
-            # Find PKG-INFO in the root directory of the archive
-            for member in tf.getmembers():
-                if member.name.endswith('/PKG-INFO') and member.name.count('/') == 1:
-                    f = tf.extractfile(member)
-                    if f:
-                        return f.read().decode('utf-8', errors='replace')
-    except Exception as e:
-        logger.warning(f"Failed to extract metadata from sdist {filename}: {e}")
-    return None
-
-
-def _extract_dependencies(content: bytes, filename: str) -> List[Tuple[str, Optional[str]]]:
-    """Extract dependencies from a PyPI package file.
-
-    Supports wheel (.whl) and source distribution (.tar.gz) formats.
-
-    Args:
-        content: The package file content as bytes
-        filename: The original filename
-
-    Returns:
-        List of (package_name, version_constraint) tuples
-    """
-    metadata = None
-
-    if filename.endswith('.whl'):
-        metadata = _extract_metadata_from_wheel(content)
-    elif filename.endswith('.tar.gz'):
-        metadata = _extract_metadata_from_sdist(content, filename)
-
-    if metadata:
-        return _extract_requires_from_metadata(metadata)
-
-    return []
-
 # Timeout configuration for proxy requests
 PROXY_CONNECT_TIMEOUT = 30.0
 PROXY_READ_TIMEOUT = 60.0
@@ -521,7 +374,6 @@ async def pypi_download_file(
     package_name: str,
     filename: str,
     upstream: Optional[str] = None,
-    cache_depth: int = Query(default=0, ge=0, le=100, alias="cache-depth"),
    db: Session = Depends(get_db),
    storage: S3Storage = Depends(get_storage),
 ):
@@ -532,7 +384,6 @@ async def pypi_download_file(
         package_name: The package name
         filename: The filename to download
         upstream: URL-encoded upstream URL to fetch from
-        cache_depth: Current cache recursion depth (used by cache worker for nested deps)
     """
     if not upstream:
         raise HTTPException(
@@ -656,7 +507,7 @@ async def pypi_download_file(
         sha256 = result.sha256
         size = result.size
 
-        # Read content for metadata extraction and response
+        # Read content for response
         with open(tmp_path, 'rb') as f:
             content = f.read()
 
@@ -766,50 +617,6 @@ async def pypi_download_file(
             )
             db.add(cached_url_record)
 
-            # Extract and store dependencies
-            dependencies = _extract_dependencies(content, filename)
-            unique_deps = []
-            if dependencies:
-                # Deduplicate dependencies by package name (keep first occurrence)
-                seen_packages = set()
-                for dep_name, dep_version in dependencies:
-                    if dep_name not in seen_packages:
-                        seen_packages.add(dep_name)
-                        unique_deps.append((dep_name, dep_version))
-
-                logger.info(f"PyPI proxy: extracted {len(unique_deps)} dependencies from {filename} (deduped from {len(dependencies)})")
-                for dep_name, dep_version in unique_deps:
-                    # Check if this dependency already exists for this artifact
-                    existing_dep = db.query(ArtifactDependency).filter(
-                        ArtifactDependency.artifact_id == sha256,
-                        ArtifactDependency.dependency_project == "_pypi",
-                        ArtifactDependency.dependency_package == dep_name,
-                    ).first()
-
-                    if not existing_dep:
-                        dep = ArtifactDependency(
-                            artifact_id=sha256,
-                            dependency_project="_pypi",
-                            dependency_package=dep_name,
-                            version_constraint=dep_version if dep_version else "*",
-                        )
-                        db.add(dep)
-
-            # Proactively cache dependencies via task queue
-            # Dependencies are queued at cache_depth + 1 to track recursion
-            if unique_deps:
-                next_depth = cache_depth + 1
-                for dep_name, dep_version in unique_deps:
-                    enqueue_cache_task(
-                        db,
-                        package_name=dep_name,
-                        version_constraint=dep_version,
-                        parent_task_id=None,  # Top-level, triggered by user download
-                        depth=next_depth,
-                        triggered_by_artifact=sha256,
-                    )
-                logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching (depth={next_depth})")
-
             db.commit()
 
             # Return the file
@@ -833,119 +640,3 @@ async def pypi_download_file(
     except Exception as e:
         logger.exception(f"PyPI proxy: error downloading {filename}")
         raise HTTPException(status_code=500, detail=str(e))
-
-
-# =============================================================================
-# Cache Status and Management Endpoints
-# =============================================================================
-
-
-@router.get("/cache/status")
-async def pypi_cache_status(
-    db: Session = Depends(get_db),
-    _current_user: User = Depends(require_admin),
-):
-    """
-    Get summary of the PyPI cache task queue.
-
-    Returns counts of tasks by status (pending, in_progress, completed, failed).
-    Requires admin privileges.
-    """
-    return get_cache_status(db)
-
-
-@router.get("/cache/failed")
-async def pypi_cache_failed(
-    limit: int = Query(default=50, ge=1, le=500),
-    db: Session = Depends(get_db),
-    _current_user: User = Depends(require_admin),
-):
-    """
-    Get list of failed cache tasks for debugging.
-
-    Args:
-        limit: Maximum number of tasks to return (default 50, max 500).
-
-    Requires admin privileges.
-    """
-    return get_failed_tasks(db, limit=limit)
-
-
-@router.get("/cache/active")
-async def pypi_cache_active(
-    limit: int = Query(default=50, ge=1, le=500),
-    db: Session = Depends(get_db),
-    _current_user: User = Depends(require_admin),
-):
-    """
-    Get list of currently active (in_progress) cache tasks.
-
-    Shows what the cache workers are currently processing.
-
-    Args:
-        limit: Maximum number of tasks to return (default 50, max 500).
-
-    Requires admin privileges.
-    """
-    return get_active_tasks(db, limit=limit)
-
-
-@router.post("/cache/retry/{package_name}")
-async def pypi_cache_retry(
-    package_name: str,
-    db: Session = Depends(get_db),
-    _current_user: User = Depends(require_admin),
-):
-    """
-    Reset a failed cache task to retry.
-
-    Args:
-        package_name: The package name to retry.
-
-    Requires admin privileges.
-    """
-    task = retry_failed_task(db, package_name)
-    if not task:
-        raise HTTPException(
-            status_code=404,
-            detail=f"No failed cache task found for package '{package_name}'"
-        )
-    return {"message": f"Retry queued for {task.package_name}", "task_id": str(task.id)}
-
-
-@router.post("/cache/retry-all")
-async def pypi_cache_retry_all(
-    db: Session = Depends(get_db),
-    _current_user: User = Depends(require_admin),
-):
-    """
-    Reset all failed cache tasks to retry.
-
-    Returns the count of tasks that were reset.
-    Requires admin privileges.
-    """
-    count = retry_all_failed_tasks(db)
-    return {"message": f"Queued {count} tasks for retry", "count": count}
-
-
-@router.post("/cache/cancel/{package_name}")
-async def pypi_cache_cancel(
-    package_name: str,
-    db: Session = Depends(get_db),
-    _current_user: User = Depends(require_admin),
-):
-    """
-    Cancel an in-progress or pending cache task.
-
-    Args:
-        package_name: The package name to cancel.
-
-    Requires admin privileges.
-    """
-    task = cancel_cache_task(db, package_name)
-    if not task:
-        raise HTTPException(
-            status_code=404,
-            detail=f"No active cache task found for package '{package_name}'"
-        )
-    return {"message": f"Cancelled task for {task.package_name}", "task_id": str(task.id)}