The pypi_download_file, pypi_simple_index, and pypi_package_versions endpoints were using a synchronous httpx.Client inside async functions. When upstream PyPI servers responded slowly, this blocked the entire FastAPI event loop and prevented health checks from responding, so Kubernetes would kill the pod once the liveness probe timed out.

Changes:
- httpx.Client → httpx.AsyncClient
- client.get() → await client.get()
- response.iter_bytes() → response.aiter_bytes()

This keeps the event loop responsive during slow upstream downloads, allowing health checks to succeed even when downloads take 20+ seconds.
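A minimal sketch of the pattern described above (illustrative helper names, not the actual endpoint code):

```python
import httpx

# Before: a synchronous client inside an async endpoint blocks the event loop
async def fetch_upstream_sync(url: str) -> bytes:
    with httpx.Client(timeout=60.0) as client:
        response = client.get(url)
        return b"".join(response.iter_bytes())

# After: async client, awaited request, async byte iteration
async def fetch_upstream_async(url: str) -> bytes:
    async with httpx.AsyncClient(timeout=60.0) as client:
        response = await client.get(url)
        return b"".join([chunk async for chunk in response.aiter_bytes()])
```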
"""
|
|
Transparent PyPI proxy implementing PEP 503 (Simple API).
|
|
|
|
Provides endpoints that allow pip to use Orchard as a PyPI index URL.
|
|
Artifacts are cached on first access through configured upstream sources.
|
|
"""
import hashlib
import logging
import os
import re
import tarfile
import tempfile
import zipfile
from io import BytesIO
from typing import Optional, List, Tuple
from urllib.parse import urljoin, urlparse, quote, unquote

import httpx
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Query, Request, Response
from fastapi.responses import StreamingResponse, HTMLResponse
from sqlalchemy.orm import Session

from .auth import require_admin
from .database import get_db
from .models import User, UpstreamSource, CachedUrl, Artifact, Project, Package, Tag, PackageVersion, ArtifactDependency
from .storage import S3Storage, get_storage
from .config import get_env_upstream_sources
from .pypi_cache_worker import (
    enqueue_cache_task,
    get_cache_status,
    get_failed_tasks,
    get_active_tasks,
    get_recent_activity,
    retry_failed_task,
    retry_all_failed_tasks,
    cancel_cache_task,
)

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/pypi", tags=["pypi-proxy"])


def _parse_requires_dist(requires_dist: str) -> Tuple[Optional[str], Optional[str]]:
"""Parse a Requires-Dist line into (package_name, version_constraint).
|
|
|
|
Examples:
|
|
"requests (>=2.25.0)" -> ("requests", ">=2.25.0")
|
|
"typing-extensions; python_version < '3.8'" -> ("typing-extensions", None)
|
|
"numpy>=1.21.0" -> ("numpy", ">=1.21.0")
|
|
"certifi" -> ("certifi", None)
|
|
|
|
Returns:
|
|
Tuple of (normalized_package_name, version_constraint or None)
|
|
"""
|
|
# Remove any environment markers (after semicolon)
|
|
if ';' in requires_dist:
|
|
requires_dist = requires_dist.split(';')[0].strip()
|
|
|
|
# Match patterns like "package (>=1.0)" or "package>=1.0" or "package"
|
|
# Pattern breakdown: package name, optional whitespace, optional version in parens or directly
|
|
match = re.match(
|
|
r'^([a-zA-Z0-9][-a-zA-Z0-9._]*)\s*(?:\(([^)]+)\)|([<>=!~][^\s;]+))?',
|
|
requires_dist.strip()
|
|
)
|
|
|
|
if not match:
|
|
return None, None
|
|
|
|
package_name = match.group(1)
|
|
# Version can be in parentheses (group 2) or directly after name (group 3)
|
|
version_constraint = match.group(2) or match.group(3)
|
|
|
|
# Normalize package name (PEP 503)
|
|
normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()
|
|
|
|
# Clean up version constraint
|
|
if version_constraint:
|
|
version_constraint = version_constraint.strip()
|
|
|
|
return normalized_name, version_constraint
|
|
|
|
|
|
def _extract_requires_from_metadata(metadata_content: str) -> List[Tuple[str, Optional[str]]]:
|
|
"""Extract all Requires-Dist entries from METADATA/PKG-INFO content.
|
|
|
|
Args:
|
|
metadata_content: The content of a METADATA or PKG-INFO file
|
|
|
|
Returns:
|
|
List of (package_name, version_constraint) tuples
|
|
"""
|
|
dependencies = []
|
|
|
|
for line in metadata_content.split('\n'):
|
|
if line.startswith('Requires-Dist:'):
|
|
# Extract the value after "Requires-Dist:"
|
|
value = line[len('Requires-Dist:'):].strip()
|
|
pkg_name, version = _parse_requires_dist(value)
|
|
if pkg_name:
|
|
dependencies.append((pkg_name, version))
|
|
|
|
return dependencies
|
|
|
|
|
|
def _extract_metadata_from_wheel(content: bytes) -> Optional[str]:
|
|
"""Extract METADATA file content from a wheel (zip) file.
|
|
|
|
Wheel files have structure: {package}-{version}.dist-info/METADATA
|
|
|
|
Args:
|
|
content: The wheel file content as bytes
|
|
|
|
Returns:
|
|
METADATA file content as string, or None if not found
|
|
"""
|
|
try:
|
|
with zipfile.ZipFile(BytesIO(content)) as zf:
|
|
# Find the .dist-info directory
|
|
for name in zf.namelist():
|
|
if name.endswith('.dist-info/METADATA'):
|
|
return zf.read(name).decode('utf-8', errors='replace')
|
|
except Exception as e:
|
|
logger.warning(f"Failed to extract metadata from wheel: {e}")
|
|
return None
|
|
|
|
|
|
def _extract_metadata_from_sdist(content: bytes, filename: str) -> Optional[str]:
|
|
"""Extract PKG-INFO file content from a source distribution (.tar.gz).
|
|
|
|
Source distributions have structure: {package}-{version}/PKG-INFO
|
|
|
|
Args:
|
|
content: The tarball content as bytes
|
|
filename: The original filename (used to determine package name)
|
|
|
|
Returns:
|
|
PKG-INFO file content as string, or None if not found
|
|
"""
|
|
try:
|
|
with tarfile.open(fileobj=BytesIO(content), mode='r:gz') as tf:
|
|
# Find PKG-INFO in the root directory of the archive
|
|
for member in tf.getmembers():
|
|
if member.name.endswith('/PKG-INFO') and member.name.count('/') == 1:
|
|
f = tf.extractfile(member)
|
|
if f:
|
|
return f.read().decode('utf-8', errors='replace')
|
|
except Exception as e:
|
|
logger.warning(f"Failed to extract metadata from sdist {filename}: {e}")
|
|
return None
|
|
|
|
|
|
def _extract_dependencies(content: bytes, filename: str) -> List[Tuple[str, Optional[str]]]:
|
|
"""Extract dependencies from a PyPI package file.
|
|
|
|
Supports wheel (.whl) and source distribution (.tar.gz) formats.
|
|
|
|
Args:
|
|
content: The package file content as bytes
|
|
filename: The original filename
|
|
|
|
Returns:
|
|
List of (package_name, version_constraint) tuples
|
|
"""
|
|
metadata = None
|
|
|
|
if filename.endswith('.whl'):
|
|
metadata = _extract_metadata_from_wheel(content)
|
|
elif filename.endswith('.tar.gz'):
|
|
metadata = _extract_metadata_from_sdist(content, filename)
|
|
|
|
if metadata:
|
|
return _extract_requires_from_metadata(metadata)
|
|
|
|
return []
|
|
|
|
# Timeout configuration for proxy requests
|
|
PROXY_CONNECT_TIMEOUT = 30.0
|
|
PROXY_READ_TIMEOUT = 60.0
|
|
|
|
|
|
def _extract_pypi_version(filename: str) -> Optional[str]:
|
|
"""Extract version from PyPI filename.
|
|
|
|
Handles formats like:
|
|
- cowsay-6.1-py3-none-any.whl
|
|
- cowsay-1.0.tar.gz
|
|
- some_package-1.2.3.post1-cp39-cp39-linux_x86_64.whl
|
|
"""
|
|
# Remove extension
|
|
if filename.endswith('.whl'):
|
|
# Wheel: name-version-pytag-abitag-platform.whl
|
|
parts = filename[:-4].split('-')
|
|
if len(parts) >= 2:
|
|
return parts[1]
|
|
elif filename.endswith('.tar.gz'):
|
|
# Source: name-version.tar.gz
|
|
base = filename[:-7]
|
|
# Find the last hyphen that precedes a version-like string
|
|
match = re.match(r'^(.+)-(\d+.*)$', base)
|
|
if match:
|
|
return match.group(2)
|
|
elif filename.endswith('.zip'):
|
|
# Egg/zip: name-version.zip
|
|
base = filename[:-4]
|
|
match = re.match(r'^(.+)-(\d+.*)$', base)
|
|
if match:
|
|
return match.group(2)
|
|
return None
|
|
|
|
|
|
def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]:
|
|
"""Get all enabled upstream sources configured for PyPI."""
|
|
# Get database sources
|
|
db_sources = (
|
|
db.query(UpstreamSource)
|
|
.filter(
|
|
UpstreamSource.source_type == "pypi",
|
|
UpstreamSource.enabled == True,
|
|
)
|
|
.order_by(UpstreamSource.priority)
|
|
.all()
|
|
)
|
|
|
|
# Get env sources
|
|
env_sources = [
|
|
s for s in get_env_upstream_sources()
|
|
if s.source_type == "pypi" and s.enabled
|
|
]
|
|
|
|
# Combine and sort by priority
|
|
all_sources = list(db_sources) + list(env_sources)
|
|
return sorted(all_sources, key=lambda s: s.priority)
|
|
|
|
|
|
def _build_auth_headers(source) -> dict:
|
|
"""Build authentication headers for an upstream source."""
|
|
headers = {}
|
|
|
|
if hasattr(source, 'auth_type'):
|
|
if source.auth_type == "bearer":
|
|
password = source.get_password() if hasattr(source, 'get_password') else getattr(source, 'password', None)
|
|
if password:
|
|
headers["Authorization"] = f"Bearer {password}"
|
|
elif source.auth_type == "api_key":
|
|
custom_headers = source.get_headers() if hasattr(source, 'get_headers') else {}
|
|
if custom_headers:
|
|
headers.update(custom_headers)
|
|
|
|
return headers
|
|
|
|
|
|
def _get_basic_auth(source) -> Optional[tuple[str, str]]:
|
|
"""Get basic auth credentials if applicable."""
|
|
if hasattr(source, 'auth_type') and source.auth_type == "basic":
|
|
username = getattr(source, 'username', None)
|
|
if username:
|
|
password = source.get_password() if hasattr(source, 'get_password') else getattr(source, 'password', '')
|
|
return (username, password or '')
|
|
return None
|
|
|
|
|
|
def _get_base_url(request: Request) -> str:
|
|
"""
|
|
Get the external base URL, respecting X-Forwarded-Proto header.
|
|
|
|
When behind a reverse proxy that terminates SSL, the request.base_url
|
|
will show http:// even though the external URL is https://. This function
|
|
checks the X-Forwarded-Proto header to determine the correct scheme.
|
|
"""
|
|
base_url = str(request.base_url).rstrip('/')
|
|
|
|
# Check for X-Forwarded-Proto header (set by reverse proxies)
|
|
forwarded_proto = request.headers.get('x-forwarded-proto')
|
|
if forwarded_proto:
|
|
# Replace the scheme with the forwarded protocol
|
|
parsed = urlparse(base_url)
|
|
base_url = f"{forwarded_proto}://{parsed.netloc}{parsed.path}"
|
|
|
|
return base_url
|
|
|
|
|
|
def _rewrite_package_links(html: str, base_url: str, package_name: str, upstream_base_url: str) -> str:
|
|
"""
|
|
Rewrite download links in a PyPI simple page to go through our proxy.
|
|
|
|
Args:
|
|
html: The HTML content from upstream
|
|
base_url: Our server's base URL
|
|
package_name: The package name for the URL path
|
|
upstream_base_url: The upstream URL used to fetch this page (for resolving relative URLs)
|
|
|
|
Returns:
|
|
HTML with rewritten download links
|
|
"""
|
|
# Pattern to match href attributes in anchor tags
|
|
# PyPI simple pages have links like:
|
|
# <a href="https://files.pythonhosted.org/packages/.../file.tar.gz#sha256=...">file.tar.gz</a>
|
|
# Or relative URLs from Artifactory like:
|
|
# <a href="../../packages/packages/62/35/.../requests-0.10.0.tar.gz#sha256=...">
|
|
|
|
def replace_href(match):
|
|
original_url = match.group(1)
|
|
|
|
# Resolve relative URLs to absolute using the upstream base URL
|
|
if not original_url.startswith(('http://', 'https://')):
|
|
# Split off fragment before resolving
|
|
url_without_fragment = original_url.split('#')[0]
|
|
fragment_part = original_url[len(url_without_fragment):]
|
|
absolute_url = urljoin(upstream_base_url, url_without_fragment) + fragment_part
|
|
else:
|
|
absolute_url = original_url
|
|
|
|
# Extract the filename from the URL
|
|
parsed = urlparse(absolute_url)
|
|
path_parts = parsed.path.split('/')
|
|
filename = path_parts[-1] if path_parts else ''
|
|
|
|
# Keep the hash fragment if present
|
|
fragment = f"#{parsed.fragment}" if parsed.fragment else ""
|
|
|
|
# Encode the absolute URL (without fragment) for safe transmission
|
|
encoded_url = quote(absolute_url.split('#')[0], safe='')
|
|
|
|
# Build new URL pointing to our proxy
|
|
new_url = f"{base_url}/pypi/simple/{package_name}/{filename}?upstream={encoded_url}{fragment}"
|
|
|
|
return f'href="{new_url}"'
|
|
|
|
# Match href="..." patterns
|
|
rewritten = re.sub(r'href="([^"]+)"', replace_href, html)
|
|
|
|
return rewritten
|
|
|
|
|
|
@router.get("/simple/")
|
|
async def pypi_simple_index(
    request: Request,
    db: Session = Depends(get_db),
):
    """
    PyPI Simple API index - lists all packages.

    Proxies to the first available upstream PyPI source.
    """
    sources = _get_pypi_upstream_sources(db)

    if not sources:
        raise HTTPException(
            status_code=503,
            detail="No PyPI upstream sources configured"
        )

    # Try each source in priority order
    last_error = None
    for source in sources:
        try:
            headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
            headers.update(_build_auth_headers(source))
            auth = _get_basic_auth(source)

            # Use URL as-is - users should provide full path including /simple
            simple_url = source.url.rstrip('/') + '/'

            timeout = httpx.Timeout(PROXY_READ_TIMEOUT, connect=PROXY_CONNECT_TIMEOUT)

            async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
                response = await client.get(
                    simple_url,
                    headers=headers,
                    auth=auth,
                )

                # Handle redirects manually to avoid loops
                if response.status_code in (301, 302, 303, 307, 308):
                    redirect_url = response.headers.get('location')
                    if redirect_url:
                        # Follow the redirect once
                        response = await client.get(
                            redirect_url,
                            headers=headers,
                            auth=auth,
                            follow_redirects=False,
                        )

                if response.status_code == 200:
                    # Index links point at package pages (not files); rewrite them below
                    content = response.text

                    # Rewrite package links to go through our proxy
                    base_url = _get_base_url(request)
                    content = re.sub(
                        r'href="([^"]+)/"',
                        lambda m: f'href="{base_url}/pypi/simple/{m.group(1)}/"',
                        content
                    )

                    return HTMLResponse(content=content)

                last_error = f"HTTP {response.status_code}"

        except httpx.ConnectError as e:
            last_error = f"Connection failed: {e}"
            logger.warning(f"PyPI proxy: failed to connect to {source.url}: {e}")
        except httpx.TimeoutException as e:
            last_error = f"Timeout: {e}"
            logger.warning(f"PyPI proxy: timeout connecting to {source.url}: {e}")
        except Exception as e:
            last_error = str(e)
            logger.warning(f"PyPI proxy: error fetching from {source.url}: {e}")

    raise HTTPException(
        status_code=502,
        detail=f"Failed to fetch package index from upstream: {last_error}"
    )


@router.get("/simple/{package_name}/")
async def pypi_package_versions(
    request: Request,
    package_name: str,
    db: Session = Depends(get_db),
):
    """
    PyPI Simple API package page - lists all versions/files for a package.

    Proxies to upstream and rewrites download links to go through our cache.
    """
    sources = _get_pypi_upstream_sources(db)

    if not sources:
        raise HTTPException(
            status_code=503,
            detail="No PyPI upstream sources configured"
        )

    base_url = _get_base_url(request)

    # Normalize package name (PEP 503)
    normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()

    # Try each source in priority order
    last_error = None
    for source in sources:
        try:
            headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
            headers.update(_build_auth_headers(source))
            auth = _get_basic_auth(source)

            # Use URL as-is - users should provide full path including /simple
            package_url = source.url.rstrip('/') + f'/{normalized_name}/'
            final_url = package_url  # Track final URL after redirects

            timeout = httpx.Timeout(PROXY_READ_TIMEOUT, connect=PROXY_CONNECT_TIMEOUT)

            async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
                response = await client.get(
                    package_url,
                    headers=headers,
                    auth=auth,
                )

                # Handle redirects manually
                redirect_count = 0
                while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
                    redirect_url = response.headers.get('location')
                    if not redirect_url:
                        break

                    # Make redirect URL absolute if needed
                    if not redirect_url.startswith('http'):
                        redirect_url = urljoin(final_url, redirect_url)

                    final_url = redirect_url  # Update final URL

                    response = await client.get(
                        redirect_url,
                        headers=headers,
                        auth=auth,
                        follow_redirects=False,
                    )
                    redirect_count += 1

                if response.status_code == 200:
                    content = response.text

                    # Rewrite download links to go through our proxy
                    # Pass final_url so relative URLs can be resolved correctly
                    content = _rewrite_package_links(content, base_url, normalized_name, final_url)

                    return HTMLResponse(content=content)

                if response.status_code == 404:
                    # Package not found in this source, try next
                    last_error = f"Package not found in {source.name}"
                    continue

                last_error = f"HTTP {response.status_code}"

        except httpx.ConnectError as e:
            last_error = f"Connection failed: {e}"
            logger.warning(f"PyPI proxy: failed to connect to {source.url}: {e}")
        except httpx.TimeoutException as e:
            last_error = f"Timeout: {e}"
            logger.warning(f"PyPI proxy: timeout connecting to {source.url}: {e}")
        except Exception as e:
            last_error = str(e)
            logger.warning(f"PyPI proxy: error fetching {package_name} from {source.url}: {e}")

    raise HTTPException(
        status_code=404,
        detail=f"Package '{package_name}' not found: {last_error}"
    )


@router.get("/simple/{package_name}/{filename}")
async def pypi_download_file(
    request: Request,
    package_name: str,
    filename: str,
    upstream: Optional[str] = None,
    cache_depth: int = Query(default=0, ge=0, le=100, alias="cache-depth"),
    db: Session = Depends(get_db),
    storage: S3Storage = Depends(get_storage),
):
    """
    Download a package file, caching it in Orchard.

    Args:
        package_name: The package name
        filename: The filename to download
        upstream: URL-encoded upstream URL to fetch from
        cache_depth: Current cache recursion depth (used by cache worker for nested deps)
    """
    if not upstream:
        raise HTTPException(
            status_code=400,
            detail="Missing 'upstream' query parameter with source URL"
        )

    # Decode the upstream URL
    upstream_url = unquote(upstream)

    # Check if we already have this URL cached
    url_hash = hashlib.sha256(upstream_url.encode()).hexdigest()
    cached_url = db.query(CachedUrl).filter(CachedUrl.url_hash == url_hash).first()

    if cached_url:
        # Serve from cache
        artifact = db.query(Artifact).filter(Artifact.id == cached_url.artifact_id).first()
        if artifact:
            logger.info(f"PyPI proxy: serving cached {filename} (artifact {artifact.id[:12]})")

            # Stream from S3
            try:
                stream, content_length, _ = storage.get_stream(artifact.s3_key)

                def stream_content():
                    """Generator that yields chunks from the S3 stream."""
                    try:
                        for chunk in stream.iter_chunks():
                            yield chunk
                    finally:
                        stream.close()

                return StreamingResponse(
                    stream_content(),
                    media_type=artifact.content_type or "application/octet-stream",
                    headers={
                        "Content-Disposition": f'attachment; filename="{filename}"',
                        "Content-Length": str(content_length),
                        "X-Checksum-SHA256": artifact.id,
                        "X-Cache": "HIT",
                    }
                )
            except Exception as e:
                logger.error(f"PyPI proxy: error streaming cached artifact: {e}")
                # Fall through to fetch from upstream

    # Not cached - fetch from upstream
    sources = _get_pypi_upstream_sources(db)

    # Use the first available source for authentication headers
    # Note: The upstream URL may point to files.pythonhosted.org or other CDNs,
    # not the configured source URL directly, so we can't strictly validate the host
    matched_source = sources[0] if sources else None

    try:
        headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
        if matched_source:
            headers.update(_build_auth_headers(matched_source))
        auth = _get_basic_auth(matched_source) if matched_source else None

        timeout = httpx.Timeout(300.0, connect=PROXY_CONNECT_TIMEOUT)  # 5 minutes for large files

        # Fetch the file
        logger.info(f"PyPI proxy: fetching {filename} from {upstream_url}")

        async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
            response = await client.get(
                upstream_url,
                headers=headers,
                auth=auth,
            )

            # Handle redirects manually
            redirect_count = 0
            while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
                redirect_url = response.headers.get('location')
                if not redirect_url:
                    break

                if not redirect_url.startswith('http'):
                    redirect_url = urljoin(upstream_url, redirect_url)

                logger.info(f"PyPI proxy: following redirect to {redirect_url}")

                # Don't send auth to different hosts
                redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
                redirect_auth = None
                if urlparse(redirect_url).netloc == urlparse(upstream_url).netloc:
                    redirect_headers.update(headers)
                    redirect_auth = auth

                response = await client.get(
                    redirect_url,
                    headers=redirect_headers,
                    auth=redirect_auth,
                    follow_redirects=False,
                )
                redirect_count += 1

            if response.status_code != 200:
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"Upstream returned {response.status_code}"
                )

            content_type = response.headers.get('content-type', 'application/octet-stream')

            # Stream the download to a temp file so the transfer itself is not buffered in memory
            # (the file is read back below for metadata extraction and the response body)
            # Using async iteration to avoid blocking the event loop
            tmp_path = None
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
                    tmp_path = tmp_file.name
                    async for chunk in response.aiter_bytes(chunk_size=65536):  # 64KB chunks
                        tmp_file.write(chunk)

                # Store in S3 from temp file (computes hash and deduplicates automatically)
                with open(tmp_path, 'rb') as f:
                    result = storage.store(f)
                sha256 = result.sha256
                size = result.size

                # Read content for metadata extraction and response
                with open(tmp_path, 'rb') as f:
                    content = f.read()

                logger.info(f"PyPI proxy: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")
            finally:
                # Clean up temp file
                if tmp_path and os.path.exists(tmp_path):
                    os.unlink(tmp_path)

            # Check if artifact already exists
            existing = db.query(Artifact).filter(Artifact.id == sha256).first()
            if existing:
                # Increment ref count
                existing.ref_count += 1
                db.flush()
            else:
                # Create artifact record
                new_artifact = Artifact(
                    id=sha256,
                    original_name=filename,
                    content_type=content_type,
                    size=size,
                    ref_count=1,
                    created_by="pypi-proxy",
                    s3_key=result.s3_key,
                    checksum_md5=result.md5,
                    checksum_sha1=result.sha1,
                    s3_etag=result.s3_etag,
                )
                db.add(new_artifact)
                db.flush()

            # Create/get system project and package
            system_project = db.query(Project).filter(Project.name == "_pypi").first()
            if not system_project:
                system_project = Project(
                    name="_pypi",
                    description="System project for cached PyPI packages",
                    is_public=True,
                    is_system=True,
                    created_by="pypi-proxy",
                )
                db.add(system_project)
                db.flush()
            elif not system_project.is_system:
                # Ensure existing project is marked as system
                system_project.is_system = True
                db.flush()

            # Normalize package name
            normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()

            package = db.query(Package).filter(
                Package.project_id == system_project.id,
                Package.name == normalized_name,
            ).first()
            if not package:
                package = Package(
                    project_id=system_project.id,
                    name=normalized_name,
                    description=f"PyPI package: {normalized_name}",
                    format="pypi",
                )
                db.add(package)
                db.flush()

            # Create tag with filename
            existing_tag = db.query(Tag).filter(
                Tag.package_id == package.id,
                Tag.name == filename,
            ).first()
            if not existing_tag:
                tag = Tag(
                    package_id=package.id,
                    name=filename,
                    artifact_id=sha256,
                    created_by="pypi-proxy",
                )
                db.add(tag)

            # Extract and create version
            # Only create version for actual package files, not .metadata files
            version = _extract_pypi_version(filename)
            if version and not filename.endswith('.metadata'):
                # Check by version string (the unique constraint is on package_id + version)
                existing_version = db.query(PackageVersion).filter(
                    PackageVersion.package_id == package.id,
                    PackageVersion.version == version,
                ).first()
                if not existing_version:
                    pkg_version = PackageVersion(
                        package_id=package.id,
                        artifact_id=sha256,
                        version=version,
                        version_source="filename",
                        created_by="pypi-proxy",
                    )
                    db.add(pkg_version)

            # Cache the URL mapping
            existing_cached = db.query(CachedUrl).filter(CachedUrl.url_hash == url_hash).first()
            if not existing_cached:
                cached_url_record = CachedUrl(
                    url_hash=url_hash,
                    url=upstream_url,
                    artifact_id=sha256,
                )
                db.add(cached_url_record)

            # Extract and store dependencies
            dependencies = _extract_dependencies(content, filename)
            unique_deps = []
            if dependencies:
                # Deduplicate dependencies by package name (keep first occurrence)
                seen_packages = set()
                for dep_name, dep_version in dependencies:
                    if dep_name not in seen_packages:
                        seen_packages.add(dep_name)
                        unique_deps.append((dep_name, dep_version))

                logger.info(f"PyPI proxy: extracted {len(unique_deps)} dependencies from {filename} (deduped from {len(dependencies)})")
                for dep_name, dep_version in unique_deps:
                    # Check if this dependency already exists for this artifact
                    existing_dep = db.query(ArtifactDependency).filter(
                        ArtifactDependency.artifact_id == sha256,
                        ArtifactDependency.dependency_project == "_pypi",
                        ArtifactDependency.dependency_package == dep_name,
                    ).first()

                    if not existing_dep:
                        dep = ArtifactDependency(
                            artifact_id=sha256,
                            dependency_project="_pypi",
                            dependency_package=dep_name,
                            version_constraint=dep_version if dep_version else "*",
                        )
                        db.add(dep)

            # Proactively cache dependencies via task queue
            # Dependencies are queued at cache_depth + 1 to track recursion
            if unique_deps:
                next_depth = cache_depth + 1
                for dep_name, dep_version in unique_deps:
                    enqueue_cache_task(
                        db,
                        package_name=dep_name,
                        version_constraint=dep_version,
                        parent_task_id=None,  # Top-level, triggered by user download
                        depth=next_depth,
                        triggered_by_artifact=sha256,
                    )
                logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching (depth={next_depth})")

            db.commit()

            # Return the file
            return Response(
                content=content,
                media_type=content_type,
                headers={
                    "Content-Disposition": f'attachment; filename="{filename}"',
                    "Content-Length": str(size),
                    "X-Checksum-SHA256": sha256,
                    "X-Cache": "MISS",
                }
            )

    except httpx.ConnectError as e:
        raise HTTPException(status_code=502, detail=f"Connection failed: {e}")
    except httpx.TimeoutException as e:
        raise HTTPException(status_code=504, detail=f"Timeout: {e}")
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"PyPI proxy: error downloading {filename}")
        raise HTTPException(status_code=500, detail=str(e))


# =============================================================================
# Cache Status and Management Endpoints
# =============================================================================


@router.get("/cache/status")
async def pypi_cache_status(
    db: Session = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    """
    Get summary of the PyPI cache task queue.

    Returns counts of tasks by status (pending, in_progress, completed, failed).
    Requires admin privileges.
    """
    return get_cache_status(db)


@router.get("/cache/failed")
async def pypi_cache_failed(
    limit: int = Query(default=50, ge=1, le=500),
    db: Session = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    """
    Get list of failed cache tasks for debugging.

    Args:
        limit: Maximum number of tasks to return (default 50, max 500).

    Requires admin privileges.
    """
    return get_failed_tasks(db, limit=limit)


@router.get("/cache/active")
async def pypi_cache_active(
    limit: int = Query(default=50, ge=1, le=500),
    db: Session = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    """
    Get list of currently active (in_progress) cache tasks.

    Shows what the cache workers are currently processing.

    Args:
        limit: Maximum number of tasks to return (default 50, max 500).

    Requires admin privileges.
    """
    return get_active_tasks(db, limit=limit)


@router.post("/cache/retry/{package_name}")
async def pypi_cache_retry(
    package_name: str,
    db: Session = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    """
    Reset a failed cache task to retry.

    Args:
        package_name: The package name to retry.

    Requires admin privileges.
    """
    task = retry_failed_task(db, package_name)
    if not task:
        raise HTTPException(
            status_code=404,
            detail=f"No failed cache task found for package '{package_name}'"
        )
    return {"message": f"Retry queued for {task.package_name}", "task_id": str(task.id)}


@router.post("/cache/retry-all")
async def pypi_cache_retry_all(
    db: Session = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    """
    Reset all failed cache tasks to retry.

    Returns the count of tasks that were reset.
    Requires admin privileges.
    """
    count = retry_all_failed_tasks(db)
    return {"message": f"Queued {count} tasks for retry", "count": count}


@router.post("/cache/cancel/{package_name}")
async def pypi_cache_cancel(
    package_name: str,
    db: Session = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    """
    Cancel an in-progress or pending cache task.

    Args:
        package_name: The package name to cancel.

    Requires admin privileges.
    """
    task = cancel_cache_task(db, package_name)
    if not task:
        raise HTTPException(
            status_code=404,
            detail=f"No active cache task found for package '{package_name}'"
        )
    return {"message": f"Cancelled task for {task.package_name}", "task_id": str(task.id)}