Files
orchard/backend/app/storage.py
Mondo Diaz e51f3ab9d0 Fix S3 client to support IRSA credentials
Only pass explicit credentials to boto3 if they're actually set.
This allows the default credential chain (including IRSA web identity
tokens) to be used when no access key is configured.
2026-01-21 20:05:43 +00:00

1033 lines
36 KiB
Python

import hashlib
import logging
from typing import (
BinaryIO,
Tuple,
Optional,
Dict,
Any,
NamedTuple,
Protocol,
runtime_checkable,
)
import boto3
from botocore.config import Config
from botocore.exceptions import (
ClientError,
ConnectionError as BotoConnectionError,
EndpointConnectionError,
ReadTimeoutError,
ConnectTimeoutError,
)
from .config import get_settings
from .checksum import (
ChecksumMismatchError,
HashingStreamWrapper,
VerifyingStreamWrapper,
compute_sha256,
is_valid_sha256,
)
# Application settings are resolved once, when this module is first imported;
# every S3Storage instance in this module reads its configuration from this object.
settings = get_settings()
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# =============================================================================
# Storage Backend Protocol/Interface (ISSUE 33)
# =============================================================================
@runtime_checkable
class StorageBackend(Protocol):
    """
    Structural interface that every storage backend must satisfy.

    Any implementation (S3, MinIO, future backends) is expected to provide
    the methods below so the rest of the application can treat backends
    interchangeably.

    Deduplication notes:
        - Deduplication is whole-file, keyed on the SHA256 hash of the content
        - Partial/chunk-level deduplication is NOT supported (out of scope for MVP)
        - Files with identical content but different metadata are deduplicated
    """

    def store(
        self, file: BinaryIO, content_length: Optional[int] = None
    ) -> "StorageResult":
        """
        Persist a file and report its checksums.

        Storage is content-addressable: when an object with the same hash is
        already present, the existing hash is returned and nothing is uploaded.

        Args:
            file: File-like object to store
            content_length: Optional size hint (enables multipart upload)

        Returns:
            StorageResult with sha256, size, s3_key, and optional checksums

        Raises:
            HashComputationError: If hash computation fails
            S3ExistenceCheckError: If existence check fails after retries
            S3UploadError: If upload fails
        """
        ...

    def get(self, s3_key: str) -> bytes:
        """
        Fetch the full content stored under a key.

        Args:
            s3_key: The storage key (path) of the file

        Returns:
            File content as bytes
        """
        ...

    def get_stream(
        self, s3_key: str, range_header: Optional[str] = None
    ) -> Tuple[Any, int, Optional[str]]:
        """
        Open a streaming read of a stored file, optionally for a byte range.

        Args:
            s3_key: The storage key of the file
            range_header: Optional HTTP Range header value

        Returns:
            Tuple of (stream, content_length, content_range)
        """
        ...

    def delete(self, s3_key: str) -> bool:
        """
        Remove a file from storage.

        Args:
            s3_key: The storage key of the file to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        ...

    def get_object_info(self, s3_key: str) -> Optional[Dict[str, Any]]:
        """
        Look up object metadata without downloading the content.

        Args:
            s3_key: The storage key of the file

        Returns:
            Dict with size, content_type, last_modified, etag, or None if not found
        """
        ...

    def generate_presigned_url(
        self,
        s3_key: str,
        expiry: Optional[int] = None,
        response_content_type: Optional[str] = None,
        response_content_disposition: Optional[str] = None,
    ) -> str:
        """
        Build a presigned download URL for an object.

        Args:
            s3_key: The storage key of the file
            expiry: URL expiry in seconds
            response_content_type: Override Content-Type header in response
            response_content_disposition: Override Content-Disposition header

        Returns:
            Presigned URL string
        """
        ...

    def health_check(self) -> bool:
        """
        Report whether the storage backend is healthy and accessible.

        Returns:
            True if healthy, False otherwise
        """
        ...
# Files whose declared size reaches this many bytes (100MB) use multipart upload
MULTIPART_THRESHOLD = 100 * 1024 ** 2
# Bytes read and sent per part during a multipart upload (10MB)
MULTIPART_CHUNK_SIZE = 10 * 1024 ** 2
# Bytes read per iteration while streaming a file through the hashers (8MB)
HASH_CHUNK_SIZE = 8 * 1024 ** 2
# Number of attempts made by the S3 existence check before giving up
MAX_EXISTENCE_CHECK_RETRIES = 3
class StorageError(Exception):
    """Root of the exception hierarchy for storage operations."""


class HashComputationError(StorageError):
    """Signals that computing a content hash failed."""


class FileSizeExceededError(StorageError):
    """Signals that an uploaded file is larger than the configured maximum."""


class S3ExistenceCheckError(StorageError):
    """Signals that the S3 existence check kept failing after all retries."""


class S3UploadError(StorageError):
    """Signals that uploading to S3 failed."""
class StorageResult(NamedTuple):
    """Outcome of a store operation: the storage key plus every checksum computed."""

    sha256: str
    size: int
    s3_key: str
    md5: Optional[str] = None
    sha1: Optional[str] = None
    s3_etag: Optional[str] = None
    # True if artifact was deduplicated (S3 object already existed)
    already_existed: bool = False
class S3StorageUnavailableError(StorageError):
    """Signals that the S3 storage backend cannot be reached or is unavailable."""


class HashCollisionError(StorageError):
    """Signals that a hash collision was detected (extremely rare)."""
class S3Storage:
def __init__(self):
    """
    Create the boto3 S3 client from application settings.

    Timeouts, retry count, addressing style, endpoint, region and TLS
    verification all come from the module-level settings object. Static
    credentials are passed to boto3 only when both the access key id and
    the secret are configured; otherwise boto3 resolves credentials through
    its default chain (e.g. IRSA/IAM role credentials).
    """
    # Path-style addressing is opt-in; None leaves botocore's default in place
    addressing = {"addressing_style": "path"} if settings.s3_use_path_style else None
    botocore_config = Config(
        s3=addressing,
        connect_timeout=settings.s3_connect_timeout,
        read_timeout=settings.s3_read_timeout,
        retries={
            "max_attempts": settings.s3_max_retries,
            "mode": "adaptive",  # Adaptive retry mode for better handling
        },
    )

    client_kwargs = {
        "endpoint_url": settings.s3_endpoint or None,
        "region_name": settings.s3_region,
        "config": botocore_config,
        "verify": settings.s3_verify_ssl,
    }
    # Only hand explicit credentials to boto3 when both halves are configured
    if settings.s3_access_key_id and settings.s3_secret_access_key:
        client_kwargs.update(
            aws_access_key_id=settings.s3_access_key_id,
            aws_secret_access_key=settings.s3_secret_access_key,
        )

    self.client = boto3.client("s3", **client_kwargs)
    self.bucket = settings.s3_bucket
    # In-memory registry of resumable upload sessions, keyed by upload id
    self._active_uploads: Dict[str, Dict[str, Any]] = {}
def store(
    self, file: BinaryIO, content_length: Optional[int] = None
) -> StorageResult:
    """
    Persist a file and return a StorageResult carrying all checksums.

    Storage is content-addressable, so a file that already exists is not
    uploaded again. Files whose declared size reaches MULTIPART_THRESHOLD
    go through multipart upload; smaller files, and files of unknown size,
    use the single-request path.

    Args:
        file: File-like object to store
        content_length: Declared size in bytes, or None when unknown

    Returns:
        StorageResult produced by the selected upload path
    """
    use_multipart = (
        content_length is not None and content_length >= MULTIPART_THRESHOLD
    )
    if use_multipart:
        return self._store_multipart(file, content_length)
    return self._store_simple(file)
def _store_simple(self, file: BinaryIO) -> StorageResult:
    """
    Store a small file using simple put_object.

    The whole file is read into memory, hashed (SHA256, MD5, SHA1) and stored
    under ``fruits/<aa>/<bb>/<sha256>``. When an object already exists under
    that key the upload is skipped and the stored object's size is compared
    with the new content as a collision check.

    Args:
        file: File-like object; read to EOF with a single read() call

    Returns:
        StorageResult with the checksums, size, key, ETag and dedup flag

    Raises:
        HashComputationError: If the content is empty or reading/hashing fails
        FileSizeExceededError: If file exceeds maximum size
        S3ExistenceCheckError: If S3 existence check fails after retries
        S3StorageUnavailableError: If S3 cannot be reached or reports
            ServiceUnavailable during the upload
        S3UploadError: If S3 upload fails or times out
        HashCollisionError: If an existing object with the same hash has a
            different size
    """
    # Read file and compute all hashes with error handling
    try:
        content = file.read()
        if not content:
            raise HashComputationError("Empty file content")
        size = len(content)
        # Enforce file size limit (protection against Content-Length spoofing)
        if size > settings.max_file_size:
            raise FileSizeExceededError(
                f"File size {size} exceeds maximum {settings.max_file_size}"
            )
        sha256_hash = hashlib.sha256(content).hexdigest()
        md5_hash = hashlib.md5(content).hexdigest()
        sha1_hash = hashlib.sha1(content).hexdigest()
    except (HashComputationError, FileSizeExceededError):
        raise
    except Exception as e:
        logger.error(f"Hash computation failed: {e}")
        raise HashComputationError(f"Failed to compute hash: {e}") from e

    # Check if already exists (with retry logic)
    s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
    s3_etag = None
    try:
        exists = self._exists(s3_key)
    except S3ExistenceCheckError:
        # Re-raise the specific error
        raise
    except Exception as e:
        logger.error(f"Unexpected error during S3 existence check: {e}")
        raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e

    if not exists:
        try:
            response = self.client.put_object(
                Bucket=self.bucket,
                Key=s3_key,
                Body=content,
            )
            # "or" guards against an ETag that is present but None
            s3_etag = (response.get("ETag") or "").strip('"')
        except (EndpointConnectionError, BotoConnectionError) as e:
            logger.error(f"S3 storage unavailable: {e}")
            raise S3StorageUnavailableError(
                f"Storage backend unavailable: {e}"
            ) from e
        except (ReadTimeoutError, ConnectTimeoutError) as e:
            logger.error(f"S3 operation timed out: {e}")
            raise S3UploadError(f"Upload timed out: {e}") from e
        except ClientError as e:
            error_code = e.response.get("Error", {}).get("Code", "")
            if error_code == "ServiceUnavailable":
                logger.error(f"S3 service unavailable: {e}")
                raise S3StorageUnavailableError(
                    f"Storage service unavailable: {e}"
                ) from e
            logger.error(f"S3 upload failed: {e}")
            raise S3UploadError(f"Failed to upload to S3: {e}") from e
    else:
        # Get existing ETag and verify integrity (detect potential hash collision)
        obj_info = self.get_object_info(s3_key)
        if obj_info:
            # get_object_info stores the raw HEAD value, which may be None;
            # dict.get's default does not apply when the key exists.
            s3_etag = (obj_info.get("etag") or "").strip('"')
            # Check for hash collision by comparing size
            existing_size = obj_info.get("size", 0)
            if existing_size != size:
                logger.critical(
                    f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
                    f"existing={existing_size}, new={size}. This is extremely rare."
                )
                raise HashCollisionError(
                    f"Hash collision detected for {sha256_hash}: size mismatch"
                )

    return StorageResult(
        sha256=sha256_hash,
        size=size,
        s3_key=s3_key,
        md5=md5_hash,
        sha1=sha1_hash,
        s3_etag=s3_etag,
        already_existed=exists,
    )
def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
    """
    Store a large file using S3 multipart upload with streaming hash computation.

    The file is read twice: a first pass streams it through the SHA256, MD5
    and SHA1 hashers to derive the content-addressed key, then (if the object
    does not already exist) the file is rewound with seek(0) and uploaded in
    MULTIPART_CHUNK_SIZE parts. Any failure during the upload aborts the
    multipart upload before the exception is re-raised.

    Args:
        file: Seekable file-like object to store
        content_length: Declared size in bytes; used for progress reporting only

    Returns:
        StorageResult with the checksums, actual size, key, ETag and dedup flag

    Raises:
        HashComputationError: If the file is empty or hash computation fails
        FileSizeExceededError: If file exceeds maximum size
        S3ExistenceCheckError: If S3 existence check fails after retries
        S3StorageUnavailableError: If S3 cannot be reached when the multipart
            upload is created
        HashCollisionError: If an existing object with the same hash has a
            different size
        Exception: Any error raised while uploading parts or completing the
            upload is re-raised after the multipart upload is aborted
    """
    import time

    # First pass: compute all hashes by streaming through file
    try:
        sha256_hasher = hashlib.sha256()
        md5_hasher = hashlib.md5()
        sha1_hasher = hashlib.sha1()
        size = 0
        hash_start_time = time.time()
        last_log_time = hash_start_time
        log_interval_seconds = 5  # Log progress every 5 seconds
        logger.info(f"Computing hashes for large file: expected_size={content_length}")
        # Read file in chunks to compute hashes
        while True:
            chunk = file.read(HASH_CHUNK_SIZE)
            if not chunk:
                break
            sha256_hasher.update(chunk)
            md5_hasher.update(chunk)
            sha1_hasher.update(chunk)
            size += len(chunk)
            # Log hash computation progress periodically
            current_time = time.time()
            if current_time - last_log_time >= log_interval_seconds:
                elapsed = current_time - hash_start_time
                percent = (size / content_length) * 100 if content_length > 0 else 0
                throughput = (size / (1024 * 1024)) / elapsed if elapsed > 0 else 0
                logger.info(
                    f"Hash computation progress: bytes={size}/{content_length} ({percent:.1f}%) "
                    f"throughput={throughput:.2f}MB/s"
                )
                last_log_time = current_time
            # Enforce file size limit during streaming (protection against spoofing)
            if size > settings.max_file_size:
                raise FileSizeExceededError(
                    f"File size exceeds maximum {settings.max_file_size}"
                )
        if size == 0:
            raise HashComputationError("Empty file content")
        sha256_hash = sha256_hasher.hexdigest()
        md5_hash = md5_hasher.hexdigest()
        sha1_hash = sha1_hasher.hexdigest()
        # Log hash computation completion
        hash_elapsed = time.time() - hash_start_time
        hash_throughput = (size / (1024 * 1024)) / hash_elapsed if hash_elapsed > 0 else 0
        logger.info(
            f"Hash computation completed: hash={sha256_hash[:16]}... "
            f"size={size} duration={hash_elapsed:.2f}s throughput={hash_throughput:.2f}MB/s"
        )
    except (HashComputationError, FileSizeExceededError):
        raise
    except Exception as e:
        logger.error(f"Hash computation failed for multipart upload: {e}")
        raise HashComputationError(f"Failed to compute hash: {e}") from e

    s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"

    # Check if already exists (deduplication) with retry logic
    try:
        exists = self._exists(s3_key)
    except S3ExistenceCheckError:
        raise
    except Exception as e:
        logger.error(f"Unexpected error during S3 existence check: {e}")
        raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e

    if exists:
        obj_info = self.get_object_info(s3_key)
        s3_etag = None
        if obj_info:
            # The stored value may be None even though the key exists, so the
            # dict.get default cannot be relied on here.
            s3_etag = (obj_info.get("etag") or "").strip('"')
            # Check for hash collision by comparing size
            existing_size = obj_info.get("size", 0)
            if existing_size != size:
                logger.critical(
                    f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
                    f"existing={existing_size}, new={size}. This is extremely rare."
                )
                raise HashCollisionError(
                    f"Hash collision detected for {sha256_hash}: size mismatch"
                )
        return StorageResult(
            sha256=sha256_hash,
            size=size,
            s3_key=s3_key,
            md5=md5_hash,
            sha1=sha1_hash,
            s3_etag=s3_etag,
            already_existed=True,
        )

    # Seek back to start for upload
    file.seek(0)

    # Start multipart upload
    try:
        mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
    except (EndpointConnectionError, BotoConnectionError) as e:
        logger.error(f"S3 storage unavailable for multipart upload: {e}")
        raise S3StorageUnavailableError(f"Storage backend unavailable: {e}") from e
    upload_id = mpu["UploadId"]

    # Bound before the try so the failure handler below can always report them
    parts = []
    bytes_uploaded = 0
    try:
        part_number = 1
        upload_start_time = time.time()
        last_log_time = upload_start_time
        log_interval_seconds = 5  # Log progress every 5 seconds
        total_parts = (content_length + MULTIPART_CHUNK_SIZE - 1) // MULTIPART_CHUNK_SIZE
        logger.info(
            f"Starting multipart upload: hash={sha256_hash[:16]}... "
            f"size={content_length} parts={total_parts}"
        )
        while True:
            chunk = file.read(MULTIPART_CHUNK_SIZE)
            if not chunk:
                break
            response = self.client.upload_part(
                Bucket=self.bucket,
                Key=s3_key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=chunk,
            )
            parts.append(
                {
                    "PartNumber": part_number,
                    "ETag": response["ETag"],
                }
            )
            bytes_uploaded += len(chunk)
            # Log progress periodically
            current_time = time.time()
            if current_time - last_log_time >= log_interval_seconds:
                elapsed = current_time - upload_start_time
                percent = (bytes_uploaded / content_length) * 100
                throughput = (bytes_uploaded / (1024 * 1024)) / elapsed if elapsed > 0 else 0
                logger.info(
                    f"Upload progress: hash={sha256_hash[:16]}... "
                    f"part={part_number}/{total_parts} "
                    f"bytes={bytes_uploaded}/{content_length} ({percent:.1f}%) "
                    f"throughput={throughput:.2f}MB/s"
                )
                last_log_time = current_time
            part_number += 1

        # Complete multipart upload
        complete_response = self.client.complete_multipart_upload(
            Bucket=self.bucket,
            Key=s3_key,
            UploadId=upload_id,
            MultipartUpload={"Parts": parts},
        )
        s3_etag = (complete_response.get("ETag") or "").strip('"')

        # Log completion only once S3 has accepted the upload, and report the
        # bytes actually sent rather than the caller-declared content length.
        total_elapsed = time.time() - upload_start_time
        final_throughput = (
            (bytes_uploaded / (1024 * 1024)) / total_elapsed if total_elapsed > 0 else 0
        )
        logger.info(
            f"Multipart upload completed: hash={sha256_hash[:16]}... "
            f"size={bytes_uploaded} duration={total_elapsed:.2f}s throughput={final_throughput:.2f}MB/s"
        )
        return StorageResult(
            sha256=sha256_hash,
            size=size,
            s3_key=s3_key,
            md5=md5_hash,
            sha1=sha1_hash,
            s3_etag=s3_etag,
            already_existed=False,
        )
    except Exception as e:
        # Abort multipart upload on failure
        error_str = str(e).lower()
        # Heuristic: connection-style errors are logged as a client disconnect
        is_client_disconnect = (
            isinstance(e, (ConnectionResetError, BrokenPipeError)) or
            "connection" in error_str or "broken pipe" in error_str or "reset" in error_str
        )
        if is_client_disconnect:
            logger.warning(
                f"Multipart upload aborted (client disconnect): hash={sha256_hash[:16]}... "
                f"parts_uploaded={len(parts)} bytes_uploaded={bytes_uploaded}"
            )
        else:
            logger.error(f"Multipart upload failed: hash={sha256_hash[:16]}... error={e}")
        try:
            self.client.abort_multipart_upload(
                Bucket=self.bucket,
                Key=s3_key,
                UploadId=upload_id,
            )
            logger.info(f"Multipart upload aborted and cleaned up: upload_id={upload_id[:16]}...")
        except Exception as abort_error:
            # Best effort: the original failure is still the one re-raised below
            logger.error(f"Failed to abort multipart upload: {abort_error}")
        raise
def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]:
    """
    Open a resumable upload session for content with the given hash.

    When an object already exists under the hash-derived key, no multipart
    upload is created and the returned dict has ``already_exists`` set to
    True and ``upload_id`` set to None. Otherwise a multipart upload is
    created, a session is registered in ``self._active_uploads`` and that
    session dict is returned.

    Args:
        expected_hash: Hash of the content about to be uploaded; determines the key

    Returns:
        Upload session info including upload_id
    """
    import time

    s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"

    # Nothing to upload when the content is already stored
    if self._exists(s3_key):
        return dict(upload_id=None, s3_key=s3_key, already_exists=True, parts=[])

    created = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
    upload_id = created["UploadId"]
    session = dict(
        upload_id=upload_id,
        s3_key=s3_key,
        already_exists=False,
        parts=[],
        expected_hash=expected_hash,
        started_at=time.time(),
        bytes_uploaded=0,
        expected_size=None,  # Set when init provides size
        status="in_progress",
    )
    self._active_uploads[upload_id] = session
    return session
def upload_part(
    self, upload_id: str, part_number: int, data: bytes
) -> Dict[str, Any]:
    """
    Upload a part for a resumable upload.

    Uploading the same part number again (a retry or a resume) replaces the
    earlier record for that part instead of adding a duplicate, and the
    session's ``bytes_uploaded`` counter is adjusted so the replaced bytes
    are not counted twice.

    Args:
        upload_id: Id of a session created by initiate_resumable_upload
        part_number: Part number to upload
        data: Raw bytes of the part

    Returns:
        Part info dict with PartNumber, ETag and size

    Raises:
        ValueError: If upload_id does not match an active session
    """
    session = self._active_uploads.get(upload_id)
    if not session:
        raise ValueError(f"Unknown upload session: {upload_id}")

    response = self.client.upload_part(
        Bucket=self.bucket,
        Key=session["s3_key"],
        UploadId=upload_id,
        PartNumber=part_number,
        Body=data,
    )
    part_info = {
        "PartNumber": part_number,
        "ETag": response["ETag"],
        "size": len(data),
    }

    # Drop any earlier record of this part number so a retried part is
    # tracked exactly once.
    recorded = session["parts"]
    replaced_bytes = sum(
        part.get("size", 0) for part in recorded if part["PartNumber"] == part_number
    )
    # Slice assignment keeps the same list object that the session dict holds
    recorded[:] = [part for part in recorded if part["PartNumber"] != part_number]
    recorded.append(part_info)

    session["bytes_uploaded"] = (
        session.get("bytes_uploaded", 0) - replaced_bytes + len(data)
    )
    return part_info
def get_upload_progress(self, upload_id: str) -> Optional[Dict[str, Any]]:
    """
    Report progress for a resumable upload session.

    Percentage and total part count are filled in only when the session has a
    positive expected size; elapsed time and throughput only when the session
    recorded a start time.

    Args:
        upload_id: Id of the upload session

    Returns:
        Progress dict, or None if upload not found
    """
    import time

    session = self._active_uploads.get(upload_id)
    if not session:
        return None

    uploaded = session.get("bytes_uploaded", 0)
    total = session.get("expected_size")
    began = session.get("started_at")

    # Derived figures stay None unless the session has the data to compute them
    percent_complete = parts_total = elapsed_seconds = throughput = None

    if total and total > 0:
        percent_complete = round((uploaded / total) * 100, 2)
        # Ceiling division: number of chunk-sized parts needed for `total` bytes
        parts_total = (total + MULTIPART_CHUNK_SIZE - 1) // MULTIPART_CHUNK_SIZE

    if began:
        elapsed = time.time() - began
        elapsed_seconds = round(elapsed, 2)
        if elapsed > 0 and uploaded > 0:
            throughput = round((uploaded / (1024 * 1024)) / elapsed, 2)

    return {
        "upload_id": upload_id,
        "status": session.get("status", "in_progress"),
        "bytes_uploaded": uploaded,
        "bytes_total": total,
        "parts_uploaded": len(session.get("parts", [])),
        "parts_total": parts_total,
        "started_at": began,
        "elapsed_seconds": elapsed_seconds,
        "percent_complete": percent_complete,
        "throughput_mbps": throughput,
    }
def set_upload_expected_size(self, upload_id: str, size: int):
    """Record the expected total size on an upload session (for progress tracking).

    Unknown upload ids are ignored.
    """
    session = self._active_uploads.get(upload_id)
    if not session:
        return
    session["expected_size"] = size
def complete_resumable_upload(self, upload_id: str) -> Tuple[str, str]:
    """
    Complete a resumable upload.

    The part list sent to S3 contains only ``PartNumber`` and ``ETag``,
    sorted by part number. Session part records also carry bookkeeping such
    as ``size``, which is not a field S3 accepts here. If a part number was
    recorded more than once, the most recently recorded ETag is used. The
    session is removed only after S3 accepts the completion, so a failed
    completion leaves it available for retry.

    Args:
        upload_id: Id of a session created by initiate_resumable_upload

    Returns:
        (sha256_hash, s3_key)

    Raises:
        ValueError: If upload_id does not match an active session
    """
    session = self._active_uploads.get(upload_id)
    if not session:
        raise ValueError(f"Unknown upload session: {upload_id}")

    # Later records overwrite earlier ones for the same part number
    etag_by_part = {}
    for part in session["parts"]:
        etag_by_part[part["PartNumber"]] = part["ETag"]
    completed_parts = [
        {"PartNumber": number, "ETag": etag_by_part[number]}
        for number in sorted(etag_by_part)
    ]

    self.client.complete_multipart_upload(
        Bucket=self.bucket,
        Key=session["s3_key"],
        UploadId=upload_id,
        MultipartUpload={"Parts": completed_parts},
    )
    # Clean up session
    del self._active_uploads[upload_id]
    return session["expected_hash"], session["s3_key"]
def abort_resumable_upload(self, upload_id: str):
    """Cancel a resumable upload and forget its session.

    Unknown upload ids are ignored. The session is removed only after the
    abort call to S3 returns.
    """
    session = self._active_uploads.get(upload_id)
    if not session:
        return
    abort_request = {
        "Bucket": self.bucket,
        "Key": session["s3_key"],
        "UploadId": upload_id,
    }
    self.client.abort_multipart_upload(**abort_request)
    del self._active_uploads[upload_id]
def list_upload_parts(self, upload_id: str) -> list:
    """
    List uploaded parts for a resumable upload (for resume support).

    S3 returns parts in pages, so every page is followed via
    ``NextPartNumberMarker`` while the response reports ``IsTruncated``.
    This way uploads with more parts than fit in one response are listed
    completely.

    Args:
        upload_id: Id of a session created by initiate_resumable_upload

    Returns:
        List of part dicts as returned by S3, across all pages

    Raises:
        ValueError: If upload_id does not match an active session
    """
    session = self._active_uploads.get(upload_id)
    if not session:
        raise ValueError(f"Unknown upload session: {upload_id}")

    request = {
        "Bucket": self.bucket,
        "Key": session["s3_key"],
        "UploadId": upload_id,
    }
    parts = []
    while True:
        response = self.client.list_parts(**request)
        parts.extend(response.get("Parts", []))
        if not response.get("IsTruncated"):
            break
        marker = response.get("NextPartNumberMarker")
        # Stop rather than loop forever if the backend omits or repeats the marker
        if marker is None or marker == request.get("PartNumberMarker"):
            break
        request["PartNumberMarker"] = marker
    return parts
def get(self, s3_key: str) -> bytes:
    """Fetch an object by its S3 key and return its entire body as bytes."""
    s3_object = self.client.get_object(Bucket=self.bucket, Key=s3_key)
    body = s3_object["Body"]
    return body.read()
def get_stream(
    self, s3_key: str, range_header: Optional[str] = None
) -> Tuple[Any, int, Optional[str]]:
    """
    Get a streaming response for a file.

    Supports range requests for partial downloads: when range_header is
    given it is forwarded to S3 as the ``Range`` parameter.

    Args:
        s3_key: The S3 key of the object
        range_header: Optional HTTP Range header value

    Returns:
        Tuple of (stream, content_length, content_range). content_length is 0
        when the response does not report one; content_range is None when the
        response carries no Content-Range.
    """
    kwargs = {"Bucket": self.bucket, "Key": s3_key}
    if range_header:
        kwargs["Range"] = range_header
    response = self.client.get_object(**kwargs)
    content_length = response.get("ContentLength", 0)
    content_range = response.get("ContentRange")
    return response["Body"], content_length, content_range
def get_object_info(self, s3_key: str) -> Optional[Dict[str, Any]]:
    """
    Get object metadata without downloading content.

    Issues a HEAD request for the object. Any ClientError yields None, which
    keeps the existing contract for callers. Errors other than "not found"
    are logged at warning level so that permission or service problems are
    not silently mistaken for a missing object.

    Args:
        s3_key: The S3 key of the object

    Returns:
        Dict with size, content_type, last_modified and etag (values are
        taken straight from the HEAD response, so etag keeps its surrounding
        quotes), or None if the HEAD request failed
    """
    try:
        response = self.client.head_object(Bucket=self.bucket, Key=s3_key)
    except ClientError as e:
        error_code = e.response.get("Error", {}).get("Code", "")
        if error_code not in ("404", "NoSuchKey"):
            logger.warning(
                f"S3 head_object failed for {s3_key} (treated as not found): {e}"
            )
        return None
    return {
        "size": response.get("ContentLength", 0),
        "content_type": response.get("ContentType"),
        "last_modified": response.get("LastModified"),
        "etag": response.get("ETag"),
    }
def _exists(self, s3_key: str, retry: bool = True) -> bool:
"""
Check if an object exists with optional retry logic.
Args:
s3_key: The S3 key to check
retry: Whether to retry on transient failures (default: True)
Returns:
True if object exists, False otherwise
Raises:
S3ExistenceCheckError: If all retries fail due to non-404 errors
"""
import time
max_retries = MAX_EXISTENCE_CHECK_RETRIES if retry else 1
last_error = None
for attempt in range(max_retries):
try:
self.client.head_object(Bucket=self.bucket, Key=s3_key)
return True
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "")
# 404 means object doesn't exist - not an error
if error_code in ("404", "NoSuchKey"):
return False
# For other errors, retry
last_error = e
if attempt < max_retries - 1:
logger.warning(
f"S3 existence check failed (attempt {attempt + 1}/{max_retries}): {e}"
)
time.sleep(0.1 * (attempt + 1)) # Exponential backoff
# All retries failed
logger.error(
f"S3 existence check failed after {max_retries} attempts: {last_error}"
)
raise S3ExistenceCheckError(
f"Failed to check S3 object existence after {max_retries} attempts: {last_error}"
)
def delete(self, s3_key: str) -> bool:
    """Delete an object; return True on success, False if S3 reports a ClientError."""
    try:
        self.client.delete_object(Bucket=self.bucket, Key=s3_key)
    except ClientError:
        return False
    return True
def generate_presigned_url(
    self,
    s3_key: str,
    expiry: Optional[int] = None,
    response_content_type: Optional[str] = None,
    response_content_disposition: Optional[str] = None,
) -> str:
    """
    Build a presigned ``get_object`` URL for downloading an object.

    Args:
        s3_key: The S3 key of the object
        expiry: URL expiry in seconds (defaults to settings.presigned_url_expiry)
        response_content_type: Override Content-Type header in response
        response_content_disposition: Override Content-Disposition header in response

    Returns:
        Presigned URL string
    """
    expires_in = settings.presigned_url_expiry if expiry is None else expiry

    params = {"Bucket": self.bucket, "Key": s3_key}
    # Response header overrides are included only when a non-empty value is given
    header_overrides = (
        ("ResponseContentType", response_content_type),
        ("ResponseContentDisposition", response_content_disposition),
    )
    for param_name, override in header_overrides:
        if override:
            params[param_name] = override

    return self.client.generate_presigned_url(
        "get_object",
        Params=params,
        ExpiresIn=expires_in,
    )
def health_check(self) -> bool:
    """
    Report whether the storage backend is healthy and accessible.

    Connectivity is probed with a lightweight HEAD request on the bucket.
    Failures are logged and reported as False rather than raised.

    Returns:
        True if healthy, False otherwise
    """
    try:
        self.client.head_bucket(Bucket=self.bucket)
    except ClientError as e:
        logger.warning(f"Storage health check failed: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error during storage health check: {e}")
        return False
    else:
        return True
def get_verified(self, s3_key: str, expected_hash: str) -> bytes:
    """
    Download an object and return it only if its SHA256 matches.

    The entire content is downloaded, hashed, and compared against the
    lower-cased expected hash before anything is returned.

    Args:
        s3_key: The S3 storage key of the file
        expected_hash: Expected SHA256 hash (64 hex characters)

    Returns:
        File content as bytes (only if verification passes)

    Raises:
        ValueError: If expected_hash is not a valid SHA256 hash
        ChecksumMismatchError: If computed hash doesn't match expected
        ClientError: If S3 operation fails
    """
    if not is_valid_sha256(expected_hash):
        raise ValueError(f"Invalid SHA256 hash format: {expected_hash}")

    content = self.get(s3_key)
    actual_hash = compute_sha256(content)
    normalized_expected = expected_hash.lower()

    if actual_hash == normalized_expected:
        logger.debug(f"Verification passed for {s3_key}: {actual_hash[:16]}...")
        return content

    raise ChecksumMismatchError(
        expected=normalized_expected,
        actual=actual_hash,
        s3_key=s3_key,
        size=len(content),
    )
def get_stream_verified(
    self,
    s3_key: str,
    expected_hash: str,
    range_header: Optional[str] = None,
) -> Tuple[VerifyingStreamWrapper, int, Optional[str]]:
    """
    Open a stream whose content is hashed as it is read.

    For full downloads the stream is wrapped in a VerifyingStreamWrapper,
    which computes the hash chunk by chunk and can verify it once streaming
    completes. Verification therefore happens AFTER content has already been
    streamed to the client.

    IMPORTANT: Range requests cannot be verified, because a partial download
    cannot be checked against the full file hash. In that case the stream is
    wrapped in a plain HashingStreamWrapper and the caller should not verify.

    Args:
        s3_key: The S3 storage key of the file
        expected_hash: Expected SHA256 hash (64 hex characters)
        range_header: Optional HTTP Range header (verification disabled if set)

    Returns:
        Tuple of (wrapper, content_length, content_range). For full downloads
        the wrapper is a VerifyingStreamWrapper with a verify() method to call
        after streaming.

    Raises:
        ValueError: If expected_hash is invalid format
        ClientError: If S3 operation fails
    """
    if not is_valid_sha256(expected_hash):
        raise ValueError(f"Invalid SHA256 hash format: {expected_hash}")

    # Get the S3 stream
    stream, content_length, content_range = self.get_stream(s3_key, range_header)

    is_partial = bool(range_header or content_range)
    if not is_partial:
        wrapper = VerifyingStreamWrapper(
            stream=stream,
            expected_hash=expected_hash,
            s3_key=s3_key,
        )
        return wrapper, content_length, content_range

    # Partial content: track bytes only, no verification
    logger.debug(
        f"Range request for {s3_key} - verification disabled (partial content)"
    )
    return HashingStreamWrapper(stream), content_length, content_range
def verify_integrity(self, s3_key: str, expected_sha256: str) -> bool:
    """
    Verify the integrity of a stored object by downloading and re-hashing.

    This is an expensive operation (the whole object is downloaded into
    memory) and should only be used for critical verification scenarios.
    The comparison is case-insensitive with respect to the expected hash,
    so an upper-case hex digest is not reported as a mismatch.

    Args:
        s3_key: The storage key of the file
        expected_sha256: The expected SHA256 hash (hex digest, any case)

    Returns:
        True if hash matches, False otherwise (including when the download
        or hashing raises)
    """
    try:
        content = self.get(s3_key)
        actual_hash = hashlib.sha256(content).hexdigest()
        # hexdigest() is lower-case; normalize the expected value to match
        if actual_hash != expected_sha256.lower():
            logger.error(
                f"Integrity verification failed for {s3_key}: "
                f"expected {expected_sha256[:12]}..., got {actual_hash[:12]}..."
            )
            return False
        return True
    except Exception as e:
        # Deliberate best-effort: any failure is reported as "not verified"
        logger.error(f"Error during integrity verification for {s3_key}: {e}")
        return False
# Process-wide storage instance, created lazily by get_storage()
_storage: Optional[S3Storage] = None


def get_storage() -> StorageBackend:
    """
    Return the shared storage backend, creating it on first use.

    Currently this is always an S3Storage (which also works with
    S3-compatible backends like MinIO). Future implementations may support
    backend selection via configuration.

    Returns:
        StorageBackend instance
    """
    global _storage
    if _storage is not None:
        return _storage
    _storage = S3Storage()
    return _storage