Add transparent PyPI proxy and improve upstream sources UI
backend/app/pypi_proxy.py (new file, +543)
@@ -0,0 +1,543 @@
"""
Transparent PyPI proxy implementing PEP 503 (Simple API).

Provides endpoints that allow pip to use Orchard as a PyPI index URL.
Artifacts are cached on first access through configured upstream sources.
"""
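# A minimal usage sketch (host assumed, not part of this module): once this
# router is mounted, pip can be pointed at the proxy as a standard PEP 503 index:
#
#   pip install --index-url http://localhost:8000/pypi/simple/ requests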
import hashlib
import logging
import re
from typing import Optional
from urllib.parse import urljoin, urlparse, quote, unquote

import httpx
from fastapi import APIRouter, Depends, HTTPException, Request, Response
from fastapi.responses import StreamingResponse, HTMLResponse
from sqlalchemy.orm import Session

from .database import get_db
from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, Tag
from .storage import S3Storage, get_storage
from .upstream import (
    UpstreamClient,
    UpstreamClientConfig,
    UpstreamHTTPError,
    UpstreamConnectionError,
    UpstreamTimeoutError,
)
from .config import get_env_upstream_sources

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/pypi", tags=["pypi-proxy"])

# Timeout configuration for proxy requests
PROXY_CONNECT_TIMEOUT = 30.0
PROXY_READ_TIMEOUT = 60.0
def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]:
    """Get all enabled upstream sources configured for PyPI."""
    # Get database sources
    db_sources = (
        db.query(UpstreamSource)
        .filter(
            UpstreamSource.source_type == "pypi",
            UpstreamSource.enabled == True,
        )
        .order_by(UpstreamSource.priority)
        .all()
    )

    # Get env sources
    env_sources = [
        s for s in get_env_upstream_sources()
        if s.source_type == "pypi" and s.enabled
    ]

    # Combine and sort by priority
    all_sources = list(db_sources) + list(env_sources)
    return sorted(all_sources, key=lambda s: s.priority)
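# Illustrative behaviour (values assumed): database and environment sources are
# merged and tried in ascending priority order, so an env source with
# priority 5 is consulted before a DB source with priority 10.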
def _build_auth_headers(source) -> dict:
    """Build authentication headers for an upstream source."""
    headers = {}

    if hasattr(source, 'auth_type'):
        if source.auth_type == "bearer":
            password = source.get_password() if hasattr(source, 'get_password') else getattr(source, 'password', None)
            if password:
                headers["Authorization"] = f"Bearer {password}"
        elif source.auth_type == "api_key":
            custom_headers = source.get_headers() if hasattr(source, 'get_headers') else {}
            if custom_headers:
                headers.update(custom_headers)

    return headers


def _get_basic_auth(source) -> Optional[tuple[str, str]]:
    """Get basic auth credentials if applicable."""
    if hasattr(source, 'auth_type') and source.auth_type == "basic":
        username = getattr(source, 'username', None)
        if username:
            password = source.get_password() if hasattr(source, 'get_password') else getattr(source, 'password', '')
            return (username, password or '')
    return None
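# Illustrative (token value assumed): a source configured with
# auth_type == "bearer" and a stored token yields
#   _build_auth_headers(source) -> {"Authorization": "Bearer <token>"}
# while _get_basic_auth returns a (username, password) tuple only when
# auth_type == "basic".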
def _rewrite_package_links(html: str, base_url: str, package_name: str) -> str:
    """
    Rewrite download links in a PyPI simple page to go through our proxy.

    Args:
        html: The HTML content from upstream
        base_url: Our server's base URL
        package_name: The package name for the URL path

    Returns:
        HTML with rewritten download links
    """
    # Pattern to match href attributes in anchor tags.
    # PyPI simple pages have links like:
    # <a href="https://files.pythonhosted.org/packages/.../file.tar.gz#sha256=...">file.tar.gz</a>

    def replace_href(match):
        original_url = match.group(1)
        # Extract the filename from the URL
        parsed = urlparse(original_url)
        path_parts = parsed.path.split('/')
        filename = path_parts[-1] if path_parts else ''

        # Keep the hash fragment if present
        fragment = f"#{parsed.fragment}" if parsed.fragment else ""

        # Encode the original URL for safe transmission
        encoded_url = quote(original_url.split('#')[0], safe='')

        # Build new URL pointing to our proxy
        new_url = f"{base_url}/pypi/simple/{package_name}/{filename}?upstream={encoded_url}{fragment}"

        return f'href="{new_url}"'

    # Match href="..." patterns
    rewritten = re.sub(r'href="([^"]+)"', replace_href, html)

    return rewritten
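# Illustrative rewrite, assuming base_url is "https://orchard.example.com" and
# package_name is "requests":
#   href="https://files.pythonhosted.org/packages/ab/cd/requests-2.31.0.tar.gz#sha256=deadbeef"
# becomes
#   href="https://orchard.example.com/pypi/simple/requests/requests-2.31.0.tar.gz?upstream=https%3A%2F%2Ffiles.pythonhosted.org%2Fpackages%2Fab%2Fcd%2Frequests-2.31.0.tar.gz#sha256=deadbeef"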
@router.get("/simple/")
async def pypi_simple_index(
    request: Request,
    db: Session = Depends(get_db),
):
    """
    PyPI Simple API index - lists all packages.

    Proxies to the first available upstream PyPI source.
    """
    sources = _get_pypi_upstream_sources(db)

    if not sources:
        raise HTTPException(
            status_code=503,
            detail="No PyPI upstream sources configured",
        )

    # Try each source in priority order
    last_error = None
    for source in sources:
        try:
            headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
            headers.update(_build_auth_headers(source))
            auth = _get_basic_auth(source)

            simple_url = source.url.rstrip('/') + '/simple/'

            # httpx.Timeout requires either a default or all four values;
            # the default covers read/write/pool, connect is overridden.
            timeout = httpx.Timeout(PROXY_READ_TIMEOUT, connect=PROXY_CONNECT_TIMEOUT)

            with httpx.Client(timeout=timeout, follow_redirects=False) as client:
                response = client.get(
                    simple_url,
                    headers=headers,
                    auth=auth,
                )

                # Handle redirects manually to avoid loops
                if response.status_code in (301, 302, 303, 307, 308):
                    redirect_url = response.headers.get('location')
                    if redirect_url:
                        # Make the redirect URL absolute, then follow it once
                        redirect_url = urljoin(simple_url, redirect_url)
                        response = client.get(
                            redirect_url,
                            headers=headers,
                            auth=auth,
                            follow_redirects=False,
                        )

                if response.status_code == 200:
                    content = response.text

                    # Rewrite package links (which point at package pages,
                    # not files) to go through our proxy
                    base_url = str(request.base_url).rstrip('/')
                    content = re.sub(
                        r'href="([^"]+)/"',
                        lambda m: f'href="{base_url}/pypi/simple/{m.group(1)}/"',
                        content,
                    )

                    return HTMLResponse(content=content)

                last_error = f"HTTP {response.status_code}"

        except httpx.ConnectError as e:
            last_error = f"Connection failed: {e}"
            logger.warning(f"PyPI proxy: failed to connect to {source.url}: {e}")
        except httpx.TimeoutException as e:
            last_error = f"Timeout: {e}"
            logger.warning(f"PyPI proxy: timeout connecting to {source.url}: {e}")
        except Exception as e:
            last_error = str(e)
            logger.warning(f"PyPI proxy: error fetching from {source.url}: {e}")

    raise HTTPException(
        status_code=502,
        detail=f"Failed to fetch package index from upstream: {last_error}",
    )
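# Illustrative check (host assumed): the index can be fetched directly, e.g.
#   curl http://localhost:8000/pypi/simple/
# which returns the upstream package list with hrefs rewritten to this proxy.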
@router.get("/simple/{package_name}/")
async def pypi_package_versions(
    request: Request,
    package_name: str,
    db: Session = Depends(get_db),
):
    """
    PyPI Simple API package page - lists all versions/files for a package.

    Proxies to upstream and rewrites download links to go through our cache.
    """
    sources = _get_pypi_upstream_sources(db)

    if not sources:
        raise HTTPException(
            status_code=503,
            detail="No PyPI upstream sources configured",
        )

    base_url = str(request.base_url).rstrip('/')

    # Normalize package name (PEP 503)
    normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()

    # Try each source in priority order
    last_error = None
    for source in sources:
        try:
            headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
            headers.update(_build_auth_headers(source))
            auth = _get_basic_auth(source)

            package_url = source.url.rstrip('/') + f'/simple/{normalized_name}/'

            timeout = httpx.Timeout(PROXY_READ_TIMEOUT, connect=PROXY_CONNECT_TIMEOUT)

            with httpx.Client(timeout=timeout, follow_redirects=False) as client:
                response = client.get(
                    package_url,
                    headers=headers,
                    auth=auth,
                )

                # Handle redirects manually
                redirect_count = 0
                while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
                    redirect_url = response.headers.get('location')
                    if not redirect_url:
                        break

                    # Make redirect URL absolute if needed
                    if not redirect_url.startswith('http'):
                        redirect_url = urljoin(package_url, redirect_url)

                    response = client.get(
                        redirect_url,
                        headers=headers,
                        auth=auth,
                        follow_redirects=False,
                    )
                    redirect_count += 1

                if response.status_code == 200:
                    content = response.text

                    # Rewrite download links to go through our proxy
                    content = _rewrite_package_links(content, base_url, normalized_name)

                    return HTMLResponse(content=content)

                if response.status_code == 404:
                    # Package not found in this source, try the next one
                    last_error = f"Package not found in {source.name}"
                    continue

                last_error = f"HTTP {response.status_code}"

        except httpx.ConnectError as e:
            last_error = f"Connection failed: {e}"
            logger.warning(f"PyPI proxy: failed to connect to {source.url}: {e}")
        except httpx.TimeoutException as e:
            last_error = f"Timeout: {e}"
            logger.warning(f"PyPI proxy: timeout connecting to {source.url}: {e}")
        except Exception as e:
            last_error = str(e)
            logger.warning(f"PyPI proxy: error fetching {package_name} from {source.url}: {e}")

    raise HTTPException(
        status_code=404,
        detail=f"Package '{package_name}' not found: {last_error}",
    )
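# The PEP 503 normalization used above maps, for example (illustrative inputs):
#   "Django"         -> "django"
#   "zope.interface" -> "zope-interface"
#   "ruamel__yaml"   -> "ruamel-yaml"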
@router.get("/simple/{package_name}/{filename}")
async def pypi_download_file(
    request: Request,
    package_name: str,
    filename: str,
    upstream: Optional[str] = None,
    db: Session = Depends(get_db),
    storage: S3Storage = Depends(get_storage),
):
    """
    Download a package file, caching it in Orchard.

    Args:
        package_name: The package name
        filename: The filename to download
        upstream: URL-encoded upstream URL to fetch from
    """
    if not upstream:
        raise HTTPException(
            status_code=400,
            detail="Missing 'upstream' query parameter with source URL",
        )

    # Decode the upstream URL
    upstream_url = unquote(upstream)

    # Check if we already have this URL cached
    url_hash = hashlib.sha256(upstream_url.encode()).hexdigest()
    cached_url = db.query(CachedUrl).filter(CachedUrl.url_hash == url_hash).first()

    if cached_url:
        # Serve from cache
        artifact = db.query(Artifact).filter(Artifact.id == cached_url.artifact_id).first()
        if artifact:
            logger.info(f"PyPI proxy: serving cached {filename} (artifact {artifact.id[:12]})")

            # Stream from S3
            try:
                content_stream = storage.get_artifact_stream(artifact.id)

                return StreamingResponse(
                    content_stream,
                    media_type=artifact.content_type or "application/octet-stream",
                    headers={
                        "Content-Disposition": f'attachment; filename="{filename}"',
                        "Content-Length": str(artifact.size),
                        "X-Checksum-SHA256": artifact.id,
                        "X-Cache": "HIT",
                    },
                )
            except Exception as e:
                logger.error(f"PyPI proxy: error streaming cached artifact: {e}")
                # Fall through to fetch from upstream

    # Not cached - fetch from upstream
    sources = _get_pypi_upstream_sources(db)

    # Find a source that matches the upstream URL
    matched_source = None
    for source in sources:
        source_url = getattr(source, 'url', '')
        # Check if the upstream URL could come from this source
        # (This is a loose check - the URL might be from files.pythonhosted.org)
        if urlparse(upstream_url).netloc in source_url or True:  # Allow any source for now
            matched_source = source
            break

    if not matched_source and sources:
        matched_source = sources[0]  # Use first source for auth if available

    try:
        headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
        if matched_source:
            headers.update(_build_auth_headers(matched_source))
        auth = _get_basic_auth(matched_source) if matched_source else None

        timeout = httpx.Timeout(300.0, connect=PROXY_CONNECT_TIMEOUT)  # 5 minutes for large files

        # Fetch the file
        logger.info(f"PyPI proxy: fetching {filename} from {upstream_url}")

        with httpx.Client(timeout=timeout, follow_redirects=False) as client:
            response = client.get(
                upstream_url,
                headers=headers,
                auth=auth,
            )

            # Handle redirects manually
            redirect_count = 0
            while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
                redirect_url = response.headers.get('location')
                if not redirect_url:
                    break

                if not redirect_url.startswith('http'):
                    redirect_url = urljoin(upstream_url, redirect_url)

                logger.info(f"PyPI proxy: following redirect to {redirect_url}")

                # Don't send auth to different hosts
                redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
                redirect_auth = None
                if urlparse(redirect_url).netloc == urlparse(upstream_url).netloc:
                    redirect_headers.update(headers)
                    redirect_auth = auth

                response = client.get(
                    redirect_url,
                    headers=redirect_headers,
                    auth=redirect_auth,
                    follow_redirects=False,
                )
                redirect_count += 1

        if response.status_code != 200:
            raise HTTPException(
                status_code=response.status_code,
                detail=f"Upstream returned {response.status_code}",
            )

        content = response.content
        content_type = response.headers.get('content-type', 'application/octet-stream')

        # Compute hash
        sha256 = hashlib.sha256(content).hexdigest()
        size = len(content)

        logger.info(f"PyPI proxy: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")

        # Store in S3
        from io import BytesIO
        artifact = storage.store_artifact(
            file_obj=BytesIO(content),
            filename=filename,
            content_type=content_type,
        )

        # Check if artifact already exists
        existing = db.query(Artifact).filter(Artifact.id == sha256).first()
        if existing:
            # Increment ref count
            existing.ref_count += 1
            db.flush()
        else:
            # Create artifact record
            new_artifact = Artifact(
                id=sha256,
                filename=filename,
                content_type=content_type,
                size=size,
                ref_count=1,
            )
            db.add(new_artifact)
            db.flush()

        # Create/get system project and package
        system_project = db.query(Project).filter(Project.name == "_pypi").first()
        if not system_project:
            system_project = Project(
                name="_pypi",
                description="System project for cached PyPI packages",
                visibility="private",
            )
            db.add(system_project)
            db.flush()

        # Normalize package name
        normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()

        package = db.query(Package).filter(
            Package.project_id == system_project.id,
            Package.name == normalized_name,
        ).first()
        if not package:
            package = Package(
                project_id=system_project.id,
                name=normalized_name,
                description=f"PyPI package: {normalized_name}",
            )
            db.add(package)
            db.flush()

        # Create tag with filename
        existing_tag = db.query(Tag).filter(
            Tag.package_id == package.id,
            Tag.name == filename,
        ).first()
        if not existing_tag:
            tag = Tag(
                package_id=package.id,
                name=filename,
                artifact_id=sha256,
            )
            db.add(tag)

        # Cache the URL mapping
        existing_cached = db.query(CachedUrl).filter(CachedUrl.url_hash == url_hash).first()
        if not existing_cached:
            cached_url_record = CachedUrl(
                url_hash=url_hash,
                url=upstream_url,
                artifact_id=sha256,
            )
            db.add(cached_url_record)

        db.commit()

        # Return the file
        return Response(
            content=content,
            media_type=content_type,
            headers={
                "Content-Disposition": f'attachment; filename="{filename}"',
                "Content-Length": str(size),
                "X-Checksum-SHA256": sha256,
                "X-Cache": "MISS",
            },
        )

    except httpx.ConnectError as e:
        raise HTTPException(status_code=502, detail=f"Connection failed: {e}")
    except httpx.TimeoutException as e:
        raise HTTPException(status_code=504, detail=f"Timeout: {e}")
    except HTTPException:
        raise
    except Exception as e:
        logger.exception(f"PyPI proxy: error downloading {filename}")
        raise HTTPException(status_code=500, detail=str(e))
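# Illustrative end-to-end flow (host and URL assumed): the first request misses
# the cache and pulls from upstream; a repeat of the same request is served
# from storage.
#   curl -D- -o pkg.tar.gz "http://localhost:8000/pypi/simple/<name>/<file>?upstream=<encoded-url>"
#   # first response:  X-Cache: MISS
#   # second response: X-Cache: HIT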