Upload workflow enhancements: S3 verification, timing, client checksum support (#19)

- Add S3 object verification after upload (size validation before DB commit)
- Add cleanup of S3 objects if DB commit fails
- Record upload duration_ms and user_agent
- Support X-Checksum-SHA256 header for client-side checksum verification (see the client sketch below)
- Add already_existed flag to StorageResult for deduplication tracking
- Add status, error_message, client_checksum columns to Upload model
- Add UploadLock model for future 409 conflict detection
- Add consistency-check admin endpoint for detecting orphaned S3 objects
- Add migration 005_upload_enhancements.sql
Author: Mondo Diaz
Date: 2026-01-06 15:31:59 -06:00
Parent: 3056747f39
Commit: c184272cec
5 changed files with 350 additions and 4 deletions
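As a quick illustration of the new client-side checksum flow, here is a minimal upload sketch in Python. The upload URL and the multipart field name are assumptions made for illustration; only the X-Checksum-SHA256 header and the 400/422 behaviour come from the route changes below.

import hashlib
import requests

def upload_with_checksum(base_url: str, project: str, package: str, path: str) -> dict:
    # Compute the SHA-256 locally so the server can verify the stored object.
    with open(path, "rb") as f:
        data = f.read()
    digest = hashlib.sha256(data).hexdigest()

    # NOTE: the route path and the "file" field name are assumed, not taken from the diff.
    resp = requests.post(
        f"{base_url}/api/v1/projects/{project}/packages/{package}/upload",
        files={"file": (path, data)},
        headers={"X-Checksum-SHA256": digest},
    )
    # The server answers 400 for a malformed header and 422 for a checksum mismatch.
    resp.raise_for_status()
    return resp.json()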


@@ -208,6 +208,11 @@ class Upload(Base):
duration_ms = Column(Integer)  # Upload timing in milliseconds
deduplicated = Column(Boolean, default=False)  # Whether artifact was deduplicated
checksum_verified = Column(Boolean, default=True)  # Whether checksum was verified
status = Column(
String(20), default="completed", nullable=False
) # pending, completed, failed
error_message = Column(Text) # Error details for failed uploads
client_checksum = Column(String(64)) # Client-provided SHA256 for verification
uploaded_at = Column(DateTime(timezone=True), default=datetime.utcnow)
uploaded_by = Column(String(255), nullable=False)
source_ip = Column(String(45))
@@ -221,6 +226,35 @@ class Upload(Base):
Index("idx_uploads_uploaded_at", "uploaded_at"),
Index("idx_uploads_package_uploaded_at", "package_id", "uploaded_at"),
Index("idx_uploads_uploaded_by_at", "uploaded_by", "uploaded_at"),
Index("idx_uploads_status", "status"),
Index("idx_uploads_status_uploaded_at", "status", "uploaded_at"),
CheckConstraint(
"status IN ('pending', 'completed', 'failed')", name="check_upload_status"
),
)
class UploadLock(Base):
"""Track in-progress uploads for conflict detection (409 responses)."""
__tablename__ = "upload_locks"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
sha256_hash = Column(String(64), nullable=False)
package_id = Column(
UUID(as_uuid=True),
ForeignKey("packages.id", ondelete="CASCADE"),
nullable=False,
)
locked_at = Column(DateTime(timezone=True), default=datetime.utcnow)
locked_by = Column(String(255), nullable=False)
expires_at = Column(DateTime(timezone=True), nullable=False)
__table_args__ = (
Index("idx_upload_locks_expires_at", "expires_at"),
Index(
"idx_upload_locks_hash_package", "sha256_hash", "package_id", unique=True
),
)
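UploadLock is only a model in this commit (the commit message calls the 409 handling "future"). A rough sketch of how the unique (sha256_hash, package_id) index could back those responses follows; the helper name and the TTL are assumptions.

from datetime import datetime, timedelta

from fastapi import HTTPException
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

def acquire_upload_lock(db: Session, sha256_hash: str, package_id, user_id: str) -> UploadLock:
    # Hypothetical helper: the unique index on (sha256_hash, package_id)
    # rejects a second in-progress upload for the same artifact/package.
    lock = UploadLock(
        sha256_hash=sha256_hash,
        package_id=package_id,
        locked_by=user_id,
        expires_at=datetime.utcnow() + timedelta(minutes=15),  # assumed TTL
    )
    db.add(lock)
    try:
        db.flush()
    except IntegrityError:
        db.rollback()
        raise HTTPException(
            status_code=409,
            detail="An upload for this artifact is already in progress",
        )
    return lock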


@@ -39,6 +39,7 @@ from .models import (
Tag,
TagHistory,
Upload,
UploadLock,
Consumer,
AuditLog,
)
@@ -82,6 +83,7 @@ from .schemas import (
PresignedUrlResponse,
GarbageCollectionResponse,
OrphanedArtifactResponse,
ConsistencyCheckResponse,
StorageStatsResponse,
DeduplicationStatsResponse,
ProjectStatsResponse,
@@ -121,6 +123,7 @@ def get_user_id(request: Request) -> str:
import logging
import time
logger = logging.getLogger(__name__)
@@ -1138,8 +1141,20 @@ def upload_artifact(
db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage),
content_length: Optional[int] = Header(None, alias="Content-Length"),
user_agent: Optional[str] = Header(None, alias="User-Agent"),
client_checksum: Optional[str] = Header(None, alias="X-Checksum-SHA256"),
):
"""
Upload an artifact to a package.
Headers:
- X-Checksum-SHA256: Optional client-provided SHA256 for verification
- User-Agent: Captured for audit purposes
"""
start_time = time.time()
user_id = get_user_id(request)
settings = get_settings()
storage_result = None
# Get project and package
project = db.query(Project).filter(Project.name == project_name).first()
@@ -1155,7 +1170,6 @@ def upload_artifact(
raise HTTPException(status_code=404, detail="Package not found")
# Validate file size
if content_length is not None:
if content_length > settings.max_file_size:
raise HTTPException(
@@ -1168,6 +1182,17 @@ def upload_artifact(
detail="Empty files are not allowed",
)
# Validate client checksum format if provided
if client_checksum:
client_checksum = client_checksum.lower().strip()
if len(client_checksum) != 64 or not all(
c in "0123456789abcdef" for c in client_checksum
):
raise HTTPException(
status_code=400,
detail="Invalid X-Checksum-SHA256 header. Must be 64 hex characters.",
)
# Extract format-specific metadata before storing
file_metadata = {}
if file.filename:
@@ -1224,6 +1249,55 @@ def upload_artifact(
logger.error(f"Storage error during upload: {e}")
raise HTTPException(status_code=500, detail="Internal storage error")
# Verify client-provided checksum if present
checksum_verified = True
if client_checksum and client_checksum != storage_result.sha256:
# Checksum mismatch - clean up S3 object if it was newly uploaded
logger.warning(
f"Client checksum mismatch: expected {client_checksum}, got {storage_result.sha256}"
)
# Attempt cleanup of the uploaded object
try:
if not storage_result.already_existed:
storage.delete(storage_result.s3_key)
logger.info(
f"Cleaned up S3 object after checksum mismatch: {storage_result.s3_key}"
)
except Exception as cleanup_error:
logger.error(
f"Failed to clean up S3 object after checksum mismatch: {cleanup_error}"
)
raise HTTPException(
status_code=422,
detail=f"Checksum verification failed. Expected {client_checksum}, got {storage_result.sha256}",
)
# Verify S3 object exists and size matches before proceeding
try:
s3_info = storage.get_object_info(storage_result.s3_key)
if s3_info is None:
raise HTTPException(
status_code=500,
detail="Failed to verify uploaded object in storage",
)
if s3_info.get("size") != storage_result.size:
logger.error(
f"Size mismatch after upload: expected {storage_result.size}, "
f"got {s3_info.get('size')}"
)
raise HTTPException(
status_code=500,
detail="Upload verification failed: size mismatch",
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to verify S3 object: {e}")
raise HTTPException(
status_code=500,
detail="Failed to verify uploaded object",
)
# Check if this is a deduplicated upload
deduplicated = False
saved_bytes = 0
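The verification step above relies on storage.get_object_info, which is not part of this diff. The call site here reads a "size" key while the consistency check further down reads "ContentLength", so a compatible sketch (an assumption about the helper, not its actual implementation) would expose both:

from typing import Any, Dict, Optional

from botocore.exceptions import ClientError

def get_object_info(self, key: str) -> Optional[Dict[str, Any]]:
    # Assumed shape, inferred from the call sites in this diff: return None
    # when the key is missing, otherwise a dict carrying the object size.
    try:
        head = self.client.head_object(Bucket=self.bucket, Key=key)
    except ClientError as e:
        if e.response["Error"]["Code"] in ("404", "NoSuchKey"):
            return None
        raise
    size = head["ContentLength"]
    return {"size": size, "ContentLength": size, "etag": head.get("ETag")}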
@@ -1275,14 +1349,23 @@ def upload_artifact(
)
db.add(artifact)
# Calculate upload duration
duration_ms = int((time.time() - start_time) * 1000)
# Record upload with enhanced metadata
upload = Upload(
artifact_id=storage_result.sha256,
package_id=package.id,
original_name=file.filename,
tag_name=tag,
user_agent=user_agent[:512] if user_agent else None, # Truncate if too long
duration_ms=duration_ms,
deduplicated=deduplicated,
checksum_verified=checksum_verified,
client_checksum=client_checksum,
status="completed",
uploaded_by=user_id,
source_ip=request.client.host if request.client else None,
)
db.add(upload)
db.flush()  # Flush to get upload ID
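With status, duration_ms, and the new status indexes in place, operational queries become easy to express. An illustrative query (not part of this commit) for recent failed or slow uploads:

from datetime import datetime, timedelta

from sqlalchemy import or_
from sqlalchemy.orm import Session

def recent_problem_uploads(db: Session, hours: int = 24, slow_ms: int = 5000):
    # Uploads in the last N hours that failed or exceeded the duration threshold.
    # The thresholds here are arbitrary examples.
    cutoff = datetime.utcnow() - timedelta(hours=hours)
    return (
        db.query(Upload)
        .filter(Upload.uploaded_at >= cutoff)
        .filter(or_(Upload.status == "failed", Upload.duration_ms > slow_ms))
        .order_by(Upload.uploaded_at.desc())
        .all()
    )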
@@ -1311,10 +1394,32 @@ def upload_artifact(
"deduplicated": deduplicated,
"saved_bytes": saved_bytes,
"tag": tag,
"duration_ms": duration_ms,
"client_checksum_provided": client_checksum is not None,
},
)
# Commit with cleanup on failure
try:
db.commit()
except Exception as commit_error:
logger.error(f"Database commit failed after upload: {commit_error}")
db.rollback()
# Attempt to clean up newly uploaded S3 object
if storage_result and not storage_result.already_existed:
try:
storage.delete(storage_result.s3_key)
logger.info(
f"Cleaned up S3 object after commit failure: {storage_result.s3_key}"
)
except Exception as cleanup_error:
logger.error(
f"Failed to clean up S3 object after commit failure: {cleanup_error}"
)
raise HTTPException(
status_code=500,
detail="Failed to save upload record. Please retry.",
)
return UploadResponse(
artifact_id=storage_result.sha256,
@@ -2597,6 +2702,109 @@ def garbage_collect(
)
@router.get(
"/api/v1/admin/consistency-check",
response_model=ConsistencyCheckResponse,
)
def check_consistency(
limit: int = Query(
default=100, ge=1, le=1000, description="Max items to report per category"
),
db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage),
):
"""
Check consistency between database records and S3 storage.
Reports:
- Orphaned S3 objects (in S3 but not in database)
- Missing S3 objects (in database but not in S3)
- Size mismatches (database size != S3 size)
This is a read-only operation. Use garbage-collect to clean up issues.
"""
orphaned_s3_keys = []
missing_s3_keys = []
size_mismatches = []
# Get all artifacts from database
artifacts = db.query(Artifact).all()
total_checked = len(artifacts)
# Check each artifact exists in S3 and sizes match
for artifact in artifacts:
try:
s3_info = storage.get_object_info(artifact.s3_key)
if s3_info is None:
if len(missing_s3_keys) < limit:
missing_s3_keys.append(artifact.s3_key)
else:
s3_size = s3_info.get("ContentLength", 0)
if s3_size != artifact.size:
if len(size_mismatches) < limit:
size_mismatches.append(
{
"artifact_id": artifact.id,
"s3_key": artifact.s3_key,
"db_size": artifact.size,
"s3_size": s3_size,
}
)
except Exception as e:
logger.error(f"Error checking S3 object {artifact.s3_key}: {e}")
if len(missing_s3_keys) < limit:
missing_s3_keys.append(artifact.s3_key)
# Check for orphaned S3 objects (objects in S3 bucket but not in database)
# Note: This is expensive for large buckets, so we limit the scan
try:
# List objects in the fruits/ prefix (where artifacts are stored)
paginator = storage.client.get_paginator("list_objects_v2")
artifact_ids_in_db = {a.id for a in artifacts}
objects_checked = 0
for page in paginator.paginate(
Bucket=storage.bucket, Prefix="fruits/", MaxKeys=1000
):
if "Contents" not in page:
break
for obj in page["Contents"]:
objects_checked += 1
# Extract hash from key: fruits/ab/cd/abcdef...
key = obj["Key"]
parts = key.split("/")
if len(parts) == 4 and parts[0] == "fruits":
sha256_hash = parts[3]
if sha256_hash not in artifact_ids_in_db:
if len(orphaned_s3_keys) < limit:
orphaned_s3_keys.append(key)
# Limit total objects checked
if objects_checked >= 10000:
break
if objects_checked >= 10000:
break
except Exception as e:
logger.error(f"Error listing S3 objects for consistency check: {e}")
healthy = (
len(orphaned_s3_keys) == 0
and len(missing_s3_keys) == 0
and len(size_mismatches) == 0
)
return ConsistencyCheckResponse(
total_artifacts_checked=total_checked,
orphaned_s3_objects=len(orphaned_s3_keys),
missing_s3_objects=len(missing_s3_keys),
size_mismatches=len(size_mismatches),
healthy=healthy,
orphaned_s3_keys=orphaned_s3_keys,
missing_s3_keys=missing_s3_keys,
size_mismatch_artifacts=size_mismatches,
)
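A quick way to exercise the endpoint once deployed; the base URL is an assumption and any auth is omitted, but the response fields match ConsistencyCheckResponse below:

import requests

# Read-only health probe against the new admin endpoint.
resp = requests.get(
    "http://localhost:8000/api/v1/admin/consistency-check",  # assumed base URL
    params={"limit": 100},
)
resp.raise_for_status()
report = resp.json()

if not report["healthy"]:
    print("orphaned in S3:", report["orphaned_s3_objects"])
    print("missing from S3:", report["missing_s3_objects"])
    print("size mismatches:", report["size_mismatches"])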
# =============================================================================
# Statistics Endpoints (ISSUE 34)
# =============================================================================


@@ -548,6 +548,19 @@ class StorageStatsResponse(BaseModel):
storage_saved_bytes: int  # Bytes saved through deduplication
class ConsistencyCheckResponse(BaseModel):
"""Result of S3/Database consistency check"""
total_artifacts_checked: int
orphaned_s3_objects: int # Objects in S3 but not in DB
missing_s3_objects: int # Records in DB but not in S3
size_mismatches: int # Records where DB size != S3 size
healthy: bool
orphaned_s3_keys: List[str] = [] # Limited list of orphaned S3 keys
missing_s3_keys: List[str] = [] # Limited list of missing S3 keys
size_mismatch_artifacts: List[Dict[str, Any]] = [] # Limited list of mismatches
class DeduplicationStatsResponse(BaseModel):
"""Deduplication effectiveness statistics"""


@@ -202,6 +202,9 @@ class StorageResult(NamedTuple):
md5: Optional[str] = None
sha1: Optional[str] = None
s3_etag: Optional[str] = None
already_existed: bool = (
False # True if artifact was deduplicated (S3 object already existed)
)
class S3StorageUnavailableError(StorageError):
@@ -354,6 +357,7 @@ class S3Storage:
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=exists,
)
def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
@@ -433,6 +437,7 @@ class S3Storage:
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=True,
)
# Seek back to start for upload
@@ -486,6 +491,7 @@ class S3Storage:
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=False,
)
except Exception as e:
@@ -535,6 +541,7 @@ class S3Storage:
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=True,
)
# Upload based on size
@@ -615,6 +622,7 @@ class S3Storage:
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
already_existed=False,
)
def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]:
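To see the new already_existed flag in action, storing the same bytes twice should mark the second result as deduplicated. The public store(...) entry point and its signature are assumptions here, since only the internal helpers appear in this diff:

import io

payload = b"example artifact bytes"

# `storage` is assumed to be an S3Storage instance pointed at a scratch bucket,
# and store(file, content_length) is an assumed public entry point.
first = storage.store(io.BytesIO(payload), content_length=len(payload))
second = storage.store(io.BytesIO(payload), content_length=len(payload))

assert first.already_existed is False
assert second.already_existed is True
assert first.sha256 == second.sha256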


@@ -0,0 +1,83 @@
-- Migration 005: Upload Workflow Enhancements
-- Adds status tracking and error handling for uploads
-- ============================================
-- Add status column to uploads table
-- ============================================
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name = 'uploads' AND column_name = 'status') THEN
ALTER TABLE uploads ADD COLUMN status VARCHAR(20) DEFAULT 'completed' NOT NULL;
END IF;
END $$;
-- ============================================
-- Add error_message column for failed uploads
-- ============================================
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name = 'uploads' AND column_name = 'error_message') THEN
ALTER TABLE uploads ADD COLUMN error_message TEXT;
END IF;
END $$;
-- ============================================
-- Add client_checksum column for verification
-- ============================================
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.columns
WHERE table_name = 'uploads' AND column_name = 'client_checksum') THEN
ALTER TABLE uploads ADD COLUMN client_checksum VARCHAR(64);
END IF;
END $$;
-- ============================================
-- Add indexes for upload status queries
-- ============================================
CREATE INDEX IF NOT EXISTS idx_uploads_status ON uploads(status);
CREATE INDEX IF NOT EXISTS idx_uploads_status_uploaded_at ON uploads(status, uploaded_at);
-- ============================================
-- Add constraint to validate status values
-- ============================================
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM information_schema.constraint_column_usage
WHERE constraint_name = 'check_upload_status') THEN
ALTER TABLE uploads ADD CONSTRAINT check_upload_status
CHECK (status IN ('pending', 'completed', 'failed'));
END IF;
END $$;
-- ============================================
-- Create table for tracking in-progress uploads (for 409 conflict detection)
-- ============================================
CREATE TABLE IF NOT EXISTS upload_locks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sha256_hash VARCHAR(64) NOT NULL,
package_id UUID NOT NULL REFERENCES packages(id) ON DELETE CASCADE,
locked_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
locked_by VARCHAR(255) NOT NULL,
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
UNIQUE(sha256_hash, package_id)
);
CREATE INDEX IF NOT EXISTS idx_upload_locks_expires_at ON upload_locks(expires_at);
CREATE INDEX IF NOT EXISTS idx_upload_locks_hash_package ON upload_locks(sha256_hash, package_id);
-- ============================================
-- Function to clean up expired upload locks
-- ============================================
CREATE OR REPLACE FUNCTION cleanup_expired_upload_locks()
RETURNS INTEGER AS $$
DECLARE
deleted_count INTEGER;
BEGIN
DELETE FROM upload_locks WHERE expires_at < NOW();
GET DIAGNOSTICS deleted_count = ROW_COUNT;
RETURN deleted_count;
END;
$$ LANGUAGE plpgsql;
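Nothing in this commit schedules cleanup_expired_upload_locks; one way to invoke it from the application, sketched with SQLAlchemy (the session wiring and the scheduling mechanism are assumptions):

from sqlalchemy import text
from sqlalchemy.orm import Session

def cleanup_upload_locks(db: Session) -> int:
    # Call the migration's helper and report how many expired locks were removed.
    deleted = db.execute(text("SELECT cleanup_expired_upload_locks()")).scalar()
    db.commit()
    return int(deleted or 0)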