import hashlib
import logging
from typing import (
    BinaryIO,
    Tuple,
    Optional,
    Dict,
    Any,
    NamedTuple,
    Protocol,
    runtime_checkable,
)

import boto3
from botocore.config import Config
from botocore.exceptions import (
    ClientError,
    ConnectionError as BotoConnectionError,
    EndpointConnectionError,
    ReadTimeoutError,
    ConnectTimeoutError,
)

from .config import get_settings
from .checksum import (
    ChecksumMismatchError,
    HashingStreamWrapper,
    VerifyingStreamWrapper,
    compute_sha256,
    is_valid_sha256,
)

settings = get_settings()
logger = logging.getLogger(__name__)


# =============================================================================
# Storage Backend Protocol/Interface (ISSUE 33)
# =============================================================================


@runtime_checkable
class StorageBackend(Protocol):
    """
    Abstract protocol defining the interface for storage backends.

    All storage implementations (S3, MinIO, future backends) must implement
    this interface to ensure consistent behavior across the application.

    Note on Deduplication:
    - This system uses whole-file deduplication based on SHA256 hash
    - Partial/chunk-level deduplication is NOT supported (out of scope for MVP)
    - Files with identical content but different metadata are deduplicated
    """

    def store(
        self, file: BinaryIO, content_length: Optional[int] = None
    ) -> "StorageResult":
        """
        Store a file and return StorageResult with all checksums.

        Content-addressable: if the file already exists (by hash), just return
        the existing hash without uploading again.

        Args:
            file: File-like object to store
            content_length: Optional hint for file size (enables multipart upload)

        Returns:
            StorageResult with sha256, size, s3_key, and optional checksums

        Raises:
            HashComputationError: If hash computation fails
            S3ExistenceCheckError: If existence check fails after retries
            S3UploadError: If upload fails
        """
        ...

    def get(self, s3_key: str) -> bytes:
        """
        Retrieve a file by its storage key.

        Args:
            s3_key: The storage key (path) of the file

        Returns:
            File content as bytes
        """
        ...

    def get_stream(
        self, s3_key: str, range_header: Optional[str] = None
    ) -> Tuple[Any, int, Optional[str]]:
        """
        Get a streaming response for a file.

        Supports range requests for partial downloads.

        Args:
            s3_key: The storage key of the file
            range_header: Optional HTTP Range header value

        Returns:
            Tuple of (stream, content_length, content_range)
        """
        ...

    def delete(self, s3_key: str) -> bool:
        """
        Delete a file from storage.

        Args:
            s3_key: The storage key of the file to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        ...

    def get_object_info(self, s3_key: str) -> Optional[Dict[str, Any]]:
        """
        Get object metadata without downloading content.

        Args:
            s3_key: The storage key of the file

        Returns:
            Dict with size, content_type, last_modified, etag, or None if not found
        """
        ...

    def generate_presigned_url(
        self,
        s3_key: str,
        expiry: Optional[int] = None,
        response_content_type: Optional[str] = None,
        response_content_disposition: Optional[str] = None,
    ) -> str:
        """
        Generate a presigned URL for downloading an object.

        Args:
            s3_key: The storage key of the file
            expiry: URL expiry in seconds
            response_content_type: Override Content-Type header in response
            response_content_disposition: Override Content-Disposition header

        Returns:
            Presigned URL string
        """
        ...

    def health_check(self) -> bool:
        """
        Check if the storage backend is healthy and accessible.

        Returns:
            True if healthy, False otherwise
        """
        ...
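

# Example (illustrative sketch, not executed by this module): because
# StorageBackend is decorated with @runtime_checkable, callers can verify a
# backend structurally at startup. Note that isinstance() only checks that the
# methods exist, not their signatures.
#
#     backend = get_storage()
#     if not isinstance(backend, StorageBackend):
#         raise RuntimeError("configured backend does not implement StorageBackend")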


# Threshold for multipart upload (100MB)
MULTIPART_THRESHOLD = 100 * 1024 * 1024
# Chunk size for multipart upload (10MB)
MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024
# Chunk size for streaming hash computation
HASH_CHUNK_SIZE = 8 * 1024 * 1024
# Maximum retries for S3 existence check
MAX_EXISTENCE_CHECK_RETRIES = 3


class StorageError(Exception):
    """Base exception for storage operations"""

    pass


class HashComputationError(StorageError):
    """Raised when hash computation fails"""

    pass


class FileSizeExceededError(StorageError):
    """Raised when file exceeds maximum size during upload"""

    pass


class S3ExistenceCheckError(StorageError):
    """Raised when S3 existence check fails after retries"""

    pass


class S3UploadError(StorageError):
    """Raised when S3 upload fails"""

    pass


class StorageResult(NamedTuple):
    """Result of storing a file with all computed checksums"""

    sha256: str
    size: int
    s3_key: str
    md5: Optional[str] = None
    sha1: Optional[str] = None
    s3_etag: Optional[str] = None
    # True if the artifact was deduplicated (the S3 object already existed)
    already_existed: bool = False


class S3StorageUnavailableError(StorageError):
    """Raised when S3 storage backend is unavailable"""

    pass


class HashCollisionError(StorageError):
    """Raised when a hash collision is detected (extremely rare)"""

    pass


class S3Storage:
    def __init__(self):
        # Build config with retry and timeout settings
        s3_config = {}
        if settings.s3_use_path_style:
            s3_config["addressing_style"] = "path"

        config = Config(
            s3=s3_config if s3_config else None,
            connect_timeout=settings.s3_connect_timeout,
            read_timeout=settings.s3_read_timeout,
            retries={
                "max_attempts": settings.s3_max_retries,
                "mode": "adaptive",  # Adaptive retry mode for better handling
            },
        )

        # Build client kwargs - only include credentials if explicitly provided.
        # This allows IRSA/IAM role credentials to be used when no explicit creds are set.
        client_kwargs = {
            "endpoint_url": settings.s3_endpoint if settings.s3_endpoint else None,
            "region_name": settings.s3_region,
            "config": config,
            "verify": settings.s3_verify_ssl,
        }
        if settings.s3_access_key_id and settings.s3_secret_access_key:
            client_kwargs["aws_access_key_id"] = settings.s3_access_key_id
            client_kwargs["aws_secret_access_key"] = settings.s3_secret_access_key

        self.client = boto3.client("s3", **client_kwargs)
        self.bucket = settings.s3_bucket
        # Store active multipart uploads for resumable support
        self._active_uploads: Dict[str, Dict[str, Any]] = {}

    def store(
        self, file: BinaryIO, content_length: Optional[int] = None
    ) -> StorageResult:
        """
        Store a file and return StorageResult with all checksums.

        Content-addressable: if the file already exists, just return the hash.
        Uses multipart upload for files larger than MULTIPART_THRESHOLD.
        """
        # For small files or unknown size, use the simple approach
        if content_length is None or content_length < MULTIPART_THRESHOLD:
            return self._store_simple(file)
        else:
            return self._store_multipart(file, content_length)
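
    # Content-addressed key layout used throughout this class (for reference):
    # objects are sharded by the first two pairs of hex characters of the SHA256
    # digest, e.g. sha256 "ab12cd..." maps to "fruits/ab/12/ab12cd...". A minimal
    # sketch, assuming a valid 64-character hex digest (hypothetical helper, not
    # used by the methods below):
    #
    #     def object_key(sha256_hex: str) -> str:
    #         return f"fruits/{sha256_hex[:2]}/{sha256_hex[2:4]}/{sha256_hex}"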

    def _store_simple(self, file: BinaryIO) -> StorageResult:
        """
        Store a small file using simple put_object.

        Raises:
            HashComputationError: If hash computation fails
            FileSizeExceededError: If file exceeds maximum size
            S3ExistenceCheckError: If S3 existence check fails after retries
            S3UploadError: If S3 upload fails
        """
        # Read file and compute all hashes with error handling
        try:
            content = file.read()
            if not content:
                raise HashComputationError("Empty file content")

            size = len(content)

            # Enforce file size limit (protection against Content-Length spoofing)
            if size > settings.max_file_size:
                raise FileSizeExceededError(
                    f"File size {size} exceeds maximum {settings.max_file_size}"
                )

            sha256_hash = hashlib.sha256(content).hexdigest()
            md5_hash = hashlib.md5(content).hexdigest()
            sha1_hash = hashlib.sha1(content).hexdigest()
        except (HashComputationError, FileSizeExceededError):
            raise
        except Exception as e:
            logger.error(f"Hash computation failed: {e}")
            raise HashComputationError(f"Failed to compute hash: {e}") from e

        # Check if already exists (with retry logic)
        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
        s3_etag = None

        try:
            exists = self._exists(s3_key)
        except S3ExistenceCheckError:
            # Re-raise the specific error
            raise
        except Exception as e:
            logger.error(f"Unexpected error during S3 existence check: {e}")
            raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e

        if not exists:
            try:
                response = self.client.put_object(
                    Bucket=self.bucket,
                    Key=s3_key,
                    Body=content,
                )
                s3_etag = response.get("ETag", "").strip('"')
            except (EndpointConnectionError, BotoConnectionError) as e:
                logger.error(f"S3 storage unavailable: {e}")
                raise S3StorageUnavailableError(
                    f"Storage backend unavailable: {e}"
                ) from e
            except (ReadTimeoutError, ConnectTimeoutError) as e:
                logger.error(f"S3 operation timed out: {e}")
                raise S3UploadError(f"Upload timed out: {e}") from e
            except ClientError as e:
                error_code = e.response.get("Error", {}).get("Code", "")
                if error_code == "ServiceUnavailable":
                    logger.error(f"S3 service unavailable: {e}")
                    raise S3StorageUnavailableError(
                        f"Storage service unavailable: {e}"
                    ) from e
                logger.error(f"S3 upload failed: {e}")
                raise S3UploadError(f"Failed to upload to S3: {e}") from e
        else:
            # Get existing ETag and verify integrity (detect potential hash collision)
            obj_info = self.get_object_info(s3_key)
            if obj_info:
                s3_etag = obj_info.get("etag", "").strip('"')
                # Check for hash collision by comparing size
                existing_size = obj_info.get("size", 0)
                if existing_size != size:
                    logger.critical(
                        f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
                        f"existing={existing_size}, new={size}. This is extremely rare."
                    )
                    raise HashCollisionError(
                        f"Hash collision detected for {sha256_hash}: size mismatch"
                    )

        return StorageResult(
            sha256=sha256_hash,
            size=size,
            s3_key=s3_key,
            md5=md5_hash,
            sha1=sha1_hash,
            s3_etag=s3_etag,
            already_existed=exists,
        )
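
    # Usage note (illustrative sketch): storing the same bytes twice is expected
    # to deduplicate on the second call. The in-memory file objects below are
    # assumptions for the example, not part of this module.
    #
    #     first = storage.store(io.BytesIO(data))
    #     second = storage.store(io.BytesIO(data))
    #     assert second.sha256 == first.sha256
    #     assert second.already_existed is True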

    def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
        """
        Store a large file using S3 multipart upload with streaming hash computation.

        Raises:
            HashComputationError: If hash computation fails
            FileSizeExceededError: If file exceeds maximum size
            S3ExistenceCheckError: If S3 existence check fails after retries
            S3UploadError: If S3 upload fails
        """
        # First pass: compute all hashes by streaming through file
        try:
            import time

            sha256_hasher = hashlib.sha256()
            md5_hasher = hashlib.md5()
            sha1_hasher = hashlib.sha1()
            size = 0

            hash_start_time = time.time()
            last_log_time = hash_start_time
            log_interval_seconds = 5  # Log progress every 5 seconds

            logger.info(
                f"Computing hashes for large file: expected_size={content_length}"
            )

            # Read file in chunks to compute hashes
            while True:
                chunk = file.read(HASH_CHUNK_SIZE)
                if not chunk:
                    break
                sha256_hasher.update(chunk)
                md5_hasher.update(chunk)
                sha1_hasher.update(chunk)
                size += len(chunk)

                # Log hash computation progress periodically
                current_time = time.time()
                if current_time - last_log_time >= log_interval_seconds:
                    elapsed = current_time - hash_start_time
                    percent = (size / content_length) * 100 if content_length > 0 else 0
                    throughput = (size / (1024 * 1024)) / elapsed if elapsed > 0 else 0
                    logger.info(
                        f"Hash computation progress: bytes={size}/{content_length} ({percent:.1f}%) "
                        f"throughput={throughput:.2f}MB/s"
                    )
                    last_log_time = current_time

                # Enforce file size limit during streaming (protection against spoofing)
                if size > settings.max_file_size:
                    raise FileSizeExceededError(
                        f"File size exceeds maximum {settings.max_file_size}"
                    )

            if size == 0:
                raise HashComputationError("Empty file content")

            sha256_hash = sha256_hasher.hexdigest()
            md5_hash = md5_hasher.hexdigest()
            sha1_hash = sha1_hasher.hexdigest()

            # Log hash computation completion
            hash_elapsed = time.time() - hash_start_time
            hash_throughput = (size / (1024 * 1024)) / hash_elapsed if hash_elapsed > 0 else 0
            logger.info(
                f"Hash computation completed: hash={sha256_hash[:16]}... "
                f"size={size} duration={hash_elapsed:.2f}s throughput={hash_throughput:.2f}MB/s"
            )
        except (HashComputationError, FileSizeExceededError):
            raise
        except Exception as e:
            logger.error(f"Hash computation failed for multipart upload: {e}")
            raise HashComputationError(f"Failed to compute hash: {e}") from e

        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"

        # Check if already exists (deduplication) with retry logic
        try:
            exists = self._exists(s3_key)
        except S3ExistenceCheckError:
            raise
        except Exception as e:
            logger.error(f"Unexpected error during S3 existence check: {e}")
            raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e

        if exists:
            obj_info = self.get_object_info(s3_key)
            s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
            # Check for hash collision by comparing size
            if obj_info:
                existing_size = obj_info.get("size", 0)
                if existing_size != size:
                    logger.critical(
                        f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
                        f"existing={existing_size}, new={size}. This is extremely rare."
                    )
                    raise HashCollisionError(
                        f"Hash collision detected for {sha256_hash}: size mismatch"
                    )
            return StorageResult(
                sha256=sha256_hash,
                size=size,
                s3_key=s3_key,
                md5=md5_hash,
                sha1=sha1_hash,
                s3_etag=s3_etag,
                already_existed=True,
            )

        # Seek back to start for upload
        file.seek(0)

        # Start multipart upload
        try:
            mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
        except (EndpointConnectionError, BotoConnectionError) as e:
            logger.error(f"S3 storage unavailable for multipart upload: {e}")
            raise S3StorageUnavailableError(f"Storage backend unavailable: {e}") from e

        upload_id = mpu["UploadId"]

        try:
            import time

            parts = []
            part_number = 1
            bytes_uploaded = 0
            upload_start_time = time.time()
            last_log_time = upload_start_time
            log_interval_seconds = 5  # Log progress every 5 seconds
            total_parts = (content_length + MULTIPART_CHUNK_SIZE - 1) // MULTIPART_CHUNK_SIZE

            logger.info(
                f"Starting multipart upload: hash={sha256_hash[:16]}... "
                f"size={content_length} parts={total_parts}"
            )

            while True:
                chunk = file.read(MULTIPART_CHUNK_SIZE)
                if not chunk:
                    break

                response = self.client.upload_part(
                    Bucket=self.bucket,
                    Key=s3_key,
                    UploadId=upload_id,
                    PartNumber=part_number,
                    Body=chunk,
                )
                parts.append(
                    {
                        "PartNumber": part_number,
                        "ETag": response["ETag"],
                    }
                )
                bytes_uploaded += len(chunk)

                # Log progress periodically
                current_time = time.time()
                if current_time - last_log_time >= log_interval_seconds:
                    elapsed = current_time - upload_start_time
                    percent = (bytes_uploaded / content_length) * 100
                    throughput = (bytes_uploaded / (1024 * 1024)) / elapsed if elapsed > 0 else 0
                    logger.info(
                        f"Upload progress: hash={sha256_hash[:16]}... "
                        f"part={part_number}/{total_parts} "
                        f"bytes={bytes_uploaded}/{content_length} ({percent:.1f}%) "
                        f"throughput={throughput:.2f}MB/s"
                    )
                    last_log_time = current_time

                part_number += 1

            # Log completion
            total_elapsed = time.time() - upload_start_time
            final_throughput = (content_length / (1024 * 1024)) / total_elapsed if total_elapsed > 0 else 0
            logger.info(
                f"Multipart upload completed: hash={sha256_hash[:16]}... "
                f"size={content_length} duration={total_elapsed:.2f}s throughput={final_throughput:.2f}MB/s"
            )

            # Complete multipart upload
            complete_response = self.client.complete_multipart_upload(
                Bucket=self.bucket,
                Key=s3_key,
                UploadId=upload_id,
                MultipartUpload={"Parts": parts},
            )
            s3_etag = complete_response.get("ETag", "").strip('"')

            return StorageResult(
                sha256=sha256_hash,
                size=size,
                s3_key=s3_key,
                md5=md5_hash,
                sha1=sha1_hash,
                s3_etag=s3_etag,
                already_existed=False,
            )
        except Exception as e:
            # Abort multipart upload on failure
            error_str = str(e).lower()
            is_client_disconnect = (
                isinstance(e, (ConnectionResetError, BrokenPipeError))
                or "connection" in error_str
                or "broken pipe" in error_str
                or "reset" in error_str
            )
            if is_client_disconnect:
                logger.warning(
                    f"Multipart upload aborted (client disconnect): hash={sha256_hash[:16]}... "
                    f"parts_uploaded={len(parts)} bytes_uploaded={bytes_uploaded}"
                )
            else:
                logger.error(f"Multipart upload failed: hash={sha256_hash[:16]}... error={e}")

            try:
                self.client.abort_multipart_upload(
                    Bucket=self.bucket,
                    Key=s3_key,
                    UploadId=upload_id,
                )
                logger.info(f"Multipart upload aborted and cleaned up: upload_id={upload_id[:16]}...")
            except Exception as abort_error:
                logger.error(f"Failed to abort multipart upload: {abort_error}")

            raise
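
    # Operational note (sketch, not called by this module): if abort_multipart_upload
    # itself fails, the orphaned parts keep consuming bucket space. A bucket lifecycle
    # rule is a common safety net; the 7-day retention below is an assumption, not
    # something this codebase configures.
    #
    #     client.put_bucket_lifecycle_configuration(
    #         Bucket=bucket,
    #         LifecycleConfiguration={
    #             "Rules": [
    #                 {
    #                     "ID": "abort-stale-multipart-uploads",
    #                     "Status": "Enabled",
    #                     "Filter": {"Prefix": ""},
    #                     "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 7},
    #                 }
    #             ]
    #         },
    #     )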
""" s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" # Check if already exists if self._exists(s3_key): return { "upload_id": None, "s3_key": s3_key, "already_exists": True, "parts": [], } mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key) upload_id = mpu["UploadId"] import time session = { "upload_id": upload_id, "s3_key": s3_key, "already_exists": False, "parts": [], "expected_hash": expected_hash, "started_at": time.time(), "bytes_uploaded": 0, "expected_size": None, # Set when init provides size "status": "in_progress", } self._active_uploads[upload_id] = session return session def upload_part( self, upload_id: str, part_number: int, data: bytes ) -> Dict[str, Any]: """ Upload a part for a resumable upload. Returns part info including ETag. """ session = self._active_uploads.get(upload_id) if not session: raise ValueError(f"Unknown upload session: {upload_id}") response = self.client.upload_part( Bucket=self.bucket, Key=session["s3_key"], UploadId=upload_id, PartNumber=part_number, Body=data, ) part_info = { "PartNumber": part_number, "ETag": response["ETag"], "size": len(data), } session["parts"].append(part_info) session["bytes_uploaded"] = session.get("bytes_uploaded", 0) + len(data) return part_info def get_upload_progress(self, upload_id: str) -> Optional[Dict[str, Any]]: """ Get progress information for a resumable upload. Returns None if upload not found. """ import time session = self._active_uploads.get(upload_id) if not session: return None bytes_uploaded = session.get("bytes_uploaded", 0) expected_size = session.get("expected_size") started_at = session.get("started_at") progress = { "upload_id": upload_id, "status": session.get("status", "in_progress"), "bytes_uploaded": bytes_uploaded, "bytes_total": expected_size, "parts_uploaded": len(session.get("parts", [])), "parts_total": None, "started_at": started_at, "elapsed_seconds": None, "percent_complete": None, "throughput_mbps": None, } if expected_size and expected_size > 0: progress["percent_complete"] = round((bytes_uploaded / expected_size) * 100, 2) progress["parts_total"] = (expected_size + MULTIPART_CHUNK_SIZE - 1) // MULTIPART_CHUNK_SIZE if started_at: elapsed = time.time() - started_at progress["elapsed_seconds"] = round(elapsed, 2) if elapsed > 0 and bytes_uploaded > 0: progress["throughput_mbps"] = round((bytes_uploaded / (1024 * 1024)) / elapsed, 2) return progress def set_upload_expected_size(self, upload_id: str, size: int): """Set the expected size for an upload (for progress tracking).""" session = self._active_uploads.get(upload_id) if session: session["expected_size"] = size def complete_resumable_upload(self, upload_id: str) -> Tuple[str, str]: """ Complete a resumable upload. Returns (sha256_hash, s3_key). 
""" session = self._active_uploads.get(upload_id) if not session: raise ValueError(f"Unknown upload session: {upload_id}") # Sort parts by part number sorted_parts = sorted(session["parts"], key=lambda x: x["PartNumber"]) self.client.complete_multipart_upload( Bucket=self.bucket, Key=session["s3_key"], UploadId=upload_id, MultipartUpload={"Parts": sorted_parts}, ) # Clean up session del self._active_uploads[upload_id] return session["expected_hash"], session["s3_key"] def abort_resumable_upload(self, upload_id: str): """Abort a resumable upload""" session = self._active_uploads.get(upload_id) if session: self.client.abort_multipart_upload( Bucket=self.bucket, Key=session["s3_key"], UploadId=upload_id, ) del self._active_uploads[upload_id] def list_upload_parts(self, upload_id: str) -> list: """List uploaded parts for a resumable upload (for resume support)""" session = self._active_uploads.get(upload_id) if not session: raise ValueError(f"Unknown upload session: {upload_id}") response = self.client.list_parts( Bucket=self.bucket, Key=session["s3_key"], UploadId=upload_id, ) return response.get("Parts", []) def get(self, s3_key: str) -> bytes: """Retrieve a file by its S3 key""" response = self.client.get_object(Bucket=self.bucket, Key=s3_key) return response["Body"].read() def get_stream(self, s3_key: str, range_header: Optional[str] = None): """ Get a streaming response for a file. Supports range requests for partial downloads. Returns (stream, content_length, content_range, accept_ranges) """ kwargs = {"Bucket": self.bucket, "Key": s3_key} if range_header: kwargs["Range"] = range_header response = self.client.get_object(**kwargs) content_length = response.get("ContentLength", 0) content_range = response.get("ContentRange") return response["Body"], content_length, content_range def get_object_info(self, s3_key: str) -> Dict[str, Any]: """Get object metadata without downloading content""" try: response = self.client.head_object(Bucket=self.bucket, Key=s3_key) return { "size": response.get("ContentLength", 0), "content_type": response.get("ContentType"), "last_modified": response.get("LastModified"), "etag": response.get("ETag"), } except ClientError: return None def _exists(self, s3_key: str, retry: bool = True) -> bool: """ Check if an object exists with optional retry logic. 

    def _exists(self, s3_key: str, retry: bool = True) -> bool:
        """
        Check if an object exists with optional retry logic.

        Args:
            s3_key: The S3 key to check
            retry: Whether to retry on transient failures (default: True)

        Returns:
            True if object exists, False otherwise

        Raises:
            S3ExistenceCheckError: If all retries fail due to non-404 errors
        """
        import time

        max_retries = MAX_EXISTENCE_CHECK_RETRIES if retry else 1
        last_error = None

        for attempt in range(max_retries):
            try:
                self.client.head_object(Bucket=self.bucket, Key=s3_key)
                return True
            except ClientError as e:
                error_code = e.response.get("Error", {}).get("Code", "")
                # 404 means object doesn't exist - not an error
                if error_code in ("404", "NoSuchKey"):
                    return False
                # For other errors, retry
                last_error = e
                if attempt < max_retries - 1:
                    logger.warning(
                        f"S3 existence check failed (attempt {attempt + 1}/{max_retries}): {e}"
                    )
                    time.sleep(0.1 * (attempt + 1))  # Linearly increasing backoff

        # All retries failed
        logger.error(
            f"S3 existence check failed after {max_retries} attempts: {last_error}"
        )
        raise S3ExistenceCheckError(
            f"Failed to check S3 object existence after {max_retries} attempts: {last_error}"
        )

    def delete(self, s3_key: str) -> bool:
        """Delete an object"""
        try:
            self.client.delete_object(Bucket=self.bucket, Key=s3_key)
            return True
        except ClientError:
            return False

    def delete_all(self) -> int:
        """
        Delete all objects in the bucket.

        Returns:
            Number of objects deleted
        """
        deleted_count = 0
        try:
            paginator = self.client.get_paginator("list_objects_v2")
            for page in paginator.paginate(Bucket=self.bucket):
                objects = page.get("Contents", [])
                if not objects:
                    continue

                # Delete objects in batches of up to 1000 (the S3 per-request limit)
                delete_keys = [{"Key": obj["Key"]} for obj in objects]
                if delete_keys:
                    self.client.delete_objects(
                        Bucket=self.bucket, Delete={"Objects": delete_keys}
                    )
                    deleted_count += len(delete_keys)
                    logger.info(f"Deleted {len(delete_keys)} objects from S3")

            logger.info(f"Total objects deleted from S3: {deleted_count}")
            return deleted_count
        except ClientError as e:
            logger.error(f"Failed to delete all S3 objects: {e}")
            raise

    def generate_presigned_url(
        self,
        s3_key: str,
        expiry: Optional[int] = None,
        response_content_type: Optional[str] = None,
        response_content_disposition: Optional[str] = None,
    ) -> str:
        """
        Generate a presigned URL for downloading an object.

        Args:
            s3_key: The S3 key of the object
            expiry: URL expiry in seconds (defaults to settings.presigned_url_expiry)
            response_content_type: Override Content-Type header in response
            response_content_disposition: Override Content-Disposition header in response

        Returns:
            Presigned URL string
        """
        if expiry is None:
            expiry = settings.presigned_url_expiry

        params = {
            "Bucket": self.bucket,
            "Key": s3_key,
        }
        # Add response header overrides if specified
        if response_content_type:
            params["ResponseContentType"] = response_content_type
        if response_content_disposition:
            params["ResponseContentDisposition"] = response_content_disposition

        url = self.client.generate_presigned_url(
            "get_object",
            Params=params,
            ExpiresIn=expiry,
        )
        return url

    def health_check(self) -> bool:
        """
        Check if the storage backend is healthy and accessible.

        Performs a lightweight HEAD request on the bucket to verify connectivity.

        Returns:
            True if healthy, False otherwise
        """
        try:
            self.client.head_bucket(Bucket=self.bucket)
            return True
        except ClientError as e:
            logger.warning(f"Storage health check failed: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error during storage health check: {e}")
            return False
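
    # Presigned URL sketch: forcing a browser download with a friendly filename.
    # The filename and 5-minute expiry below are assumptions for the example.
    #
    #     url = storage.generate_presigned_url(
    #         result.s3_key,
    #         expiry=300,
    #         response_content_disposition='attachment; filename="report.tar.gz"',
    #     )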

    def get_verified(self, s3_key: str, expected_hash: str) -> bytes:
        """
        Download and verify content matches expected SHA256 hash.

        This method downloads the entire content, computes its hash, and
        verifies it matches the expected hash before returning.

        Args:
            s3_key: The S3 storage key of the file
            expected_hash: Expected SHA256 hash (64 hex characters)

        Returns:
            File content as bytes (only if verification passes)

        Raises:
            ChecksumMismatchError: If computed hash doesn't match expected
            ClientError: If S3 operation fails
        """
        if not is_valid_sha256(expected_hash):
            raise ValueError(f"Invalid SHA256 hash format: {expected_hash}")

        content = self.get(s3_key)
        actual_hash = compute_sha256(content)

        if actual_hash != expected_hash.lower():
            raise ChecksumMismatchError(
                expected=expected_hash.lower(),
                actual=actual_hash,
                s3_key=s3_key,
                size=len(content),
            )

        logger.debug(f"Verification passed for {s3_key}: {actual_hash[:16]}...")
        return content

    def get_stream_verified(
        self,
        s3_key: str,
        expected_hash: str,
        range_header: Optional[str] = None,
    ) -> Tuple[VerifyingStreamWrapper, int, Optional[str]]:
        """
        Get a verifying stream wrapper for an object.

        Returns a wrapper that computes the hash as chunks are read and can
        verify after streaming completes. Note that verification happens AFTER
        content has been streamed to the client.

        IMPORTANT: For range requests, verification is not supported because we
        cannot verify a partial download against the full file hash.

        Args:
            s3_key: The S3 storage key of the file
            expected_hash: Expected SHA256 hash (64 hex characters)
            range_header: Optional HTTP Range header (verification disabled if set)

        Returns:
            Tuple of (VerifyingStreamWrapper, content_length, content_range).
            The wrapper has a verify() method to call after streaming.

        Raises:
            ValueError: If expected_hash is invalid format
            ClientError: If S3 operation fails
        """
        if not is_valid_sha256(expected_hash):
            raise ValueError(f"Invalid SHA256 hash format: {expected_hash}")

        # Get the S3 stream
        stream, content_length, content_range = self.get_stream(s3_key, range_header)

        # For range requests, we cannot verify (partial content).
        # Return a HashingStreamWrapper that just tracks bytes without verification.
        if range_header or content_range:
            logger.debug(
                f"Range request for {s3_key} - verification disabled (partial content)"
            )
            # Return a basic hashing wrapper (caller should not verify)
            hashing_wrapper = HashingStreamWrapper(stream)
            return hashing_wrapper, content_length, content_range

        # Create verifying wrapper
        verifying_wrapper = VerifyingStreamWrapper(
            stream=stream,
            expected_hash=expected_hash,
            s3_key=s3_key,
        )
        return verifying_wrapper, content_length, content_range

    def verify_integrity(self, s3_key: str, expected_sha256: str) -> bool:
        """
        Verify the integrity of a stored object by downloading and re-hashing.

        This is an expensive operation and should only be used for critical
        verification scenarios.

        Args:
            s3_key: The storage key of the file
            expected_sha256: The expected SHA256 hash

        Returns:
            True if hash matches, False otherwise
        """
        try:
            content = self.get(s3_key)
            actual_hash = hashlib.sha256(content).hexdigest()
            if actual_hash != expected_sha256:
                logger.error(
                    f"Integrity verification failed for {s3_key}: "
                    f"expected {expected_sha256[:12]}..., got {actual_hash[:12]}..."
                )
                return False
            return True
        except Exception as e:
            logger.error(f"Error during integrity verification for {s3_key}: {e}")
            return False
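

# Download-side verification sketch: get_stream_verified() trades safety for
# latency, so verify() runs only after the bytes have already been streamed.
# A minimal sketch, assuming the wrapper is consumed chunk-by-chunk (the exact
# read/iteration API lives in the .checksum module) and that send() is the
# caller's own response writer:
#
#     wrapper, length, _ = storage.get_stream_verified(s3_key, expected_hash)
#     for chunk in wrapper:
#         send(chunk)
#     wrapper.verify()  # raises ChecksumMismatchError on mismatch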


# Singleton instance
_storage: Optional[S3Storage] = None


def get_storage() -> StorageBackend:
    """
    Get the configured storage backend instance.

    Currently returns S3Storage (works with S3-compatible backends like MinIO).
    Future implementations may support backend selection via configuration.

    Returns:
        StorageBackend instance
    """
    global _storage
    if _storage is None:
        _storage = S3Storage()
    return _storage
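

# End-to-end usage sketch: store an artifact and read it back with verification.
# The file path is a placeholder for the example.
#
#     storage = get_storage()
#     with open("artifact.bin", "rb") as f:
#         result = storage.store(f)
#     data = storage.get_verified(result.s3_key, result.sha256)
#     assert len(data) == result.size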