Add streaming download enhancements and tests (#42)
- Add conditional request support (If-None-Match, If-Modified-Since) returning 304 Not Modified - Add caching headers: Cache-Control (immutable), Last-Modified - Add 416 Range Not Satisfiable response for invalid range requests - Add download completion logging with bytes transferred and throughput - Add client disconnect handling during streaming downloads - Add comprehensive streaming download tests
This commit is contained in:
@@ -2972,6 +2972,8 @@ def download_artifact(
|
||||
storage: S3Storage = Depends(get_storage),
|
||||
current_user: Optional[User] = Depends(get_current_user_optional),
|
||||
range: Optional[str] = Header(None),
|
||||
if_none_match: Optional[str] = Header(None, alias="If-None-Match"),
|
||||
if_modified_since: Optional[str] = Header(None, alias="If-Modified-Since"),
|
||||
mode: Optional[Literal["proxy", "redirect", "presigned"]] = Query(
|
||||
default=None,
|
||||
description="Download mode: proxy (stream through backend), redirect (302 to presigned URL), presigned (return JSON with URL)",
|
||||
@@ -2988,6 +2990,15 @@ def download_artifact(
|
||||
"""
|
||||
Download an artifact by reference (tag name, artifact:hash, tag:name).
|
||||
|
||||
Supports conditional requests:
|
||||
- If-None-Match: Returns 304 Not Modified if ETag matches
|
||||
- If-Modified-Since: Returns 304 Not Modified if not modified since date
|
||||
|
||||
Supports range requests for partial downloads and resume:
|
||||
- Range: bytes=0-1023 (first 1KB)
|
||||
- Range: bytes=-1024 (last 1KB)
|
||||
- Returns 206 Partial Content with Content-Range header
|
||||
|
||||
Verification modes:
|
||||
- verify=false (default): No verification, maximum performance
|
||||
- verify=true&verify_mode=stream: Compute hash while streaming, verify after completion.
|
||||
@@ -3000,6 +3011,9 @@ def download_artifact(
|
||||
- X-Content-Length: File size in bytes
|
||||
- ETag: Artifact ID (SHA256)
|
||||
- Digest: RFC 3230 format sha-256 hash
|
||||
- Last-Modified: Artifact creation timestamp
|
||||
- Cache-Control: Immutable caching for content-addressable storage
|
||||
- Accept-Ranges: bytes (advertises range request support)
|
||||
|
||||
When verify=true:
|
||||
- X-Verified: 'true' if verified, 'false' if verification failed
|
||||
@@ -3024,6 +3038,52 @@ def download_artifact(
|
||||
|
||||
filename = sanitize_filename(artifact.original_name or f"{artifact.id}")
|
||||
|
||||
# Format Last-Modified header (RFC 7231 format)
|
||||
last_modified = None
|
||||
last_modified_str = None
|
||||
if artifact.created_at:
|
||||
last_modified = artifact.created_at
|
||||
if last_modified.tzinfo is None:
|
||||
last_modified = last_modified.replace(tzinfo=timezone.utc)
|
||||
last_modified_str = last_modified.strftime("%a, %d %b %Y %H:%M:%S GMT")
|
||||
|
||||
# Handle conditional requests (If-None-Match, If-Modified-Since)
|
||||
# Return 304 Not Modified if content hasn't changed
|
||||
artifact_etag = f'"{artifact.id}"'
|
||||
|
||||
if if_none_match:
|
||||
# Strip quotes and compare with artifact ETag
|
||||
client_etag = if_none_match.strip().strip('"')
|
||||
if client_etag == artifact.id or if_none_match == artifact_etag:
|
||||
return Response(
|
||||
status_code=304,
|
||||
headers={
|
||||
"ETag": artifact_etag,
|
||||
"Cache-Control": "public, max-age=31536000, immutable",
|
||||
**({"Last-Modified": last_modified_str} if last_modified_str else {}),
|
||||
},
|
||||
)
|
||||
|
||||
if if_modified_since and last_modified:
|
||||
try:
|
||||
# Parse If-Modified-Since header
|
||||
from email.utils import parsedate_to_datetime
|
||||
client_date = parsedate_to_datetime(if_modified_since)
|
||||
if client_date.tzinfo is None:
|
||||
client_date = client_date.replace(tzinfo=timezone.utc)
|
||||
# If artifact hasn't been modified since client's date, return 304
|
||||
if last_modified <= client_date:
|
||||
return Response(
|
||||
status_code=304,
|
||||
headers={
|
||||
"ETag": artifact_etag,
|
||||
"Cache-Control": "public, max-age=31536000, immutable",
|
||||
**({"Last-Modified": last_modified_str} if last_modified_str else {}),
|
||||
},
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
pass # Invalid date format, ignore and continue with download
|
||||
|
||||
# Audit log download
|
||||
user_id = get_user_id(request)
|
||||
_log_audit(
|
||||
@@ -3041,22 +3101,28 @@ def download_artifact(
|
||||
)
|
||||
db.commit()
|
||||
|
||||
# Build common checksum headers (always included)
|
||||
checksum_headers = {
|
||||
# Build common headers (always included)
|
||||
common_headers = {
|
||||
"X-Checksum-SHA256": artifact.id,
|
||||
"X-Content-Length": str(artifact.size),
|
||||
"ETag": f'"{artifact.id}"',
|
||||
"ETag": artifact_etag,
|
||||
# Cache-Control: content-addressable storage is immutable
|
||||
"Cache-Control": "public, max-age=31536000, immutable",
|
||||
}
|
||||
# Add Last-Modified header
|
||||
if last_modified_str:
|
||||
common_headers["Last-Modified"] = last_modified_str
|
||||
|
||||
# Add RFC 3230 Digest header
|
||||
try:
|
||||
digest_base64 = sha256_to_base64(artifact.id)
|
||||
checksum_headers["Digest"] = f"sha-256={digest_base64}"
|
||||
common_headers["Digest"] = f"sha-256={digest_base64}"
|
||||
except Exception:
|
||||
pass # Skip if conversion fails
|
||||
|
||||
# Add MD5 checksum if available
|
||||
if artifact.checksum_md5:
|
||||
checksum_headers["X-Checksum-MD5"] = artifact.checksum_md5
|
||||
common_headers["X-Checksum-MD5"] = artifact.checksum_md5
|
||||
|
||||
# Determine download mode (query param overrides server default)
|
||||
download_mode = mode or settings.download_mode
|
||||
@@ -3096,15 +3162,29 @@ def download_artifact(
|
||||
# Proxy mode (default fallback) - stream through backend
|
||||
# Handle range requests (verification not supported for partial downloads)
|
||||
if range:
|
||||
stream, content_length, content_range = storage.get_stream(
|
||||
artifact.s3_key, range
|
||||
)
|
||||
try:
|
||||
stream, content_length, content_range = storage.get_stream(
|
||||
artifact.s3_key, range
|
||||
)
|
||||
except Exception as e:
|
||||
# S3 returns InvalidRange error for unsatisfiable ranges
|
||||
error_str = str(e).lower()
|
||||
if "invalidrange" in error_str or "range" in error_str:
|
||||
raise HTTPException(
|
||||
status_code=416,
|
||||
detail="Range Not Satisfiable",
|
||||
headers={
|
||||
"Content-Range": f"bytes */{artifact.size}",
|
||||
"Accept-Ranges": "bytes",
|
||||
},
|
||||
)
|
||||
raise
|
||||
|
||||
headers = {
|
||||
"Content-Disposition": build_content_disposition(filename),
|
||||
"Accept-Ranges": "bytes",
|
||||
"Content-Length": str(content_length),
|
||||
**checksum_headers,
|
||||
**common_headers,
|
||||
}
|
||||
if content_range:
|
||||
headers["Content-Range"] = content_range
|
||||
@@ -3121,7 +3201,7 @@ def download_artifact(
|
||||
base_headers = {
|
||||
"Content-Disposition": build_content_disposition(filename),
|
||||
"Accept-Ranges": "bytes",
|
||||
**checksum_headers,
|
||||
**common_headers,
|
||||
}
|
||||
|
||||
# Pre-verification mode: verify before streaming
|
||||
@@ -3189,11 +3269,42 @@ def download_artifact(
|
||||
},
|
||||
)
|
||||
|
||||
# No verification - direct streaming
|
||||
# No verification - direct streaming with completion logging
|
||||
stream, content_length, _ = storage.get_stream(artifact.s3_key)
|
||||
|
||||
def logged_stream():
|
||||
"""Generator that yields chunks and logs completion/disconnection."""
|
||||
import time
|
||||
start_time = time.time()
|
||||
bytes_sent = 0
|
||||
try:
|
||||
for chunk in stream:
|
||||
bytes_sent += len(chunk)
|
||||
yield chunk
|
||||
# Download completed successfully
|
||||
duration = time.time() - start_time
|
||||
throughput_mbps = (bytes_sent / (1024 * 1024)) / duration if duration > 0 else 0
|
||||
logger.info(
|
||||
f"Download completed: artifact={artifact.id[:16]}... "
|
||||
f"bytes={bytes_sent} duration={duration:.2f}s throughput={throughput_mbps:.2f}MB/s"
|
||||
)
|
||||
except GeneratorExit:
|
||||
# Client disconnected before download completed
|
||||
duration = time.time() - start_time
|
||||
logger.warning(
|
||||
f"Download interrupted: artifact={artifact.id[:16]}... "
|
||||
f"bytes_sent={bytes_sent}/{content_length} duration={duration:.2f}s"
|
||||
)
|
||||
except Exception as e:
|
||||
duration = time.time() - start_time
|
||||
logger.error(
|
||||
f"Download error: artifact={artifact.id[:16]}... "
|
||||
f"bytes_sent={bytes_sent} duration={duration:.2f}s error={e}"
|
||||
)
|
||||
raise
|
||||
|
||||
return StreamingResponse(
|
||||
stream,
|
||||
logged_stream(),
|
||||
media_type=artifact.content_type or "application/octet-stream",
|
||||
headers={
|
||||
**base_headers,
|
||||
|
||||
Reference in New Issue
Block a user