Add S3 configuration options and improved error handling
- Add s3_verify_ssl config option for SSL/TLS verification - Add s3_connect_timeout and s3_read_timeout config options - Add s3_max_retries config option with adaptive retry mode - Add S3StorageUnavailableError for backend availability issues - Add HashCollisionError for detecting extremely rare hash collisions - Add hash collision detection by comparing file sizes on dedup - Handle network interruption and timeout errors explicitly - Update routes.py to handle new exception types with appropriate HTTP codes
This commit is contained in:
@@ -22,7 +22,9 @@ class Settings(BaseSettings):
|
|||||||
database_pool_size: int = 5 # Number of connections to keep open
|
database_pool_size: int = 5 # Number of connections to keep open
|
||||||
database_max_overflow: int = 10 # Max additional connections beyond pool_size
|
database_max_overflow: int = 10 # Max additional connections beyond pool_size
|
||||||
database_pool_timeout: int = 30 # Seconds to wait for a connection from pool
|
database_pool_timeout: int = 30 # Seconds to wait for a connection from pool
|
||||||
database_pool_recycle: int = 1800 # Recycle connections after this many seconds (30 min)
|
database_pool_recycle: int = (
|
||||||
|
1800 # Recycle connections after this many seconds (30 min)
|
||||||
|
)
|
||||||
|
|
||||||
# S3
|
# S3
|
||||||
s3_endpoint: str = ""
|
s3_endpoint: str = ""
|
||||||
@@ -31,10 +33,16 @@ class Settings(BaseSettings):
|
|||||||
s3_access_key_id: str = ""
|
s3_access_key_id: str = ""
|
||||||
s3_secret_access_key: str = ""
|
s3_secret_access_key: str = ""
|
||||||
s3_use_path_style: bool = True
|
s3_use_path_style: bool = True
|
||||||
|
s3_verify_ssl: bool = True # Set to False for self-signed certs (dev only)
|
||||||
|
s3_connect_timeout: int = 10 # Connection timeout in seconds
|
||||||
|
s3_read_timeout: int = 60 # Read timeout in seconds
|
||||||
|
s3_max_retries: int = 3 # Max retry attempts for transient failures
|
||||||
|
|
||||||
# Download settings
|
# Download settings
|
||||||
download_mode: str = "presigned" # "presigned", "redirect", or "proxy"
|
download_mode: str = "presigned" # "presigned", "redirect", or "proxy"
|
||||||
presigned_url_expiry: int = 3600 # Presigned URL expiry in seconds (default: 1 hour)
|
presigned_url_expiry: int = (
|
||||||
|
3600 # Presigned URL expiry in seconds (default: 1 hour)
|
||||||
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def database_url(self) -> str:
|
def database_url(self) -> str:
|
||||||
|
|||||||
@@ -29,6 +29,8 @@ from .storage import (
|
|||||||
HashComputationError,
|
HashComputationError,
|
||||||
S3ExistenceCheckError,
|
S3ExistenceCheckError,
|
||||||
S3UploadError,
|
S3UploadError,
|
||||||
|
S3StorageUnavailableError,
|
||||||
|
HashCollisionError,
|
||||||
)
|
)
|
||||||
from .models import (
|
from .models import (
|
||||||
Project,
|
Project,
|
||||||
@@ -998,6 +1000,19 @@ def upload_artifact(
|
|||||||
status_code=503,
|
status_code=503,
|
||||||
detail="Storage service temporarily unavailable. Please retry.",
|
detail="Storage service temporarily unavailable. Please retry.",
|
||||||
)
|
)
|
||||||
|
except S3StorageUnavailableError as e:
|
||||||
|
logger.error(f"S3 storage unavailable: {e}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=503,
|
||||||
|
detail="Storage backend is unavailable. Please retry later.",
|
||||||
|
)
|
||||||
|
except HashCollisionError as e:
|
||||||
|
# This is extremely rare - log critical alert
|
||||||
|
logger.critical(f"HASH COLLISION DETECTED: {e}")
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=500,
|
||||||
|
detail="Data integrity error detected. Please contact support.",
|
||||||
|
)
|
||||||
except StorageError as e:
|
except StorageError as e:
|
||||||
logger.error(f"Storage error during upload: {e}")
|
logger.error(f"Storage error during upload: {e}")
|
||||||
raise HTTPException(status_code=500, detail="Internal storage error")
|
raise HTTPException(status_code=500, detail="Internal storage error")
|
||||||
|
|||||||
@@ -14,7 +14,13 @@ from typing import (
|
|||||||
)
|
)
|
||||||
import boto3
|
import boto3
|
||||||
from botocore.config import Config
|
from botocore.config import Config
|
||||||
from botocore.exceptions import ClientError
|
from botocore.exceptions import (
|
||||||
|
ClientError,
|
||||||
|
ConnectionError as BotoConnectionError,
|
||||||
|
EndpointConnectionError,
|
||||||
|
ReadTimeoutError,
|
||||||
|
ConnectTimeoutError,
|
||||||
|
)
|
||||||
|
|
||||||
from .config import get_settings
|
from .config import get_settings
|
||||||
|
|
||||||
@@ -193,10 +199,33 @@ class StorageResult(NamedTuple):
|
|||||||
s3_etag: Optional[str] = None
|
s3_etag: Optional[str] = None
|
||||||
|
|
||||||
|
|
||||||
|
class S3StorageUnavailableError(StorageError):
|
||||||
|
"""Raised when S3 storage backend is unavailable"""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class HashCollisionError(StorageError):
|
||||||
|
"""Raised when a hash collision is detected (extremely rare)"""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class S3Storage:
|
class S3Storage:
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
|
# Build config with retry and timeout settings
|
||||||
|
s3_config = {}
|
||||||
|
if settings.s3_use_path_style:
|
||||||
|
s3_config["addressing_style"] = "path"
|
||||||
|
|
||||||
config = Config(
|
config = Config(
|
||||||
s3={"addressing_style": "path"} if settings.s3_use_path_style else {}
|
s3=s3_config if s3_config else None,
|
||||||
|
connect_timeout=settings.s3_connect_timeout,
|
||||||
|
read_timeout=settings.s3_read_timeout,
|
||||||
|
retries={
|
||||||
|
"max_attempts": settings.s3_max_retries,
|
||||||
|
"mode": "adaptive", # Adaptive retry mode for better handling
|
||||||
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
self.client = boto3.client(
|
self.client = boto3.client(
|
||||||
@@ -206,6 +235,7 @@ class S3Storage:
|
|||||||
aws_access_key_id=settings.s3_access_key_id,
|
aws_access_key_id=settings.s3_access_key_id,
|
||||||
aws_secret_access_key=settings.s3_secret_access_key,
|
aws_secret_access_key=settings.s3_secret_access_key,
|
||||||
config=config,
|
config=config,
|
||||||
|
verify=settings.s3_verify_ssl, # SSL/TLS verification
|
||||||
)
|
)
|
||||||
self.bucket = settings.s3_bucket
|
self.bucket = settings.s3_bucket
|
||||||
# Store active multipart uploads for resumable support
|
# Store active multipart uploads for resumable support
|
||||||
@@ -271,14 +301,38 @@ class S3Storage:
|
|||||||
Body=content,
|
Body=content,
|
||||||
)
|
)
|
||||||
s3_etag = response.get("ETag", "").strip('"')
|
s3_etag = response.get("ETag", "").strip('"')
|
||||||
|
except (EndpointConnectionError, BotoConnectionError) as e:
|
||||||
|
logger.error(f"S3 storage unavailable: {e}")
|
||||||
|
raise S3StorageUnavailableError(
|
||||||
|
f"Storage backend unavailable: {e}"
|
||||||
|
) from e
|
||||||
|
except (ReadTimeoutError, ConnectTimeoutError) as e:
|
||||||
|
logger.error(f"S3 operation timed out: {e}")
|
||||||
|
raise S3UploadError(f"Upload timed out: {e}") from e
|
||||||
except ClientError as e:
|
except ClientError as e:
|
||||||
|
error_code = e.response.get("Error", {}).get("Code", "")
|
||||||
|
if error_code == "ServiceUnavailable":
|
||||||
|
logger.error(f"S3 service unavailable: {e}")
|
||||||
|
raise S3StorageUnavailableError(
|
||||||
|
f"Storage service unavailable: {e}"
|
||||||
|
) from e
|
||||||
logger.error(f"S3 upload failed: {e}")
|
logger.error(f"S3 upload failed: {e}")
|
||||||
raise S3UploadError(f"Failed to upload to S3: {e}") from e
|
raise S3UploadError(f"Failed to upload to S3: {e}") from e
|
||||||
else:
|
else:
|
||||||
# Get existing ETag
|
# Get existing ETag and verify integrity (detect potential hash collision)
|
||||||
obj_info = self.get_object_info(s3_key)
|
obj_info = self.get_object_info(s3_key)
|
||||||
if obj_info:
|
if obj_info:
|
||||||
s3_etag = obj_info.get("etag", "").strip('"')
|
s3_etag = obj_info.get("etag", "").strip('"')
|
||||||
|
# Check for hash collision by comparing size
|
||||||
|
existing_size = obj_info.get("size", 0)
|
||||||
|
if existing_size != size:
|
||||||
|
logger.critical(
|
||||||
|
f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
|
||||||
|
f"existing={existing_size}, new={size}. This is extremely rare."
|
||||||
|
)
|
||||||
|
raise HashCollisionError(
|
||||||
|
f"Hash collision detected for {sha256_hash}: size mismatch"
|
||||||
|
)
|
||||||
|
|
||||||
return StorageResult(
|
return StorageResult(
|
||||||
sha256=sha256_hash,
|
sha256=sha256_hash,
|
||||||
@@ -341,6 +395,17 @@ class S3Storage:
|
|||||||
if exists:
|
if exists:
|
||||||
obj_info = self.get_object_info(s3_key)
|
obj_info = self.get_object_info(s3_key)
|
||||||
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
|
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
|
||||||
|
# Check for hash collision by comparing size
|
||||||
|
if obj_info:
|
||||||
|
existing_size = obj_info.get("size", 0)
|
||||||
|
if existing_size != size:
|
||||||
|
logger.critical(
|
||||||
|
f"HASH COLLISION DETECTED! Hash {sha256_hash} has size mismatch: "
|
||||||
|
f"existing={existing_size}, new={size}. This is extremely rare."
|
||||||
|
)
|
||||||
|
raise HashCollisionError(
|
||||||
|
f"Hash collision detected for {sha256_hash}: size mismatch"
|
||||||
|
)
|
||||||
return StorageResult(
|
return StorageResult(
|
||||||
sha256=sha256_hash,
|
sha256=sha256_hash,
|
||||||
size=size,
|
size=size,
|
||||||
@@ -354,7 +419,11 @@ class S3Storage:
|
|||||||
file.seek(0)
|
file.seek(0)
|
||||||
|
|
||||||
# Start multipart upload
|
# Start multipart upload
|
||||||
|
try:
|
||||||
mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
|
mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
|
||||||
|
except (EndpointConnectionError, BotoConnectionError) as e:
|
||||||
|
logger.error(f"S3 storage unavailable for multipart upload: {e}")
|
||||||
|
raise S3StorageUnavailableError(f"Storage backend unavailable: {e}") from e
|
||||||
upload_id = mpu["UploadId"]
|
upload_id = mpu["UploadId"]
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
|||||||
Reference in New Issue
Block a user