perf: use shared HTTP client pool in pypi_download_file
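The change replaces a per-request httpx.AsyncClient with a client obtained from an HttpClientManager injected through the get_http_client dependency, so upstream connections are pooled and reused across downloads. The manager itself is not part of this diff; the sketch below is only an assumption of what a minimal implementation could look like (the pool limits are illustrative, not values from this commit):

    from typing import Optional

    import httpx

    class HttpClientManager:
        """Owns one shared httpx.AsyncClient so its connection pool is reused across requests."""

        def __init__(self) -> None:
            self._client: Optional[httpx.AsyncClient] = None

        def get_client(self) -> httpx.AsyncClient:
            # Create the client lazily; httpx pools connections per AsyncClient instance.
            if self._client is None:
                self._client = httpx.AsyncClient(
                    follow_redirects=False,
                    limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
                )
            return self._client

        async def close(self) -> None:
            # Call from the application shutdown hook to release pooled connections.
            if self._client is not None:
                await self._client.aclose()
                self._client = None

    # Module-level singleton exposed to FastAPI routes as a dependency.
    http_client_manager = HttpClientManager()

    def get_http_client() -> HttpClientManager:
        return http_client_manager

Because the client is now shared, the long read timeout needed for large files is passed per call (timeout=download_timeout) instead of being baked into a request-scoped client, which is what the hunks below change.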
@@ -642,6 +642,8 @@ async def pypi_download_file(
     upstream: Optional[str] = None,
     db: Session = Depends(get_db),
     storage: S3Storage = Depends(get_storage),
+    http_client: HttpClientManager = Depends(get_http_client),
     cache: CacheService = Depends(get_cache),
 ):
     """
     Download a package file, caching it in Orchard.
@@ -723,7 +725,9 @@ async def pypi_download_file(
     headers.update(_build_auth_headers(matched_source))
     auth = _get_basic_auth(matched_source) if matched_source else None

-    timeout = httpx.Timeout(300.0, connect=PROXY_CONNECT_TIMEOUT)  # 5 minutes for large files
+    # Use shared HTTP client from pool with longer timeout for file downloads
+    client = http_client.get_client()
+    download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)

     # Initialize extracted dependencies list
     extracted_deps = []
@@ -731,78 +735,79 @@ async def pypi_download_file(
     # Fetch the file
     logger.info(f"PyPI proxy: fetching {filename} from {upstream_url}")

-    async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
-        response = await client.get(
-            upstream_url,
-            headers=headers,
-            auth=auth,
-        )
+    response = await client.get(
+        upstream_url,
+        headers=headers,
+        auth=auth,
+        timeout=download_timeout,
+    )

-        # Handle redirects manually
-        redirect_count = 0
-        while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
-            redirect_url = response.headers.get('location')
-            if not redirect_url:
-                break
+    # Handle redirects manually
+    redirect_count = 0
+    while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
+        redirect_url = response.headers.get('location')
+        if not redirect_url:
+            break

-            if not redirect_url.startswith('http'):
-                redirect_url = urljoin(upstream_url, redirect_url)
+        if not redirect_url.startswith('http'):
+            redirect_url = urljoin(upstream_url, redirect_url)

-            logger.info(f"PyPI proxy: following redirect to {redirect_url}")
+        logger.info(f"PyPI proxy: following redirect to {redirect_url}")

-            # Don't send auth to different hosts
-            redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
-            redirect_auth = None
-            if urlparse(redirect_url).netloc == urlparse(upstream_url).netloc:
-                redirect_headers.update(headers)
-                redirect_auth = auth
+        # Don't send auth to different hosts
+        redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
+        redirect_auth = None
+        if urlparse(redirect_url).netloc == urlparse(upstream_url).netloc:
+            redirect_headers.update(headers)
+            redirect_auth = auth

-            response = await client.get(
-                redirect_url,
-                headers=redirect_headers,
-                auth=redirect_auth,
-                follow_redirects=False,
-            )
-            redirect_count += 1
+        response = await client.get(
+            redirect_url,
+            headers=redirect_headers,
+            auth=redirect_auth,
+            follow_redirects=False,
+            timeout=download_timeout,
+        )
+        redirect_count += 1

-        if response.status_code != 200:
-            # Parse upstream error for policy/blocking messages
-            error_detail = _parse_upstream_error(response)
-            logger.warning(f"PyPI proxy: upstream returned {response.status_code} for {filename}: {error_detail}")
-            raise HTTPException(
-                status_code=response.status_code,
-                detail=f"Upstream error: {error_detail}"
-            )
+    if response.status_code != 200:
+        # Parse upstream error for policy/blocking messages
+        error_detail = _parse_upstream_error(response)
+        logger.warning(f"PyPI proxy: upstream returned {response.status_code} for {filename}: {error_detail}")
+        raise HTTPException(
+            status_code=response.status_code,
+            detail=f"Upstream error: {error_detail}"
+        )

-        content_type = response.headers.get('content-type', 'application/octet-stream')
+    content_type = response.headers.get('content-type', 'application/octet-stream')

-        # Stream to temp file to avoid loading large packages into memory
-        # This keeps memory usage constant regardless of package size
-        # Using async iteration to avoid blocking the event loop
-        tmp_path = None
-        try:
-            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
-                tmp_path = tmp_file.name
-                async for chunk in response.aiter_bytes(chunk_size=65536):  # 64KB chunks
-                    tmp_file.write(chunk)
+    # Stream to temp file to avoid loading large packages into memory
+    # This keeps memory usage constant regardless of package size
+    # Using async iteration to avoid blocking the event loop
+    tmp_path = None
+    try:
+        with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
+            tmp_path = tmp_file.name
+            async for chunk in response.aiter_bytes(chunk_size=65536):  # 64KB chunks
+                tmp_file.write(chunk)

-            # Store in S3 from temp file (computes hash and deduplicates automatically)
-            with open(tmp_path, 'rb') as f:
-                result = storage.store(f)
-                sha256 = result.sha256
-                size = result.size
-                s3_key = result.s3_key
+        # Store in S3 from temp file (computes hash and deduplicates automatically)
+        with open(tmp_path, 'rb') as f:
+            result = storage.store(f)
+            sha256 = result.sha256
+            size = result.size
+            s3_key = result.s3_key

-            # Extract dependencies from the temp file before cleaning up
-            extracted_deps = _extract_dependencies_from_file(tmp_path, filename)
-            if extracted_deps:
-                logger.info(f"PyPI proxy: extracted {len(extracted_deps)} dependencies from {filename}")
+        # Extract dependencies from the temp file before cleaning up
+        extracted_deps = _extract_dependencies_from_file(tmp_path, filename)
+        if extracted_deps:
+            logger.info(f"PyPI proxy: extracted {len(extracted_deps)} dependencies from {filename}")

-            logger.info(f"PyPI proxy: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")
-        finally:
-            # Clean up temp file
-            if tmp_path and os.path.exists(tmp_path):
-                os.unlink(tmp_path)
+        logger.info(f"PyPI proxy: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")
+    finally:
+        # Clean up temp file
+        if tmp_path and os.path.exists(tmp_path):
+            os.unlink(tmp_path)

     # Check if artifact already exists
     existing = db.query(Artifact).filter(Artifact.id == sha256).first()
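For reference, the pattern the new code relies on is that httpx request methods accept a per-call timeout that overrides the client-wide default, so a single pooled client can serve both quick index requests and long file downloads. A self-contained sketch of that behaviour (the URL and timeout values are illustrative, not taken from this commit):

    import asyncio

    import httpx

    async def main() -> None:
        # One client, created once: its connection pool is reused for every request made with it.
        client = httpx.AsyncClient(follow_redirects=False)
        try:
            # The per-call timeout overrides the client default for this request only.
            download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
            response = await client.get("https://pypi.org/simple/", timeout=download_timeout)
            print(response.status_code, len(response.content))
        finally:
            # Close the client to release pooled connections.
            await client.aclose()

    asyncio.run(main())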
||||