Files
orchard/backend/app/storage.py

1022 lines
34 KiB
Python

import hashlib
import logging
from typing import (
BinaryIO,
Tuple,
Optional,
Dict,
Any,
Generator,
NamedTuple,
Protocol,
runtime_checkable,
)
import boto3
from botocore.config import Config
from botocore.exceptions import (
ClientError,
ConnectionError as BotoConnectionError,
EndpointConnectionError,
ReadTimeoutError,
ConnectTimeoutError,
)
from .config import get_settings
from .checksum import (
ChecksumMismatchError,
HashingStreamWrapper,
VerifyingStreamWrapper,
compute_sha256,
is_valid_sha256,
)
settings = get_settings()
logger = logging.getLogger(__name__)
# =============================================================================
# Storage Backend Protocol/Interface (ISSUE 33)
# =============================================================================
@runtime_checkable
class StorageBackend(Protocol):
"""
Abstract protocol defining the interface for storage backends.
All storage implementations (S3, MinIO, future backends) must implement
this interface to ensure consistent behavior across the application.
Note on Deduplication:
- This system uses whole-file deduplication based on SHA256 hash
- Partial/chunk-level deduplication is NOT supported (out of scope for MVP)
- Files with identical content but different metadata are deduplicated
"""
def store(
self, file: BinaryIO, content_length: Optional[int] = None
) -> "StorageResult":
"""
Store a file and return StorageResult with all checksums.
Content-addressable: if the file already exists (by hash), just return
the existing hash without uploading again.
Args:
file: File-like object to store
content_length: Optional hint for file size (enables multipart upload)
Returns:
StorageResult with sha256, size, s3_key, and optional checksums
Raises:
HashComputationError: If hash computation fails
S3ExistenceCheckError: If existence check fails after retries
S3UploadError: If upload fails
"""
...
def get(self, s3_key: str) -> bytes:
"""
Retrieve a file by its storage key.
Args:
s3_key: The storage key (path) of the file
Returns:
File content as bytes
"""
...
def get_stream(
self, s3_key: str, range_header: Optional[str] = None
) -> Tuple[Any, int, Optional[str]]:
"""
Get a streaming response for a file.
Supports range requests for partial downloads.
Args:
s3_key: The storage key of the file
range_header: Optional HTTP Range header value
Returns:
Tuple of (stream, content_length, content_range)
"""
...
def delete(self, s3_key: str) -> bool:
"""
Delete a file from storage.
Args:
s3_key: The storage key of the file to delete
Returns:
True if deleted successfully, False otherwise
"""
...
def get_object_info(self, s3_key: str) -> Optional[Dict[str, Any]]:
"""
Get object metadata without downloading content.
Args:
s3_key: The storage key of the file
Returns:
Dict with size, content_type, last_modified, etag, or None if not found
"""
...
def generate_presigned_url(
self,
s3_key: str,
expiry: Optional[int] = None,
response_content_type: Optional[str] = None,
response_content_disposition: Optional[str] = None,
) -> str:
"""
Generate a presigned URL for downloading an object.
Args:
s3_key: The storage key of the file
expiry: URL expiry in seconds
response_content_type: Override Content-Type header in response
response_content_disposition: Override Content-Disposition header
Returns:
Presigned URL string
"""
...
def health_check(self) -> bool:
"""
Check if the storage backend is healthy and accessible.
Returns:
True if healthy, False otherwise
"""
...
# Threshold for multipart upload (100MB)
MULTIPART_THRESHOLD = 100 * 1024 * 1024
# Chunk size for multipart upload (10MB)
MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024
# Chunk size for streaming hash computation
HASH_CHUNK_SIZE = 8 * 1024 * 1024
# Maximum retries for S3 existence check
MAX_EXISTENCE_CHECK_RETRIES = 3
class StorageError(Exception):
"""Base exception for storage operations"""
pass
class HashComputationError(StorageError):
"""Raised when hash computation fails"""
pass
class FileSizeExceededError(StorageError):
"""Raised when file exceeds maximum size during upload"""
pass
class S3ExistenceCheckError(StorageError):
"""Raised when S3 existence check fails after retries"""
pass
class S3UploadError(StorageError):
"""Raised when S3 upload fails"""
pass
class StorageResult(NamedTuple):
"""Result of storing a file with all computed checksums"""
sha256: str
size: int
s3_key: str
md5: Optional[str] = None
sha1: Optional[str] = None
s3_etag: Optional[str] = None
already_existed: bool = (
False # True if artifact was deduplicated (S3 object already existed)
)
class S3StorageUnavailableError(StorageError):
"""Raised when S3 storage backend is unavailable"""
pass
class HashCollisionError(StorageError):
"""Raised when a hash collision is detected (extremely rare)"""
pass
class S3Storage:
def __init__(self):
# Build config with retry and timeout settings
s3_config = {}
if settings.s3_use_path_style:
s3_config["addressing_style"] = "path"
config = Config(
s3=s3_config if s3_config else None,
connect_timeout=settings.s3_connect_timeout,
read_timeout=settings.s3_read_timeout,
retries={
"max_attempts": settings.s3_max_retries,
"mode": "adaptive", # Adaptive retry mode for better handling
},
)
self.client = boto3.client(
"s3",
endpoint_url=settings.s3_endpoint if settings.s3_endpoint else None,
region_name=settings.s3_region,
aws_access_key_id=settings.s3_access_key_id,
aws_secret_access_key=settings.s3_secret_access_key,
config=config,
verify=settings.s3_verify_ssl, # SSL/TLS verification
)
self.bucket = settings.s3_bucket
# Store active multipart uploads for resumable support
self._active_uploads: Dict[str, Dict[str, Any]] = {}
def store(
self, file: BinaryIO, content_length: Optional[int] = None
) -> StorageResult:
"""
Store a file and return StorageResult with all checksums.
Content-addressable: if the file already exists, just return the hash.
Uses multipart upload for files larger than MULTIPART_THRESHOLD.
"""
# For small files or unknown size, use the simple approach
if content_length is None or content_length < MULTIPART_THRESHOLD:
return self._store_simple(file)
else:
return self._store_multipart(file, content_length)
def _store_simple(self, file: BinaryIO) -> StorageResult:
"""
Store a small file using simple put_object.
Raises:
HashComputationError: If hash computation fails
FileSizeExceededError: If file exceeds maximum size
S3ExistenceCheckError: If S3 existence check fails after retries
S3UploadError: If S3 upload fails
"""
# Read file and compute all hashes with error handling
try:
content = file.read()
if not content:
raise HashComputationError("Empty file content")
size = len(content)
# Enforce file size limit (protection against Content-Length spoofing)
if size > settings.max_file_size:
raise FileSizeExceededError(
f"File size {size} exceeds maximum {settings.max_file_size}"
)
sha256_hash = hashlib.sha256(content).hexdigest()
md5_hash = hashlib.md5(content).hexdigest()
sha1_hash = hashlib.sha1(content).hexdigest()
except (HashComputationError, FileSizeExceededError):
raise
except Exception as e:
logger.error(f"Hash computation failed: {e}")
raise HashComputationError(f"Failed to compute hash: {e}") from e
# Check if already exists (with retry logic)
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
s3_etag = None
try:
exists = self._exists(s3_key)
except S3ExistenceCheckError:
# Re-raise the specific error
raise
except Exception as e:
logger.error(f"Unexpected error during S3 existence check: {e}")
raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e
if not exists:
try:
response = self.client.put_object(
Bucket=self.bucket,
Key=s3_key,
Body=content,
)
s3_etag = response.get("ETag", "").strip('"')
except (EndpointConnectionError, BotoConnectionError) as e:
logger.error(f"S3 storage unavailable: {e}")
raise S3StorageUnavailableError(
f"Storage backend unavailable: {e}"
) from e
except (ReadTimeoutError, ConnectTimeoutError) as e:
logger.error(f"S3 operation timed out: {e}")
raise S3UploadError(f"Upload timed out: {e}") from e
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "")
if error_code == "ServiceUnavailable":
logger.error(f"S3 service unavailable: {e}")
raise S3StorageUnavailableError(
f"Storage service unavailable: {e}"
) from e
logger.error(f"S3 upload failed: {e}")
raise S3UploadError(f"Failed to upload to S3: {e}") from e
else:
# Get existing ETag and verify integrity (detect potential hash collision)
obj_info = self.get_object_info(s3_key)
if obj_info:
s3_etag = obj_info.get("etag", "").strip('"')
# Check for hash collision by comparing size
existing_size = obj_info.get("size", 0)
if existing_size != size:
logger.critical(
f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
f"existing={existing_size}, new={size}. This is extremely rare."
)
raise HashCollisionError(
f"Hash collision detected for {sha256_hash}: size mismatch"
)
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=exists,
)
def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
"""
Store a large file using S3 multipart upload with streaming hash computation.
Raises:
HashComputationError: If hash computation fails
FileSizeExceededError: If file exceeds maximum size
S3ExistenceCheckError: If S3 existence check fails after retries
S3UploadError: If S3 upload fails
"""
# First pass: compute all hashes by streaming through file
try:
sha256_hasher = hashlib.sha256()
md5_hasher = hashlib.md5()
sha1_hasher = hashlib.sha1()
size = 0
# Read file in chunks to compute hashes
while True:
chunk = file.read(HASH_CHUNK_SIZE)
if not chunk:
break
sha256_hasher.update(chunk)
md5_hasher.update(chunk)
sha1_hasher.update(chunk)
size += len(chunk)
# Enforce file size limit during streaming (protection against spoofing)
if size > settings.max_file_size:
raise FileSizeExceededError(
f"File size exceeds maximum {settings.max_file_size}"
)
if size == 0:
raise HashComputationError("Empty file content")
sha256_hash = sha256_hasher.hexdigest()
md5_hash = md5_hasher.hexdigest()
sha1_hash = sha1_hasher.hexdigest()
except (HashComputationError, FileSizeExceededError):
raise
except Exception as e:
logger.error(f"Hash computation failed for multipart upload: {e}")
raise HashComputationError(f"Failed to compute hash: {e}") from e
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
# Check if already exists (deduplication) with retry logic
try:
exists = self._exists(s3_key)
except S3ExistenceCheckError:
raise
except Exception as e:
logger.error(f"Unexpected error during S3 existence check: {e}")
raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e
if exists:
obj_info = self.get_object_info(s3_key)
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
# Check for hash collision by comparing size
if obj_info:
existing_size = obj_info.get("size", 0)
if existing_size != size:
logger.critical(
f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
f"existing={existing_size}, new={size}. This is extremely rare."
)
raise HashCollisionError(
f"Hash collision detected for {sha256_hash}: size mismatch"
)
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=True,
)
# Seek back to start for upload
file.seek(0)
# Start multipart upload
try:
mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
except (EndpointConnectionError, BotoConnectionError) as e:
logger.error(f"S3 storage unavailable for multipart upload: {e}")
raise S3StorageUnavailableError(f"Storage backend unavailable: {e}") from e
upload_id = mpu["UploadId"]
try:
parts = []
part_number = 1
while True:
chunk = file.read(MULTIPART_CHUNK_SIZE)
if not chunk:
break
response = self.client.upload_part(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
PartNumber=part_number,
Body=chunk,
)
parts.append(
{
"PartNumber": part_number,
"ETag": response["ETag"],
}
)
part_number += 1
# Complete multipart upload
complete_response = self.client.complete_multipart_upload(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
s3_etag = complete_response.get("ETag", "").strip('"')
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=False,
)
except Exception as e:
# Abort multipart upload on failure
logger.error(f"Multipart upload failed: {e}")
self.client.abort_multipart_upload(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
)
raise
def store_streaming(self, chunks: Generator[bytes, None, None]) -> StorageResult:
"""
Store a file from a stream of chunks.
First accumulates to compute hash, then uploads.
For truly large files, consider using initiate_resumable_upload instead.
"""
# Accumulate chunks and compute all hashes
sha256_hasher = hashlib.sha256()
md5_hasher = hashlib.md5()
sha1_hasher = hashlib.sha1()
all_chunks = []
size = 0
for chunk in chunks:
sha256_hasher.update(chunk)
md5_hasher.update(chunk)
sha1_hasher.update(chunk)
all_chunks.append(chunk)
size += len(chunk)
sha256_hash = sha256_hasher.hexdigest()
md5_hash = md5_hasher.hexdigest()
sha1_hash = sha1_hasher.hexdigest()
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
s3_etag = None
# Check if already exists
if self._exists(s3_key):
obj_info = self.get_object_info(s3_key)
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=True,
)
# Upload based on size
if size < MULTIPART_THRESHOLD:
content = b"".join(all_chunks)
response = self.client.put_object(
Bucket=self.bucket, Key=s3_key, Body=content
)
s3_etag = response.get("ETag", "").strip('"')
else:
# Use multipart for large files
mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
upload_id = mpu["UploadId"]
try:
parts = []
part_number = 1
buffer = b""
for chunk in all_chunks:
buffer += chunk
while len(buffer) >= MULTIPART_CHUNK_SIZE:
part_data = buffer[:MULTIPART_CHUNK_SIZE]
buffer = buffer[MULTIPART_CHUNK_SIZE:]
response = self.client.upload_part(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
PartNumber=part_number,
Body=part_data,
)
parts.append(
{
"PartNumber": part_number,
"ETag": response["ETag"],
}
)
part_number += 1
# Upload remaining buffer
if buffer:
response = self.client.upload_part(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
PartNumber=part_number,
Body=buffer,
)
parts.append(
{
"PartNumber": part_number,
"ETag": response["ETag"],
}
)
complete_response = self.client.complete_multipart_upload(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
s3_etag = complete_response.get("ETag", "").strip('"')
except Exception as e:
logger.error(f"Streaming multipart upload failed: {e}")
self.client.abort_multipart_upload(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
)
raise
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=False,
)
def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]:
"""
Initiate a resumable upload session.
Returns upload session info including upload_id.
"""
s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"
# Check if already exists
if self._exists(s3_key):
return {
"upload_id": None,
"s3_key": s3_key,
"already_exists": True,
"parts": [],
}
mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
upload_id = mpu["UploadId"]
session = {
"upload_id": upload_id,
"s3_key": s3_key,
"already_exists": False,
"parts": [],
"expected_hash": expected_hash,
}
self._active_uploads[upload_id] = session
return session
def upload_part(
self, upload_id: str, part_number: int, data: bytes
) -> Dict[str, Any]:
"""
Upload a part for a resumable upload.
Returns part info including ETag.
"""
session = self._active_uploads.get(upload_id)
if not session:
raise ValueError(f"Unknown upload session: {upload_id}")
response = self.client.upload_part(
Bucket=self.bucket,
Key=session["s3_key"],
UploadId=upload_id,
PartNumber=part_number,
Body=data,
)
part_info = {
"PartNumber": part_number,
"ETag": response["ETag"],
}
session["parts"].append(part_info)
return part_info
def complete_resumable_upload(self, upload_id: str) -> Tuple[str, str]:
"""
Complete a resumable upload.
Returns (sha256_hash, s3_key).
"""
session = self._active_uploads.get(upload_id)
if not session:
raise ValueError(f"Unknown upload session: {upload_id}")
# Sort parts by part number
sorted_parts = sorted(session["parts"], key=lambda x: x["PartNumber"])
self.client.complete_multipart_upload(
Bucket=self.bucket,
Key=session["s3_key"],
UploadId=upload_id,
MultipartUpload={"Parts": sorted_parts},
)
# Clean up session
del self._active_uploads[upload_id]
return session["expected_hash"], session["s3_key"]
def abort_resumable_upload(self, upload_id: str):
"""Abort a resumable upload"""
session = self._active_uploads.get(upload_id)
if session:
self.client.abort_multipart_upload(
Bucket=self.bucket,
Key=session["s3_key"],
UploadId=upload_id,
)
del self._active_uploads[upload_id]
def list_upload_parts(self, upload_id: str) -> list:
"""List uploaded parts for a resumable upload (for resume support)"""
session = self._active_uploads.get(upload_id)
if not session:
raise ValueError(f"Unknown upload session: {upload_id}")
response = self.client.list_parts(
Bucket=self.bucket,
Key=session["s3_key"],
UploadId=upload_id,
)
return response.get("Parts", [])
def get(self, s3_key: str) -> bytes:
"""Retrieve a file by its S3 key"""
response = self.client.get_object(Bucket=self.bucket, Key=s3_key)
return response["Body"].read()
def get_stream(self, s3_key: str, range_header: Optional[str] = None):
"""
Get a streaming response for a file.
Supports range requests for partial downloads.
Returns (stream, content_length, content_range, accept_ranges)
"""
kwargs = {"Bucket": self.bucket, "Key": s3_key}
if range_header:
kwargs["Range"] = range_header
response = self.client.get_object(**kwargs)
content_length = response.get("ContentLength", 0)
content_range = response.get("ContentRange")
return response["Body"], content_length, content_range
def get_object_info(self, s3_key: str) -> Dict[str, Any]:
"""Get object metadata without downloading content"""
try:
response = self.client.head_object(Bucket=self.bucket, Key=s3_key)
return {
"size": response.get("ContentLength", 0),
"content_type": response.get("ContentType"),
"last_modified": response.get("LastModified"),
"etag": response.get("ETag"),
}
except ClientError:
return None
def _exists(self, s3_key: str, retry: bool = True) -> bool:
"""
Check if an object exists with optional retry logic.
Args:
s3_key: The S3 key to check
retry: Whether to retry on transient failures (default: True)
Returns:
True if object exists, False otherwise
Raises:
S3ExistenceCheckError: If all retries fail due to non-404 errors
"""
import time
max_retries = MAX_EXISTENCE_CHECK_RETRIES if retry else 1
last_error = None
for attempt in range(max_retries):
try:
self.client.head_object(Bucket=self.bucket, Key=s3_key)
return True
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "")
# 404 means object doesn't exist - not an error
if error_code in ("404", "NoSuchKey"):
return False
# For other errors, retry
last_error = e
if attempt < max_retries - 1:
logger.warning(
f"S3 existence check failed (attempt {attempt + 1}/{max_retries}): {e}"
)
time.sleep(0.1 * (attempt + 1)) # Exponential backoff
# All retries failed
logger.error(
f"S3 existence check failed after {max_retries} attempts: {last_error}"
)
raise S3ExistenceCheckError(
f"Failed to check S3 object existence after {max_retries} attempts: {last_error}"
)
def delete(self, s3_key: str) -> bool:
"""Delete an object"""
try:
self.client.delete_object(Bucket=self.bucket, Key=s3_key)
return True
except ClientError:
return False
def generate_presigned_url(
self,
s3_key: str,
expiry: Optional[int] = None,
response_content_type: Optional[str] = None,
response_content_disposition: Optional[str] = None,
) -> str:
"""
Generate a presigned URL for downloading an object.
Args:
s3_key: The S3 key of the object
expiry: URL expiry in seconds (defaults to settings.presigned_url_expiry)
response_content_type: Override Content-Type header in response
response_content_disposition: Override Content-Disposition header in response
Returns:
Presigned URL string
"""
if expiry is None:
expiry = settings.presigned_url_expiry
params = {
"Bucket": self.bucket,
"Key": s3_key,
}
# Add response header overrides if specified
if response_content_type:
params["ResponseContentType"] = response_content_type
if response_content_disposition:
params["ResponseContentDisposition"] = response_content_disposition
url = self.client.generate_presigned_url(
"get_object",
Params=params,
ExpiresIn=expiry,
)
return url
def health_check(self) -> bool:
"""
Check if the storage backend is healthy and accessible.
Performs a lightweight HEAD request on the bucket to verify connectivity.
Returns:
True if healthy, False otherwise
"""
try:
self.client.head_bucket(Bucket=self.bucket)
return True
except ClientError as e:
logger.warning(f"Storage health check failed: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error during storage health check: {e}")
return False
def get_verified(self, s3_key: str, expected_hash: str) -> bytes:
"""
Download and verify content matches expected SHA256 hash.
This method downloads the entire content, computes its hash, and
verifies it matches the expected hash before returning.
Args:
s3_key: The S3 storage key of the file
expected_hash: Expected SHA256 hash (64 hex characters)
Returns:
File content as bytes (only if verification passes)
Raises:
ChecksumMismatchError: If computed hash doesn't match expected
ClientError: If S3 operation fails
"""
if not is_valid_sha256(expected_hash):
raise ValueError(f"Invalid SHA256 hash format: {expected_hash}")
content = self.get(s3_key)
actual_hash = compute_sha256(content)
if actual_hash != expected_hash.lower():
raise ChecksumMismatchError(
expected=expected_hash.lower(),
actual=actual_hash,
s3_key=s3_key,
size=len(content),
)
logger.debug(f"Verification passed for {s3_key}: {actual_hash[:16]}...")
return content
def get_stream_verified(
self,
s3_key: str,
expected_hash: str,
range_header: Optional[str] = None,
) -> Tuple[VerifyingStreamWrapper, int, Optional[str]]:
"""
Get a verifying stream wrapper for an object.
Returns a wrapper that computes the hash as chunks are read and
can verify after streaming completes. Note that verification happens
AFTER content has been streamed to the client.
IMPORTANT: For range requests, verification is not supported because
we cannot verify a partial download against the full file hash.
Args:
s3_key: The S3 storage key of the file
expected_hash: Expected SHA256 hash (64 hex characters)
range_header: Optional HTTP Range header (verification disabled if set)
Returns:
Tuple of (VerifyingStreamWrapper, content_length, content_range)
The wrapper has a verify() method to call after streaming.
Raises:
ValueError: If expected_hash is invalid format
ClientError: If S3 operation fails
"""
if not is_valid_sha256(expected_hash):
raise ValueError(f"Invalid SHA256 hash format: {expected_hash}")
# Get the S3 stream
stream, content_length, content_range = self.get_stream(s3_key, range_header)
# For range requests, we cannot verify (partial content)
# Return a HashingStreamWrapper that just tracks bytes without verification
if range_header or content_range:
logger.debug(
f"Range request for {s3_key} - verification disabled (partial content)"
)
# Return a basic hashing wrapper (caller should not verify)
hashing_wrapper = HashingStreamWrapper(stream)
return hashing_wrapper, content_length, content_range
# Create verifying wrapper
verifying_wrapper = VerifyingStreamWrapper(
stream=stream,
expected_hash=expected_hash,
s3_key=s3_key,
)
return verifying_wrapper, content_length, content_range
def verify_integrity(self, s3_key: str, expected_sha256: str) -> bool:
"""
Verify the integrity of a stored object by downloading and re-hashing.
This is an expensive operation and should only be used for critical
verification scenarios.
Args:
s3_key: The storage key of the file
expected_sha256: The expected SHA256 hash
Returns:
True if hash matches, False otherwise
"""
try:
content = self.get(s3_key)
actual_hash = hashlib.sha256(content).hexdigest()
if actual_hash != expected_sha256:
logger.error(
f"Integrity verification failed for {s3_key}: "
f"expected {expected_sha256[:12]}..., got {actual_hash[:12]}..."
)
return False
return True
except Exception as e:
logger.error(f"Error during integrity verification for {s3_key}: {e}")
return False
# Singleton instance
_storage: Optional[S3Storage] = None
def get_storage() -> StorageBackend:
"""
Get the configured storage backend instance.
Currently returns S3Storage (works with S3-compatible backends like MinIO).
Future implementations may support backend selection via configuration.
Returns:
StorageBackend instance
"""
global _storage
if _storage is None:
_storage = S3Storage()
return _storage