- Add upload duration/throughput metrics (duration_ms, throughput_mbps) to response
- Add upload progress logging for large files (hash computation and multipart upload)
- Add client disconnect handling during uploads with proper cleanup
- Add upload progress tracking endpoint GET /upload/{upload_id}/progress (client polling sketch below)
- Add large file upload tests (10MB, 100MB, 1GB)
- Add upload cancellation and timeout handling tests
- Add API documentation for upload endpoints with curl, Python, JavaScript examples
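A minimal sketch of how a client might poll the new progress endpoint while an upload is in flight. The base URL, the use of `requests`, and the 2-second interval are assumptions of this sketch; the JSON fields mirror `get_upload_progress()` in the module below.

```python
import time

import requests  # assumed HTTP client for this sketch

BASE_URL = "http://localhost:8000"  # placeholder service address


def poll_upload_progress(upload_id: str, interval: float = 2.0) -> None:
    """Poll GET /upload/{upload_id}/progress until the upload leaves 'in_progress'."""
    while True:
        resp = requests.get(f"{BASE_URL}/upload/{upload_id}/progress", timeout=10)
        resp.raise_for_status()
        progress = resp.json()
        print(
            f"{progress.get('percent_complete')}% "
            f"({progress.get('bytes_uploaded')}/{progress.get('bytes_total')} bytes, "
            f"{progress.get('throughput_mbps')} MB/s)"
        )
        if progress.get("status") != "in_progress":
            break
        time.sleep(interval)
```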
import hashlib
import logging
import time
from typing import (
    BinaryIO,
    Tuple,
    Optional,
    Dict,
    Any,
    NamedTuple,
    Protocol,
    runtime_checkable,
)

import boto3
from botocore.config import Config
from botocore.exceptions import (
    ClientError,
    ConnectionError as BotoConnectionError,
    EndpointConnectionError,
    ReadTimeoutError,
    ConnectTimeoutError,
)

from .config import get_settings
from .checksum import (
    ChecksumMismatchError,
    HashingStreamWrapper,
    VerifyingStreamWrapper,
    compute_sha256,
    is_valid_sha256,
)

settings = get_settings()
logger = logging.getLogger(__name__)


# =============================================================================
# Storage Backend Protocol/Interface (ISSUE 33)
# =============================================================================


@runtime_checkable
class StorageBackend(Protocol):
    """
    Abstract protocol defining the interface for storage backends.

    All storage implementations (S3, MinIO, future backends) must implement
    this interface to ensure consistent behavior across the application.

    Note on Deduplication:
    - This system uses whole-file deduplication based on SHA256 hash
    - Partial/chunk-level deduplication is NOT supported (out of scope for MVP)
    - Files with identical content but different metadata are deduplicated
    """

    def store(
        self, file: BinaryIO, content_length: Optional[int] = None
    ) -> "StorageResult":
        """
        Store a file and return StorageResult with all checksums.

        Content-addressable: if the file already exists (by hash), just return
        the existing hash without uploading again.

        Args:
            file: File-like object to store
            content_length: Optional hint for file size (enables multipart upload)

        Returns:
            StorageResult with sha256, size, s3_key, and optional checksums

        Raises:
            HashComputationError: If hash computation fails
            S3ExistenceCheckError: If existence check fails after retries
            S3UploadError: If upload fails
        """
        ...

    def get(self, s3_key: str) -> bytes:
        """
        Retrieve a file by its storage key.

        Args:
            s3_key: The storage key (path) of the file

        Returns:
            File content as bytes
        """
        ...

    def get_stream(
        self, s3_key: str, range_header: Optional[str] = None
    ) -> Tuple[Any, int, Optional[str]]:
        """
        Get a streaming response for a file.

        Supports range requests for partial downloads.

        Args:
            s3_key: The storage key of the file
            range_header: Optional HTTP Range header value

        Returns:
            Tuple of (stream, content_length, content_range)
        """
        ...

    def delete(self, s3_key: str) -> bool:
        """
        Delete a file from storage.

        Args:
            s3_key: The storage key of the file to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        ...

    def get_object_info(self, s3_key: str) -> Optional[Dict[str, Any]]:
        """
        Get object metadata without downloading content.

        Args:
            s3_key: The storage key of the file

        Returns:
            Dict with size, content_type, last_modified, etag, or None if not found
        """
        ...

    def generate_presigned_url(
        self,
        s3_key: str,
        expiry: Optional[int] = None,
        response_content_type: Optional[str] = None,
        response_content_disposition: Optional[str] = None,
    ) -> str:
        """
        Generate a presigned URL for downloading an object.

        Args:
            s3_key: The storage key of the file
            expiry: URL expiry in seconds
            response_content_type: Override Content-Type header in response
            response_content_disposition: Override Content-Disposition header

        Returns:
            Presigned URL string
        """
        ...

    def health_check(self) -> bool:
        """
        Check if the storage backend is healthy and accessible.

        Returns:
            True if healthy, False otherwise
        """
        ...

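# Illustrative usage sketch (not part of the module's API surface): because
# StorageBackend is @runtime_checkable, callers can duck-type any backend and
# still assert conformance at runtime. Names below mirror this module; the
# file name and content_length value are hypothetical.
#
#     backend = get_storage()
#     assert isinstance(backend, StorageBackend)
#     with open("artifact.bin", "rb") as f:
#         result = backend.store(f, content_length=1024)
#     print(result.sha256, result.already_existed)
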
# Threshold for multipart upload (100MB)
MULTIPART_THRESHOLD = 100 * 1024 * 1024
# Chunk size for multipart upload (10MB)
MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024
# Chunk size for streaming hash computation (8MB)
HASH_CHUNK_SIZE = 8 * 1024 * 1024
# Maximum retries for S3 existence check
MAX_EXISTENCE_CHECK_RETRIES = 3


class StorageError(Exception):
    """Base exception for storage operations"""

    pass


class HashComputationError(StorageError):
    """Raised when hash computation fails"""

    pass


class FileSizeExceededError(StorageError):
    """Raised when file exceeds maximum size during upload"""

    pass


class S3ExistenceCheckError(StorageError):
    """Raised when S3 existence check fails after retries"""

    pass


class S3UploadError(StorageError):
    """Raised when S3 upload fails"""

    pass


class StorageResult(NamedTuple):
    """Result of storing a file with all computed checksums"""

    sha256: str
    size: int
    s3_key: str
    md5: Optional[str] = None
    sha1: Optional[str] = None
    s3_etag: Optional[str] = None
    already_existed: bool = (
        False  # True if artifact was deduplicated (S3 object already existed)
    )


class S3StorageUnavailableError(StorageError):
    """Raised when S3 storage backend is unavailable"""

    pass


class HashCollisionError(StorageError):
    """Raised when a hash collision is detected (extremely rare)"""

    pass

class S3Storage:
    def __init__(self):
        # Build config with retry and timeout settings
        s3_config = {}
        if settings.s3_use_path_style:
            s3_config["addressing_style"] = "path"

        config = Config(
            s3=s3_config if s3_config else None,
            connect_timeout=settings.s3_connect_timeout,
            read_timeout=settings.s3_read_timeout,
            retries={
                "max_attempts": settings.s3_max_retries,
                "mode": "adaptive",  # Adaptive retry mode for better handling
            },
        )

        self.client = boto3.client(
            "s3",
            endpoint_url=settings.s3_endpoint if settings.s3_endpoint else None,
            region_name=settings.s3_region,
            aws_access_key_id=settings.s3_access_key_id,
            aws_secret_access_key=settings.s3_secret_access_key,
            config=config,
            verify=settings.s3_verify_ssl,  # SSL/TLS verification
        )
        self.bucket = settings.s3_bucket
        # Store active multipart uploads for resumable support
        self._active_uploads: Dict[str, Dict[str, Any]] = {}

    def store(
        self, file: BinaryIO, content_length: Optional[int] = None
    ) -> StorageResult:
        """
        Store a file and return StorageResult with all checksums.
        Content-addressable: if the file already exists, just return the hash.
        Uses multipart upload for files larger than MULTIPART_THRESHOLD.
        """
        # For small files or unknown size, use the simple approach
        if content_length is None or content_length < MULTIPART_THRESHOLD:
            return self._store_simple(file)
        else:
            return self._store_multipart(file, content_length)

    def _store_simple(self, file: BinaryIO) -> StorageResult:
        """
        Store a small file using simple put_object.

        Raises:
            HashComputationError: If hash computation fails
            FileSizeExceededError: If file exceeds maximum size
            S3ExistenceCheckError: If S3 existence check fails after retries
            S3UploadError: If S3 upload fails
        """
        # Read file and compute all hashes with error handling
        try:
            content = file.read()
            if not content:
                raise HashComputationError("Empty file content")

            size = len(content)

            # Enforce file size limit (protection against Content-Length spoofing)
            if size > settings.max_file_size:
                raise FileSizeExceededError(
                    f"File size {size} exceeds maximum {settings.max_file_size}"
                )

            sha256_hash = hashlib.sha256(content).hexdigest()
            md5_hash = hashlib.md5(content).hexdigest()
            sha1_hash = hashlib.sha1(content).hexdigest()
        except (HashComputationError, FileSizeExceededError):
            raise
        except Exception as e:
            logger.error(f"Hash computation failed: {e}")
            raise HashComputationError(f"Failed to compute hash: {e}") from e

        # Check if already exists (with retry logic)
        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
        s3_etag = None

        try:
            exists = self._exists(s3_key)
        except S3ExistenceCheckError:
            # Re-raise the specific error
            raise
        except Exception as e:
            logger.error(f"Unexpected error during S3 existence check: {e}")
            raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e

        if not exists:
            try:
                response = self.client.put_object(
                    Bucket=self.bucket,
                    Key=s3_key,
                    Body=content,
                )
                s3_etag = response.get("ETag", "").strip('"')
            except (EndpointConnectionError, BotoConnectionError) as e:
                logger.error(f"S3 storage unavailable: {e}")
                raise S3StorageUnavailableError(
                    f"Storage backend unavailable: {e}"
                ) from e
            except (ReadTimeoutError, ConnectTimeoutError) as e:
                logger.error(f"S3 operation timed out: {e}")
                raise S3UploadError(f"Upload timed out: {e}") from e
            except ClientError as e:
                error_code = e.response.get("Error", {}).get("Code", "")
                if error_code == "ServiceUnavailable":
                    logger.error(f"S3 service unavailable: {e}")
                    raise S3StorageUnavailableError(
                        f"Storage service unavailable: {e}"
                    ) from e
                logger.error(f"S3 upload failed: {e}")
                raise S3UploadError(f"Failed to upload to S3: {e}") from e
        else:
            # Get existing ETag and verify integrity (detect potential hash collision)
            obj_info = self.get_object_info(s3_key)
            if obj_info:
                s3_etag = obj_info.get("etag", "").strip('"')
                # Check for hash collision by comparing size
                existing_size = obj_info.get("size", 0)
                if existing_size != size:
                    logger.critical(
                        f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
                        f"existing={existing_size}, new={size}. This is extremely rare."
                    )
                    raise HashCollisionError(
                        f"Hash collision detected for {sha256_hash}: size mismatch"
                    )

        return StorageResult(
            sha256=sha256_hash,
            size=size,
            s3_key=s3_key,
            md5=md5_hash,
            sha1=sha1_hash,
            s3_etag=s3_etag,
            already_existed=exists,
        )

    def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
        """
        Store a large file using S3 multipart upload with streaming hash computation.

        Raises:
            HashComputationError: If hash computation fails
            FileSizeExceededError: If file exceeds maximum size
            S3ExistenceCheckError: If S3 existence check fails after retries
            S3UploadError: If S3 upload fails
        """
        # First pass: compute all hashes by streaming through the file
        try:
            sha256_hasher = hashlib.sha256()
            md5_hasher = hashlib.md5()
            sha1_hasher = hashlib.sha1()
            size = 0
            hash_start_time = time.time()
            last_log_time = hash_start_time
            log_interval_seconds = 5  # Log progress every 5 seconds

            logger.info(f"Computing hashes for large file: expected_size={content_length}")

            # Read file in chunks to compute hashes
            while True:
                chunk = file.read(HASH_CHUNK_SIZE)
                if not chunk:
                    break
                sha256_hasher.update(chunk)
                md5_hasher.update(chunk)
                sha1_hasher.update(chunk)
                size += len(chunk)

                # Log hash computation progress periodically
                current_time = time.time()
                if current_time - last_log_time >= log_interval_seconds:
                    elapsed = current_time - hash_start_time
                    percent = (size / content_length) * 100 if content_length > 0 else 0
                    throughput = (size / (1024 * 1024)) / elapsed if elapsed > 0 else 0
                    logger.info(
                        f"Hash computation progress: bytes={size}/{content_length} ({percent:.1f}%) "
                        f"throughput={throughput:.2f}MB/s"
                    )
                    last_log_time = current_time

                # Enforce file size limit during streaming (protection against spoofing)
                if size > settings.max_file_size:
                    raise FileSizeExceededError(
                        f"File size exceeds maximum {settings.max_file_size}"
                    )

            if size == 0:
                raise HashComputationError("Empty file content")

            sha256_hash = sha256_hasher.hexdigest()
            md5_hash = md5_hasher.hexdigest()
            sha1_hash = sha1_hasher.hexdigest()

            # Log hash computation completion
            hash_elapsed = time.time() - hash_start_time
            hash_throughput = (size / (1024 * 1024)) / hash_elapsed if hash_elapsed > 0 else 0
            logger.info(
                f"Hash computation completed: hash={sha256_hash[:16]}... "
                f"size={size} duration={hash_elapsed:.2f}s throughput={hash_throughput:.2f}MB/s"
            )
        except (HashComputationError, FileSizeExceededError):
            raise
        except Exception as e:
            logger.error(f"Hash computation failed for multipart upload: {e}")
            raise HashComputationError(f"Failed to compute hash: {e}") from e

        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"

        # Check if already exists (deduplication) with retry logic
        try:
            exists = self._exists(s3_key)
        except S3ExistenceCheckError:
            raise
        except Exception as e:
            logger.error(f"Unexpected error during S3 existence check: {e}")
            raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e

        if exists:
            obj_info = self.get_object_info(s3_key)
            s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
            # Check for hash collision by comparing size
            if obj_info:
                existing_size = obj_info.get("size", 0)
                if existing_size != size:
                    logger.critical(
                        f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
                        f"existing={existing_size}, new={size}. This is extremely rare."
                    )
                    raise HashCollisionError(
                        f"Hash collision detected for {sha256_hash}: size mismatch"
                    )
            return StorageResult(
                sha256=sha256_hash,
                size=size,
                s3_key=s3_key,
                md5=md5_hash,
                sha1=sha1_hash,
                s3_etag=s3_etag,
                already_existed=True,
            )

        # Seek back to start for upload
        file.seek(0)

        # Start multipart upload
        try:
            mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
        except (EndpointConnectionError, BotoConnectionError) as e:
            logger.error(f"S3 storage unavailable for multipart upload: {e}")
            raise S3StorageUnavailableError(f"Storage backend unavailable: {e}") from e
        upload_id = mpu["UploadId"]

        try:
            parts = []
            part_number = 1
            bytes_uploaded = 0
            upload_start_time = time.time()
            last_log_time = upload_start_time
            log_interval_seconds = 5  # Log progress every 5 seconds

            total_parts = (content_length + MULTIPART_CHUNK_SIZE - 1) // MULTIPART_CHUNK_SIZE
            logger.info(
                f"Starting multipart upload: hash={sha256_hash[:16]}... "
                f"size={content_length} parts={total_parts}"
            )

            while True:
                chunk = file.read(MULTIPART_CHUNK_SIZE)
                if not chunk:
                    break

                response = self.client.upload_part(
                    Bucket=self.bucket,
                    Key=s3_key,
                    UploadId=upload_id,
                    PartNumber=part_number,
                    Body=chunk,
                )
                parts.append(
                    {
                        "PartNumber": part_number,
                        "ETag": response["ETag"],
                    }
                )
                bytes_uploaded += len(chunk)

                # Log progress periodically
                current_time = time.time()
                if current_time - last_log_time >= log_interval_seconds:
                    elapsed = current_time - upload_start_time
                    percent = (bytes_uploaded / content_length) * 100
                    throughput = (bytes_uploaded / (1024 * 1024)) / elapsed if elapsed > 0 else 0
                    logger.info(
                        f"Upload progress: hash={sha256_hash[:16]}... "
                        f"part={part_number}/{total_parts} "
                        f"bytes={bytes_uploaded}/{content_length} ({percent:.1f}%) "
                        f"throughput={throughput:.2f}MB/s"
                    )
                    last_log_time = current_time

                part_number += 1

            # Log completion
            total_elapsed = time.time() - upload_start_time
            final_throughput = (content_length / (1024 * 1024)) / total_elapsed if total_elapsed > 0 else 0
            logger.info(
                f"Multipart upload completed: hash={sha256_hash[:16]}... "
                f"size={content_length} duration={total_elapsed:.2f}s throughput={final_throughput:.2f}MB/s"
            )

            # Complete multipart upload
            complete_response = self.client.complete_multipart_upload(
                Bucket=self.bucket,
                Key=s3_key,
                UploadId=upload_id,
                MultipartUpload={"Parts": parts},
            )
            s3_etag = complete_response.get("ETag", "").strip('"')

            return StorageResult(
                sha256=sha256_hash,
                size=size,
                s3_key=s3_key,
                md5=md5_hash,
                sha1=sha1_hash,
                s3_etag=s3_etag,
                already_existed=False,
            )

        except Exception as e:
            # Abort multipart upload on failure
            error_str = str(e).lower()
            is_client_disconnect = (
                isinstance(e, (ConnectionResetError, BrokenPipeError)) or
                "connection" in error_str or "broken pipe" in error_str or "reset" in error_str
            )
            if is_client_disconnect:
                logger.warning(
                    f"Multipart upload aborted (client disconnect): hash={sha256_hash[:16]}... "
                    f"parts_uploaded={len(parts)} bytes_uploaded={bytes_uploaded}"
                )
            else:
                logger.error(f"Multipart upload failed: hash={sha256_hash[:16]}... error={e}")

            try:
                self.client.abort_multipart_upload(
                    Bucket=self.bucket,
                    Key=s3_key,
                    UploadId=upload_id,
                )
                logger.info(f"Multipart upload aborted and cleaned up: upload_id={upload_id[:16]}...")
            except Exception as abort_error:
                logger.error(f"Failed to abort multipart upload: {abort_error}")
            raise

    def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]:
        """
        Initiate a resumable upload session.
        Returns upload session info including upload_id.
        """
        s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"

        # Check if already exists
        if self._exists(s3_key):
            return {
                "upload_id": None,
                "s3_key": s3_key,
                "already_exists": True,
                "parts": [],
            }

        mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
        upload_id = mpu["UploadId"]

        session = {
            "upload_id": upload_id,
            "s3_key": s3_key,
            "already_exists": False,
            "parts": [],
            "expected_hash": expected_hash,
            "started_at": time.time(),
            "bytes_uploaded": 0,
            "expected_size": None,  # Set when init provides size
            "status": "in_progress",
        }
        self._active_uploads[upload_id] = session
        return session

    def upload_part(
        self, upload_id: str, part_number: int, data: bytes
    ) -> Dict[str, Any]:
        """
        Upload a part for a resumable upload.
        Returns part info including ETag.
        """
        session = self._active_uploads.get(upload_id)
        if not session:
            raise ValueError(f"Unknown upload session: {upload_id}")

        response = self.client.upload_part(
            Bucket=self.bucket,
            Key=session["s3_key"],
            UploadId=upload_id,
            PartNumber=part_number,
            Body=data,
        )

        part_info = {
            "PartNumber": part_number,
            "ETag": response["ETag"],
            "size": len(data),
        }
        session["parts"].append(part_info)
        session["bytes_uploaded"] = session.get("bytes_uploaded", 0) + len(data)
        return part_info

    def get_upload_progress(self, upload_id: str) -> Optional[Dict[str, Any]]:
        """
        Get progress information for a resumable upload.
        Returns None if upload not found.
        """
        session = self._active_uploads.get(upload_id)
        if not session:
            return None

        bytes_uploaded = session.get("bytes_uploaded", 0)
        expected_size = session.get("expected_size")
        started_at = session.get("started_at")

        progress = {
            "upload_id": upload_id,
            "status": session.get("status", "in_progress"),
            "bytes_uploaded": bytes_uploaded,
            "bytes_total": expected_size,
            "parts_uploaded": len(session.get("parts", [])),
            "parts_total": None,
            "started_at": started_at,
            "elapsed_seconds": None,
            "percent_complete": None,
            "throughput_mbps": None,
        }

        if expected_size and expected_size > 0:
            progress["percent_complete"] = round((bytes_uploaded / expected_size) * 100, 2)
            progress["parts_total"] = (expected_size + MULTIPART_CHUNK_SIZE - 1) // MULTIPART_CHUNK_SIZE

        if started_at:
            elapsed = time.time() - started_at
            progress["elapsed_seconds"] = round(elapsed, 2)
            if elapsed > 0 and bytes_uploaded > 0:
                progress["throughput_mbps"] = round((bytes_uploaded / (1024 * 1024)) / elapsed, 2)

        return progress

    def set_upload_expected_size(self, upload_id: str, size: int):
        """Set the expected size for an upload (for progress tracking)."""
        session = self._active_uploads.get(upload_id)
        if session:
            session["expected_size"] = size

    def complete_resumable_upload(self, upload_id: str) -> Tuple[str, str]:
        """
        Complete a resumable upload.
        Returns (sha256_hash, s3_key).
        """
        session = self._active_uploads.get(upload_id)
        if not session:
            raise ValueError(f"Unknown upload session: {upload_id}")

        # Sort parts by part number and strip bookkeeping keys (e.g. "size"):
        # CompleteMultipartUpload only accepts PartNumber/ETag (and checksum fields)
        sorted_parts = sorted(session["parts"], key=lambda x: x["PartNumber"])
        completed_parts = [
            {"PartNumber": p["PartNumber"], "ETag": p["ETag"]} for p in sorted_parts
        ]

        self.client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=session["s3_key"],
            UploadId=upload_id,
            MultipartUpload={"Parts": completed_parts},
        )

        # Clean up session
        del self._active_uploads[upload_id]

        return session["expected_hash"], session["s3_key"]

    def abort_resumable_upload(self, upload_id: str):
        """Abort a resumable upload"""
        session = self._active_uploads.get(upload_id)
        if session:
            self.client.abort_multipart_upload(
                Bucket=self.bucket,
                Key=session["s3_key"],
                UploadId=upload_id,
            )
            del self._active_uploads[upload_id]

    def list_upload_parts(self, upload_id: str) -> list:
        """List uploaded parts for a resumable upload (for resume support)"""
        session = self._active_uploads.get(upload_id)
        if not session:
            raise ValueError(f"Unknown upload session: {upload_id}")

        response = self.client.list_parts(
            Bucket=self.bucket,
            Key=session["s3_key"],
            UploadId=upload_id,
        )
        return response.get("Parts", [])

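    # Illustrative call sequence for the resumable-upload helpers above (a
    # sketch only; expected_hash, total_size, and the two chunks are
    # placeholders supplied by the caller):
    #
    #     storage = get_storage()
    #     session = storage.initiate_resumable_upload(expected_hash)
    #     if not session["already_exists"]:
    #         storage.set_upload_expected_size(session["upload_id"], total_size)
    #         storage.upload_part(session["upload_id"], 1, first_chunk)
    #         storage.upload_part(session["upload_id"], 2, second_chunk)
    #         print(storage.get_upload_progress(session["upload_id"]))
    #         storage.complete_resumable_upload(session["upload_id"])
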
    def get(self, s3_key: str) -> bytes:
        """Retrieve a file by its S3 key"""
        response = self.client.get_object(Bucket=self.bucket, Key=s3_key)
        return response["Body"].read()

    def get_stream(self, s3_key: str, range_header: Optional[str] = None):
        """
        Get a streaming response for a file.
        Supports range requests for partial downloads.
        Returns (stream, content_length, content_range)
        """
        kwargs = {"Bucket": self.bucket, "Key": s3_key}

        if range_header:
            kwargs["Range"] = range_header

        response = self.client.get_object(**kwargs)

        content_length = response.get("ContentLength", 0)
        content_range = response.get("ContentRange")

        return response["Body"], content_length, content_range

    def get_object_info(self, s3_key: str) -> Optional[Dict[str, Any]]:
        """Get object metadata without downloading content"""
        try:
            response = self.client.head_object(Bucket=self.bucket, Key=s3_key)
            return {
                "size": response.get("ContentLength", 0),
                "content_type": response.get("ContentType"),
                "last_modified": response.get("LastModified"),
                "etag": response.get("ETag"),
            }
        except ClientError:
            return None

    def _exists(self, s3_key: str, retry: bool = True) -> bool:
        """
        Check if an object exists with optional retry logic.

        Args:
            s3_key: The S3 key to check
            retry: Whether to retry on transient failures (default: True)

        Returns:
            True if object exists, False otherwise

        Raises:
            S3ExistenceCheckError: If all retries fail due to non-404 errors
        """
        max_retries = MAX_EXISTENCE_CHECK_RETRIES if retry else 1
        last_error = None

        for attempt in range(max_retries):
            try:
                self.client.head_object(Bucket=self.bucket, Key=s3_key)
                return True
            except ClientError as e:
                error_code = e.response.get("Error", {}).get("Code", "")
                # 404 means object doesn't exist - not an error
                if error_code in ("404", "NoSuchKey"):
                    return False

                # For other errors, retry
                last_error = e
                if attempt < max_retries - 1:
                    logger.warning(
                        f"S3 existence check failed (attempt {attempt + 1}/{max_retries}): {e}"
                    )
                    time.sleep(0.1 * (attempt + 1))  # Linear backoff: 0.1s, 0.2s, ...

        # All retries failed
        logger.error(
            f"S3 existence check failed after {max_retries} attempts: {last_error}"
        )
        raise S3ExistenceCheckError(
            f"Failed to check S3 object existence after {max_retries} attempts: {last_error}"
        ) from last_error

    def delete(self, s3_key: str) -> bool:
        """Delete an object"""
        try:
            self.client.delete_object(Bucket=self.bucket, Key=s3_key)
            return True
        except ClientError:
            return False

    def generate_presigned_url(
        self,
        s3_key: str,
        expiry: Optional[int] = None,
        response_content_type: Optional[str] = None,
        response_content_disposition: Optional[str] = None,
    ) -> str:
        """
        Generate a presigned URL for downloading an object.

        Args:
            s3_key: The S3 key of the object
            expiry: URL expiry in seconds (defaults to settings.presigned_url_expiry)
            response_content_type: Override Content-Type header in response
            response_content_disposition: Override Content-Disposition header in response

        Returns:
            Presigned URL string
        """
        if expiry is None:
            expiry = settings.presigned_url_expiry

        params = {
            "Bucket": self.bucket,
            "Key": s3_key,
        }

        # Add response header overrides if specified
        if response_content_type:
            params["ResponseContentType"] = response_content_type
        if response_content_disposition:
            params["ResponseContentDisposition"] = response_content_disposition

        url = self.client.generate_presigned_url(
            "get_object",
            Params=params,
            ExpiresIn=expiry,
        )
        return url

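    # Illustrative sketch: a browser-friendly download link that forces a
    # "Save as" dialog. The key, filename, and 1-hour expiry are placeholders.
    #
    #     url = storage.generate_presigned_url(
    #         "fruits/ab/cd/abcd...",
    #         expiry=3600,
    #         response_content_disposition='attachment; filename="artifact.bin"',
    #     )
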
    def health_check(self) -> bool:
        """
        Check if the storage backend is healthy and accessible.

        Performs a lightweight HEAD request on the bucket to verify connectivity.

        Returns:
            True if healthy, False otherwise
        """
        try:
            self.client.head_bucket(Bucket=self.bucket)
            return True
        except ClientError as e:
            logger.warning(f"Storage health check failed: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error during storage health check: {e}")
            return False

    def get_verified(self, s3_key: str, expected_hash: str) -> bytes:
        """
        Download and verify content matches expected SHA256 hash.

        This method downloads the entire content, computes its hash, and
        verifies it matches the expected hash before returning.

        Args:
            s3_key: The S3 storage key of the file
            expected_hash: Expected SHA256 hash (64 hex characters)

        Returns:
            File content as bytes (only if verification passes)

        Raises:
            ValueError: If expected_hash is not a valid SHA256 format
            ChecksumMismatchError: If computed hash doesn't match expected
            ClientError: If S3 operation fails
        """
        if not is_valid_sha256(expected_hash):
            raise ValueError(f"Invalid SHA256 hash format: {expected_hash}")

        content = self.get(s3_key)
        actual_hash = compute_sha256(content)

        if actual_hash != expected_hash.lower():
            raise ChecksumMismatchError(
                expected=expected_hash.lower(),
                actual=actual_hash,
                s3_key=s3_key,
                size=len(content),
            )

        logger.debug(f"Verification passed for {s3_key}: {actual_hash[:16]}...")
        return content

    def get_stream_verified(
        self,
        s3_key: str,
        expected_hash: str,
        range_header: Optional[str] = None,
    ) -> Tuple[VerifyingStreamWrapper, int, Optional[str]]:
        """
        Get a verifying stream wrapper for an object.

        Returns a wrapper that computes the hash as chunks are read and
        can verify after streaming completes. Note that verification happens
        AFTER content has been streamed to the client.

        IMPORTANT: For range requests, verification is not supported because
        we cannot verify a partial download against the full file hash.

        Args:
            s3_key: The S3 storage key of the file
            expected_hash: Expected SHA256 hash (64 hex characters)
            range_header: Optional HTTP Range header (verification disabled if set)

        Returns:
            Tuple of (VerifyingStreamWrapper, content_length, content_range)
            The wrapper has a verify() method to call after streaming.

        Raises:
            ValueError: If expected_hash is invalid format
            ClientError: If S3 operation fails
        """
        if not is_valid_sha256(expected_hash):
            raise ValueError(f"Invalid SHA256 hash format: {expected_hash}")

        # Get the S3 stream
        stream, content_length, content_range = self.get_stream(s3_key, range_header)

        # For range requests, we cannot verify (partial content).
        # Return a HashingStreamWrapper that just tracks bytes without verification.
        if range_header or content_range:
            logger.debug(
                f"Range request for {s3_key} - verification disabled (partial content)"
            )
            # Return a basic hashing wrapper (caller should not verify)
            hashing_wrapper = HashingStreamWrapper(stream)
            return hashing_wrapper, content_length, content_range

        # Create verifying wrapper
        verifying_wrapper = VerifyingStreamWrapper(
            stream=stream,
            expected_hash=expected_hash,
            s3_key=s3_key,
        )

        return verifying_wrapper, content_length, content_range

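    # Illustrative sketch of the intended call pattern: stream the wrapper's
    # content to the client (it hashes chunks as they are read), then call
    # verify() once streaming completes. Only verify() is documented above;
    # how chunks are read from the wrapper depends on the checksum module.
    #
    #     wrapper, length, _ = storage.get_stream_verified(s3_key, expected_hash)
    #     ...stream wrapper contents to the client...
    #     wrapper.verify()  # raises ChecksumMismatchError if the hash differs
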
    def verify_integrity(self, s3_key: str, expected_sha256: str) -> bool:
        """
        Verify the integrity of a stored object by downloading and re-hashing.

        This is an expensive operation and should only be used for critical
        verification scenarios.

        Args:
            s3_key: The storage key of the file
            expected_sha256: The expected SHA256 hash

        Returns:
            True if hash matches, False otherwise
        """
        try:
            content = self.get(s3_key)
            actual_hash = hashlib.sha256(content).hexdigest()
            if actual_hash != expected_sha256:
                logger.error(
                    f"Integrity verification failed for {s3_key}: "
                    f"expected {expected_sha256[:12]}..., got {actual_hash[:12]}..."
                )
                return False
            return True
        except Exception as e:
            logger.error(f"Error during integrity verification for {s3_key}: {e}")
            return False


# Singleton instance
_storage: Optional[S3Storage] = None


def get_storage() -> StorageBackend:
    """
    Get the configured storage backend instance.

    Currently returns S3Storage (works with S3-compatible backends like MinIO).
    Future implementations may support backend selection via configuration.

    Returns:
        StorageBackend instance
    """
    global _storage
    if _storage is None:
        _storage = S3Storage()
    return _storage