""" Transparent PyPI proxy implementing PEP 503 (Simple API). Provides endpoints that allow pip to use Orchard as a PyPI index URL. Artifacts are cached on first access through configured upstream sources. """ import hashlib import json import logging import os import re import tarfile import tempfile import zipfile from io import BytesIO from typing import Optional, List, Tuple from urllib.parse import urljoin, urlparse, quote, unquote import httpx from fastapi import APIRouter, Depends, HTTPException, Request, Response from fastapi.responses import StreamingResponse, HTMLResponse, RedirectResponse from sqlalchemy.orm import Session from .database import get_db from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, PackageVersion, ArtifactDependency from .storage import S3Storage, get_storage from .config import get_env_upstream_sources, get_settings from .http_client import HttpClientManager from .cache_service import CacheService, CacheCategory from .db_utils import ArtifactRepository logger = logging.getLogger(__name__) router = APIRouter(prefix="/pypi", tags=["pypi-proxy"]) def get_http_client(request: Request) -> HttpClientManager: """Get HttpClientManager from app state.""" return request.app.state.http_client def get_cache(request: Request) -> CacheService: """Get CacheService from app state.""" return request.app.state.cache # Timeout configuration for proxy requests PROXY_CONNECT_TIMEOUT = 30.0 PROXY_READ_TIMEOUT = 60.0 def _parse_requires_dist(requires_dist: str) -> Tuple[str, Optional[str]]: """Parse a Requires-Dist line into (package_name, version_constraint). Examples: "requests (>=2.25.0)" -> ("requests", ">=2.25.0") "typing-extensions; python_version < '3.8'" -> ("typing-extensions", None) "numpy>=1.21.0" -> ("numpy", ">=1.21.0") "certifi" -> ("certifi", None) Returns: Tuple of (normalized_package_name, version_constraint or None) """ # Remove any environment markers (after semicolon) if ';' in requires_dist: requires_dist = requires_dist.split(';')[0].strip() # Match patterns like "package (>=1.0)" or "package>=1.0" or "package" match = re.match( r'^([a-zA-Z0-9][-a-zA-Z0-9._]*)\s*(?:\(([^)]+)\)|([<>=!~][^\s;]+))?', requires_dist.strip() ) if not match: return None, None package_name = match.group(1) # Version can be in parentheses (group 2) or directly after name (group 3) version_constraint = match.group(2) or match.group(3) # Normalize package name (PEP 503) normalized_name = re.sub(r'[-_.]+', '-', package_name).lower() # Clean up version constraint if version_constraint: version_constraint = version_constraint.strip() return normalized_name, version_constraint def _extract_requires_from_metadata(metadata_content: str) -> List[Tuple[str, Optional[str]]]: """Extract all Requires-Dist entries from METADATA/PKG-INFO content. Args: metadata_content: The content of a METADATA or PKG-INFO file Returns: List of (package_name, version_constraint) tuples """ dependencies = [] for line in metadata_content.split('\n'): if line.startswith('Requires-Dist:'): value = line[len('Requires-Dist:'):].strip() pkg_name, version = _parse_requires_dist(value) if pkg_name: dependencies.append((pkg_name, version)) return dependencies def _extract_metadata_from_wheel(file_path: str) -> Optional[str]: """Extract METADATA file content from a wheel (zip) file. 

    Args:
        file_path: Path to the wheel file

    Returns:
        METADATA file content as string, or None if not found
    """
    try:
        with zipfile.ZipFile(file_path) as zf:
            for name in zf.namelist():
                if name.endswith('.dist-info/METADATA'):
                    return zf.read(name).decode('utf-8', errors='replace')
    except Exception as e:
        logger.warning(f"Failed to extract metadata from wheel: {e}")
    return None


def _extract_metadata_from_sdist(file_path: str) -> Optional[str]:
    """Extract PKG-INFO file content from a source distribution (.tar.gz).

    Args:
        file_path: Path to the tarball file

    Returns:
        PKG-INFO file content as string, or None if not found
    """
    try:
        with tarfile.open(file_path, mode='r:gz') as tf:
            for member in tf.getmembers():
                if member.name.endswith('/PKG-INFO') and member.name.count('/') == 1:
                    f = tf.extractfile(member)
                    if f:
                        return f.read().decode('utf-8', errors='replace')
    except Exception as e:
        logger.warning(f"Failed to extract metadata from sdist: {e}")
    return None


def _extract_dependencies_from_file(file_path: str, filename: str) -> List[Tuple[str, Optional[str]]]:
    """Extract dependencies from a PyPI package file.

    Supports wheel (.whl) and source distribution (.tar.gz) formats.

    Args:
        file_path: Path to the package file
        filename: The original filename

    Returns:
        List of (package_name, version_constraint) tuples
    """
    metadata = None
    if filename.endswith('.whl'):
        metadata = _extract_metadata_from_wheel(file_path)
    elif filename.endswith('.tar.gz'):
        metadata = _extract_metadata_from_sdist(file_path)

    if metadata:
        return _extract_requires_from_metadata(metadata)
    return []


def _parse_upstream_error(response: httpx.Response) -> str:
    """Parse upstream error response to extract useful error details.

    Handles JFrog/Artifactory policy errors and other common formats.
    Returns a user-friendly error message.
    """
    status = response.status_code
    try:
        body = response.text
    except Exception:
        return f"HTTP {status}"

    # Try to parse as JSON (JFrog/Artifactory format)
    try:
        data = json.loads(body)
        # JFrog Artifactory format: {"errors": [{"status": 403, "message": "..."}]}
        if "errors" in data and isinstance(data["errors"], list):
            messages = []
            for err in data["errors"]:
                if isinstance(err, dict) and "message" in err:
                    messages.append(err["message"])
            if messages:
                return "; ".join(messages)
        # Alternative format: {"message": "..."}
        if "message" in data:
            return data["message"]
        # Alternative format: {"error": "..."}
        if "error" in data:
            return data["error"]
    except (json.JSONDecodeError, ValueError):
        pass

    # Check for policy-related keywords in plain text response
    policy_keywords = ["policy", "blocked", "forbidden", "curation", "security"]
    if any(kw in body.lower() for kw in policy_keywords):
        # Truncate long responses but preserve the message
        if len(body) > 500:
            return body[:500] + "..."
        return body

    # Default: just return status code
    return f"HTTP {status}"


def _extract_pypi_version(filename: str) -> Optional[str]:
    """Extract version from PyPI filename.

    Handles formats like:
    - cowsay-6.1-py3-none-any.whl
    - cowsay-1.0.tar.gz
    - some_package-1.2.3.post1-cp39-cp39-linux_x86_64.whl
    """
    # Remove extension
    if filename.endswith('.whl'):
        # Wheel: name-version-pytag-abitag-platform.whl
        parts = filename[:-4].split('-')
        if len(parts) >= 2:
            return parts[1]
    elif filename.endswith('.tar.gz'):
        # Source: name-version.tar.gz
        base = filename[:-7]
        # Find the last hyphen that precedes a version-like string
        match = re.match(r'^(.+)-(\d+.*)$', base)
        if match:
            return match.group(2)
    elif filename.endswith('.zip'):
        # Egg/zip: name-version.zip
        base = filename[:-4]
        match = re.match(r'^(.+)-(\d+.*)$', base)
        if match:
            return match.group(2)
    return None


def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]:
    """Get all enabled upstream sources configured for PyPI."""
    # Get database sources
    db_sources = (
        db.query(UpstreamSource)
        .filter(
            UpstreamSource.source_type == "pypi",
            UpstreamSource.enabled == True,
        )
        .order_by(UpstreamSource.priority)
        .all()
    )

    # Get env sources
    env_sources = [
        s for s in get_env_upstream_sources()
        if s.source_type == "pypi" and s.enabled
    ]

    # Combine and sort by priority
    all_sources = list(db_sources) + list(env_sources)
    return sorted(all_sources, key=lambda s: s.priority)


def _build_auth_headers(source) -> dict:
    """Build authentication headers for an upstream source."""
    headers = {}
    if hasattr(source, 'auth_type'):
        if source.auth_type == "bearer":
            password = source.get_password() if hasattr(source, 'get_password') else getattr(source, 'password', None)
            if password:
                headers["Authorization"] = f"Bearer {password}"
        elif source.auth_type == "api_key":
            custom_headers = source.get_headers() if hasattr(source, 'get_headers') else {}
            if custom_headers:
                headers.update(custom_headers)
    return headers


def _get_basic_auth(source) -> Optional[tuple[str, str]]:
    """Get basic auth credentials if applicable."""
    if hasattr(source, 'auth_type') and source.auth_type == "basic":
        username = getattr(source, 'username', None)
        if username:
            password = source.get_password() if hasattr(source, 'get_password') else getattr(source, 'password', '')
            return (username, password or '')
    return None


def _get_base_url(request: Request) -> str:
    """
    Get the external base URL, respecting the X-Forwarded-Proto header.

    When behind a reverse proxy that terminates SSL, request.base_url will show
    http:// even though the external URL is https://. This function checks the
    X-Forwarded-Proto header to determine the correct scheme.
    """
    base_url = str(request.base_url).rstrip('/')

    # Check for X-Forwarded-Proto header (set by reverse proxies)
    forwarded_proto = request.headers.get('x-forwarded-proto')
    if forwarded_proto:
        # Replace the scheme with the forwarded protocol
        parsed = urlparse(base_url)
        base_url = f"{forwarded_proto}://{parsed.netloc}{parsed.path}"

    return base_url


def _rewrite_package_links(html: str, base_url: str, package_name: str, upstream_base_url: str) -> str:
    """
    Rewrite download links in a PyPI simple page to go through our proxy.

    Args:
        html: The HTML content from upstream
        base_url: Our server's base URL
        package_name: The package name for the URL path
        upstream_base_url: The upstream URL used to fetch this page
            (for resolving relative URLs)

    Returns:
        HTML with rewritten download links
    """
    # Pattern to match href attributes in anchor tags.
    # PyPI simple pages have absolute links like:
    #   <a href="https://files.pythonhosted.org/.../file.tar.gz#sha256=...">file.tar.gz</a>
    # Or relative URLs from Artifactory like:
    #   <a href="../../path/file.tar.gz#sha256=...">file.tar.gz</a>
    def replace_href(match):
        original_url = match.group(1)

        # Resolve relative URLs to absolute using the upstream base URL
        if not original_url.startswith(('http://', 'https://')):
            # Split off fragment before resolving
            url_without_fragment = original_url.split('#')[0]
            fragment_part = original_url[len(url_without_fragment):]
            absolute_url = urljoin(upstream_base_url, url_without_fragment) + fragment_part
        else:
            absolute_url = original_url

        # Extract the filename from the URL
        parsed = urlparse(absolute_url)
        path_parts = parsed.path.split('/')
        filename = path_parts[-1] if path_parts else ''

        # Keep the hash fragment if present
        fragment = f"#{parsed.fragment}" if parsed.fragment else ""

        # Encode the absolute URL (without fragment) for safe transmission
        encoded_url = quote(absolute_url.split('#')[0], safe='')

        # Build new URL pointing to our proxy
        new_url = f"{base_url}/pypi/simple/{package_name}/{filename}?upstream={encoded_url}{fragment}"
        return f'href="{new_url}"'

    # Match href="..." patterns
    rewritten = re.sub(r'href="([^"]+)"', replace_href, html)
    return rewritten


@router.get("/simple/")
async def pypi_simple_index(
    request: Request,
    db: Session = Depends(get_db),
):
    """
    PyPI Simple API index - lists all packages.

    Proxies to the first available upstream PyPI source.
    """
    sources = _get_pypi_upstream_sources(db)
    if not sources:
        raise HTTPException(
            status_code=503,
            detail="No PyPI upstream sources configured"
        )

    # Try each source in priority order
    last_error = None
    last_status = None
    for source in sources:
        try:
            headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
            headers.update(_build_auth_headers(source))
            auth = _get_basic_auth(source)

            # Use URL as-is - users should provide full path including /simple
            simple_url = source.url.rstrip('/') + '/'

            timeout = httpx.Timeout(PROXY_READ_TIMEOUT, connect=PROXY_CONNECT_TIMEOUT)
            async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
                response = await client.get(
                    simple_url,
                    headers=headers,
                    auth=auth,
                )

                # Handle redirects manually to avoid loops
                if response.status_code in (301, 302, 303, 307, 308):
                    redirect_url = response.headers.get('location')
                    if redirect_url:
                        # Follow the redirect once
                        response = await client.get(
                            redirect_url,
                            headers=headers,
                            auth=auth,
                            follow_redirects=False,
                        )

                if response.status_code == 200:
                    content = response.text

                    # Rewrite package page links (they point to package pages,
                    # not files) so they go through our proxy
                    base_url = _get_base_url(request)
                    content = re.sub(
                        r'href="([^"]+)/"',
                        lambda m: f'href="{base_url}/pypi/simple/{m.group(1)}/"',
                        content
                    )

                    return HTMLResponse(content=content)

                # Parse upstream error for policy/blocking messages
                last_error = _parse_upstream_error(response)
                last_status = response.status_code
                logger.warning(f"PyPI proxy: upstream returned {response.status_code}: {last_error}")

        except httpx.ConnectError as e:
            last_error = f"Connection failed: {e}"
            last_status = 502
            logger.warning(f"PyPI proxy: failed to connect to {source.url}: {e}")
        except httpx.TimeoutException as e:
            last_error = f"Timeout: {e}"
            last_status = 504
logger.warning(f"PyPI proxy: timeout connecting to {source.url}: {e}") except Exception as e: last_error = str(e) last_status = 502 logger.warning(f"PyPI proxy: error fetching from {source.url}: {e}") # Pass through 4xx errors (like 403 policy blocks) so users understand why status_code = last_status if last_status and 400 <= last_status < 500 else 502 raise HTTPException( status_code=status_code, detail=f"Upstream error: {last_error}" ) @router.get("/simple/{package_name}/") async def pypi_package_versions( request: Request, package_name: str, db: Session = Depends(get_db), ): """ PyPI Simple API package page - lists all versions/files for a package. Proxies to upstream and rewrites download links to go through our cache. """ sources = _get_pypi_upstream_sources(db) if not sources: raise HTTPException( status_code=503, detail="No PyPI upstream sources configured" ) base_url = _get_base_url(request) # Normalize package name (PEP 503) normalized_name = re.sub(r'[-_.]+', '-', package_name).lower() # Try each source in priority order last_error = None last_status = None for source in sources: try: headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"} headers.update(_build_auth_headers(source)) auth = _get_basic_auth(source) # Use URL as-is - users should provide full path including /simple package_url = source.url.rstrip('/') + f'/{normalized_name}/' final_url = package_url # Track final URL after redirects timeout = httpx.Timeout(PROXY_READ_TIMEOUT, connect=PROXY_CONNECT_TIMEOUT) async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client: response = await client.get( package_url, headers=headers, auth=auth, ) # Handle redirects manually redirect_count = 0 while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5: redirect_url = response.headers.get('location') if not redirect_url: break # Make redirect URL absolute if needed if not redirect_url.startswith('http'): redirect_url = urljoin(final_url, redirect_url) final_url = redirect_url # Update final URL response = await client.get( redirect_url, headers=headers, auth=auth, follow_redirects=False, ) redirect_count += 1 if response.status_code == 200: content = response.text # Rewrite download links to go through our proxy # Pass final_url so relative URLs can be resolved correctly content = _rewrite_package_links(content, base_url, normalized_name, final_url) return HTMLResponse(content=content) if response.status_code == 404: # Package not found in this source, try next last_error = f"Package not found in {source.name}" last_status = 404 continue # Parse upstream error for policy/blocking messages last_error = _parse_upstream_error(response) last_status = response.status_code logger.warning(f"PyPI proxy: upstream returned {response.status_code} for {package_name}: {last_error}") except httpx.ConnectError as e: last_error = f"Connection failed: {e}" last_status = 502 logger.warning(f"PyPI proxy: failed to connect to {source.url}: {e}") except httpx.TimeoutException as e: last_error = f"Timeout: {e}" last_status = 504 logger.warning(f"PyPI proxy: timeout connecting to {source.url}: {e}") except Exception as e: last_error = str(e) last_status = 502 logger.warning(f"PyPI proxy: error fetching {package_name} from {source.url}: {e}") # Pass through 4xx errors (like 403 policy blocks) so users understand why status_code = last_status if last_status and 400 <= last_status < 500 else 404 raise HTTPException( status_code=status_code, detail=f"Package '{package_name}' error: {last_error}" ) 
@router.get("/simple/{package_name}/{filename}") async def pypi_download_file( request: Request, package_name: str, filename: str, upstream: Optional[str] = None, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), ): """ Download a package file, caching it in Orchard. Args: package_name: The package name filename: The filename to download upstream: URL-encoded upstream URL to fetch from """ if not upstream: raise HTTPException( status_code=400, detail="Missing 'upstream' query parameter with source URL" ) # Decode the upstream URL upstream_url = unquote(upstream) # Check if we already have this URL cached url_hash = hashlib.sha256(upstream_url.encode()).hexdigest() cached_url = db.query(CachedUrl).filter(CachedUrl.url_hash == url_hash).first() if cached_url: # Serve from cache artifact = db.query(Artifact).filter(Artifact.id == cached_url.artifact_id).first() if artifact: logger.info(f"PyPI proxy: serving cached {filename} (artifact {artifact.id[:12]})") settings = get_settings() try: if settings.pypi_download_mode == "redirect": # Redirect to S3 presigned URL - client downloads directly from S3 presigned_url = storage.generate_presigned_url(artifact.s3_key) return RedirectResponse( url=presigned_url, status_code=302, headers={ "X-Checksum-SHA256": artifact.id, "X-Cache": "HIT", } ) else: # Proxy mode - stream from S3 through Orchard stream, content_length, _ = storage.get_stream(artifact.s3_key) def stream_content(): """Generator that yields chunks from the S3 stream.""" try: for chunk in stream.iter_chunks(): yield chunk finally: stream.close() return StreamingResponse( stream_content(), media_type=artifact.content_type or "application/octet-stream", headers={ "Content-Disposition": f'attachment; filename="{filename}"', "Content-Length": str(content_length), "X-Checksum-SHA256": artifact.id, "X-Cache": "HIT", } ) except Exception as e: logger.error(f"PyPI proxy: error serving cached artifact: {e}") # Fall through to fetch from upstream # Not cached - fetch from upstream sources = _get_pypi_upstream_sources(db) # Use the first available source for authentication headers # Note: The upstream URL may point to files.pythonhosted.org or other CDNs, # not the configured source URL directly, so we can't strictly validate the host matched_source = sources[0] if sources else None try: headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"} if matched_source: headers.update(_build_auth_headers(matched_source)) auth = _get_basic_auth(matched_source) if matched_source else None timeout = httpx.Timeout(300.0, connect=PROXY_CONNECT_TIMEOUT) # 5 minutes for large files # Initialize extracted dependencies list extracted_deps = [] # Fetch the file logger.info(f"PyPI proxy: fetching {filename} from {upstream_url}") async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client: response = await client.get( upstream_url, headers=headers, auth=auth, ) # Handle redirects manually redirect_count = 0 while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5: redirect_url = response.headers.get('location') if not redirect_url: break if not redirect_url.startswith('http'): redirect_url = urljoin(upstream_url, redirect_url) logger.info(f"PyPI proxy: following redirect to {redirect_url}") # Don't send auth to different hosts redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"} redirect_auth = None if urlparse(redirect_url).netloc == urlparse(upstream_url).netloc: redirect_headers.update(headers) redirect_auth = auth response = await client.get( 
                    redirect_url,
                    headers=redirect_headers,
                    auth=redirect_auth,
                    follow_redirects=False,
                )
                redirect_count += 1

            if response.status_code != 200:
                # Parse upstream error for policy/blocking messages
                error_detail = _parse_upstream_error(response)
                logger.warning(f"PyPI proxy: upstream returned {response.status_code} for {filename}: {error_detail}")
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"Upstream error: {error_detail}"
                )

            content_type = response.headers.get('content-type', 'application/octet-stream')

            # Spool the response body to a temp file so hashing, S3 upload, and
            # dependency extraction can work from disk. Note that client.get() has
            # already buffered the body; client.stream() would be needed to keep
            # memory usage constant for very large packages.
            tmp_path = None
            try:
                with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
                    tmp_path = tmp_file.name
                    async for chunk in response.aiter_bytes(chunk_size=65536):  # 64KB chunks
                        tmp_file.write(chunk)

                # Store in S3 from temp file (computes hash and deduplicates automatically)
                with open(tmp_path, 'rb') as f:
                    result = storage.store(f)
                sha256 = result.sha256
                size = result.size
                s3_key = result.s3_key

                # Extract dependencies from the temp file before cleaning up
                extracted_deps = _extract_dependencies_from_file(tmp_path, filename)
                if extracted_deps:
                    logger.info(f"PyPI proxy: extracted {len(extracted_deps)} dependencies from {filename}")

                logger.info(f"PyPI proxy: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")
            finally:
                # Clean up temp file
                if tmp_path and os.path.exists(tmp_path):
                    os.unlink(tmp_path)

        # Check if artifact already exists
        existing = db.query(Artifact).filter(Artifact.id == sha256).first()
        if existing:
            # Increment ref count
            existing.ref_count += 1
            db.flush()
        else:
            # Create artifact record
            new_artifact = Artifact(
                id=sha256,
                original_name=filename,
                content_type=content_type,
                size=size,
                ref_count=1,
                created_by="pypi-proxy",
                s3_key=result.s3_key,
                checksum_md5=result.md5,
                checksum_sha1=result.sha1,
                s3_etag=result.s3_etag,
            )
            db.add(new_artifact)
            db.flush()

        # Create/get system project and package
        system_project = db.query(Project).filter(Project.name == "_pypi").first()
        if not system_project:
            system_project = Project(
                name="_pypi",
                description="System project for cached PyPI packages",
                is_public=True,
                is_system=True,
                created_by="pypi-proxy",
            )
            db.add(system_project)
            db.flush()
        elif not system_project.is_system:
            # Ensure existing project is marked as system
            system_project.is_system = True
            db.flush()

        # Normalize package name
        normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()
        package = db.query(Package).filter(
            Package.project_id == system_project.id,
            Package.name == normalized_name,
        ).first()
        if not package:
            package = Package(
                project_id=system_project.id,
                name=normalized_name,
                description=f"PyPI package: {normalized_name}",
                format="pypi",
            )
            db.add(package)
            db.flush()

        # Extract and create version
        # Only create version for actual package files, not .metadata files
        version = _extract_pypi_version(filename)
        if version and not filename.endswith('.metadata'):
            # Check by version string (the unique constraint is on package_id + version)
            existing_version = db.query(PackageVersion).filter(
                PackageVersion.package_id == package.id,
                PackageVersion.version == version,
            ).first()
            if not existing_version:
                pkg_version = PackageVersion(
                    package_id=package.id,
                    artifact_id=sha256,
                    version=version,
                    version_source="filename",
                    created_by="pypi-proxy",
                )
                db.add(pkg_version)

        # Cache the URL mapping
        existing_cached = db.query(CachedUrl).filter(CachedUrl.url_hash == url_hash).first()
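        # The same URL may already be recorded (e.g. if a cached artifact failed to
        # serve above and we fell through to re-fetch), so only insert when absent.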
        if not existing_cached:
            cached_url_record = CachedUrl(
                url_hash=url_hash,
                url=upstream_url,
                artifact_id=sha256,
            )
            db.add(cached_url_record)

        # Store extracted dependencies (deduplicate first - METADATA can list the
        # same dependency under multiple extras)
        if extracted_deps:
            # Deduplicate: keep first version constraint seen for each package name
            seen_deps: dict[str, str] = {}
            for dep_name, dep_version in extracted_deps:
                if dep_name not in seen_deps:
                    seen_deps[dep_name] = dep_version if dep_version else "*"

            for dep_name, dep_version in seen_deps.items():
                # Check if this dependency already exists for this artifact
                existing_dep = db.query(ArtifactDependency).filter(
                    ArtifactDependency.artifact_id == sha256,
                    ArtifactDependency.dependency_project == "_pypi",
                    ArtifactDependency.dependency_package == dep_name,
                ).first()
                if not existing_dep:
                    dep = ArtifactDependency(
                        artifact_id=sha256,
                        dependency_project="_pypi",
                        dependency_package=dep_name,
                        version_constraint=dep_version,
                    )
                    db.add(dep)

        db.commit()

        # Serve the file from S3
        settings = get_settings()
        try:
            if settings.pypi_download_mode == "redirect":
                # Redirect to S3 presigned URL - client downloads directly from S3
                presigned_url = storage.generate_presigned_url(s3_key)
                return RedirectResponse(
                    url=presigned_url,
                    status_code=302,
                    headers={
                        "X-Checksum-SHA256": sha256,
                        "X-Cache": "MISS",
                    }
                )
            else:
                # Proxy mode - stream from S3 through Orchard
                stream, content_length, _ = storage.get_stream(s3_key)

                def stream_content():
                    """Generator that yields chunks from the S3 stream."""
                    try:
                        for chunk in stream.iter_chunks():
                            yield chunk
                    finally:
                        stream.close()

                return StreamingResponse(
                    stream_content(),
                    media_type=content_type,
                    headers={
                        "Content-Disposition": f'attachment; filename="{filename}"',
                        "Content-Length": str(size),
                        "X-Checksum-SHA256": sha256,
                        "X-Cache": "MISS",
                    }
                )
        except Exception as e:
            logger.error(f"PyPI proxy: error serving from S3: {e}")
            raise HTTPException(status_code=500, detail=f"Error serving file: {e}")

    except httpx.ConnectError as e:
        raise HTTPException(status_code=502, detail=f"Connection failed: {e}")
    except httpx.TimeoutException as e:
        raise HTTPException(status_code=504, detail=f"Timeout: {e}")
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"PyPI proxy: error downloading {filename}")
        raise HTTPException(status_code=500, detail=str(e))
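

# Example client configuration (sketch; the hostname is hypothetical and the exact
# mount point depends on how this router is included in the application):
#
#   pip install --index-url https://orchard.example.com/pypi/simple/ requests
#
# or persistently in pip.conf:
#
#   [global]
#   index-url = https://orchard.example.com/pypi/simple/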