From c184272cecd17d1db23b0a1486852aceb0d13cd8 Mon Sep 17 00:00:00 2001 From: Mondo Diaz Date: Tue, 6 Jan 2026 15:31:59 -0600 Subject: [PATCH] Upload workflow enhancements: S3 verification, timing, client checksum support (#19) - Add S3 object verification after upload (size validation before DB commit) - Add cleanup of S3 objects if DB commit fails - Record upload duration_ms and user_agent - Support X-Checksum-SHA256 header for client-side checksum verification - Add already_existed flag to StorageResult for deduplication tracking - Add status, error_message, client_checksum columns to Upload model - Add UploadLock model for future 409 conflict detection - Add consistency-check admin endpoint for detecting orphaned S3 objects - Add migration 005_upload_enhancements.sql --- backend/app/models.py | 34 ++++ backend/app/routes.py | 216 ++++++++++++++++++++++++- backend/app/schemas.py | 13 ++ backend/app/storage.py | 8 + migrations/005_upload_enhancements.sql | 83 ++++++++++ 5 files changed, 350 insertions(+), 4 deletions(-) create mode 100644 migrations/005_upload_enhancements.sql diff --git a/backend/app/models.py b/backend/app/models.py index 470ac2f..37f23ef 100644 --- a/backend/app/models.py +++ b/backend/app/models.py @@ -208,6 +208,11 @@ class Upload(Base): duration_ms = Column(Integer) # Upload timing in milliseconds deduplicated = Column(Boolean, default=False) # Whether artifact was deduplicated checksum_verified = Column(Boolean, default=True) # Whether checksum was verified + status = Column( + String(20), default="completed", nullable=False + ) # pending, completed, failed + error_message = Column(Text) # Error details for failed uploads + client_checksum = Column(String(64)) # Client-provided SHA256 for verification uploaded_at = Column(DateTime(timezone=True), default=datetime.utcnow) uploaded_by = Column(String(255), nullable=False) source_ip = Column(String(45)) @@ -221,6 +226,35 @@ class Upload(Base): Index("idx_uploads_uploaded_at", "uploaded_at"), Index("idx_uploads_package_uploaded_at", "package_id", "uploaded_at"), Index("idx_uploads_uploaded_by_at", "uploaded_by", "uploaded_at"), + Index("idx_uploads_status", "status"), + Index("idx_uploads_status_uploaded_at", "status", "uploaded_at"), + CheckConstraint( + "status IN ('pending', 'completed', 'failed')", name="check_upload_status" + ), + ) + + +class UploadLock(Base): + """Track in-progress uploads for conflict detection (409 responses).""" + + __tablename__ = "upload_locks" + + id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) + sha256_hash = Column(String(64), nullable=False) + package_id = Column( + UUID(as_uuid=True), + ForeignKey("packages.id", ondelete="CASCADE"), + nullable=False, + ) + locked_at = Column(DateTime(timezone=True), default=datetime.utcnow) + locked_by = Column(String(255), nullable=False) + expires_at = Column(DateTime(timezone=True), nullable=False) + + __table_args__ = ( + Index("idx_upload_locks_expires_at", "expires_at"), + Index( + "idx_upload_locks_hash_package", "sha256_hash", "package_id", unique=True + ), ) diff --git a/backend/app/routes.py b/backend/app/routes.py index f2c90b2..39281e7 100644 --- a/backend/app/routes.py +++ b/backend/app/routes.py @@ -39,6 +39,7 @@ from .models import ( Tag, TagHistory, Upload, + UploadLock, Consumer, AuditLog, ) @@ -82,6 +83,7 @@ from .schemas import ( PresignedUrlResponse, GarbageCollectionResponse, OrphanedArtifactResponse, + ConsistencyCheckResponse, StorageStatsResponse, DeduplicationStatsResponse, ProjectStatsResponse, @@ -121,6 
+123,7 @@ def get_user_id(request: Request) -> str: import logging +import time logger = logging.getLogger(__name__) @@ -1138,8 +1141,20 @@ def upload_artifact( db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), content_length: Optional[int] = Header(None, alias="Content-Length"), + user_agent: Optional[str] = Header(None, alias="User-Agent"), + client_checksum: Optional[str] = Header(None, alias="X-Checksum-SHA256"), ): + """ + Upload an artifact to a package. + + Headers: + - X-Checksum-SHA256: Optional client-provided SHA256 for verification + - User-Agent: Captured for audit purposes + """ + start_time = time.time() user_id = get_user_id(request) + settings = get_settings() + storage_result = None # Get project and package project = db.query(Project).filter(Project.name == project_name).first() @@ -1155,7 +1170,6 @@ def upload_artifact( raise HTTPException(status_code=404, detail="Package not found") # Validate file size - settings = get_settings() if content_length is not None: if content_length > settings.max_file_size: raise HTTPException( @@ -1168,6 +1182,17 @@ def upload_artifact( detail="Empty files are not allowed", ) + # Validate client checksum format if provided + if client_checksum: + client_checksum = client_checksum.lower().strip() + if len(client_checksum) != 64 or not all( + c in "0123456789abcdef" for c in client_checksum + ): + raise HTTPException( + status_code=400, + detail="Invalid X-Checksum-SHA256 header. Must be 64 hex characters.", + ) + # Extract format-specific metadata before storing file_metadata = {} if file.filename: @@ -1224,6 +1249,55 @@ def upload_artifact( logger.error(f"Storage error during upload: {e}") raise HTTPException(status_code=500, detail="Internal storage error") + # Verify client-provided checksum if present + checksum_verified = True + if client_checksum and client_checksum != storage_result.sha256: + # Checksum mismatch - clean up S3 object if it was newly uploaded + logger.warning( + f"Client checksum mismatch: expected {client_checksum}, got {storage_result.sha256}" + ) + # Attempt cleanup of the uploaded object + try: + if not storage_result.already_existed: + storage.delete(storage_result.s3_key) + logger.info( + f"Cleaned up S3 object after checksum mismatch: {storage_result.s3_key}" + ) + except Exception as cleanup_error: + logger.error( + f"Failed to clean up S3 object after checksum mismatch: {cleanup_error}" + ) + raise HTTPException( + status_code=422, + detail=f"Checksum verification failed. 
Expected {client_checksum}, got {storage_result.sha256}", + ) + + # Verify S3 object exists and size matches before proceeding + try: + s3_info = storage.get_object_info(storage_result.s3_key) + if s3_info is None: + raise HTTPException( + status_code=500, + detail="Failed to verify uploaded object in storage", + ) + if s3_info.get("size") != storage_result.size: + logger.error( + f"Size mismatch after upload: expected {storage_result.size}, " + f"got {s3_info.get('size')}" + ) + raise HTTPException( + status_code=500, + detail="Upload verification failed: size mismatch", + ) + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to verify S3 object: {e}") + raise HTTPException( + status_code=500, + detail="Failed to verify uploaded object", + ) + # Check if this is a deduplicated upload deduplicated = False saved_bytes = 0 @@ -1275,14 +1349,23 @@ def upload_artifact( ) db.add(artifact) - # Record upload + # Calculate upload duration + duration_ms = int((time.time() - start_time) * 1000) + + # Record upload with enhanced metadata upload = Upload( artifact_id=storage_result.sha256, package_id=package.id, original_name=file.filename, + tag_name=tag, + user_agent=user_agent[:512] if user_agent else None, # Truncate if too long + duration_ms=duration_ms, + deduplicated=deduplicated, + checksum_verified=checksum_verified, + client_checksum=client_checksum, + status="completed", uploaded_by=user_id, source_ip=request.client.host if request.client else None, - deduplicated=deduplicated, ) db.add(upload) db.flush() # Flush to get upload ID @@ -1311,10 +1394,32 @@ def upload_artifact( "deduplicated": deduplicated, "saved_bytes": saved_bytes, "tag": tag, + "duration_ms": duration_ms, + "client_checksum_provided": client_checksum is not None, }, ) - db.commit() + # Commit with cleanup on failure + try: + db.commit() + except Exception as commit_error: + logger.error(f"Database commit failed after upload: {commit_error}") + db.rollback() + # Attempt to clean up newly uploaded S3 object + if storage_result and not storage_result.already_existed: + try: + storage.delete(storage_result.s3_key) + logger.info( + f"Cleaned up S3 object after commit failure: {storage_result.s3_key}" + ) + except Exception as cleanup_error: + logger.error( + f"Failed to clean up S3 object after commit failure: {cleanup_error}" + ) + raise HTTPException( + status_code=500, + detail="Failed to save upload record. Please retry.", + ) return UploadResponse( artifact_id=storage_result.sha256, @@ -2597,6 +2702,109 @@ def garbage_collect( ) +@router.get( + "/api/v1/admin/consistency-check", + response_model=ConsistencyCheckResponse, +) +def check_consistency( + limit: int = Query( + default=100, ge=1, le=1000, description="Max items to report per category" + ), + db: Session = Depends(get_db), + storage: S3Storage = Depends(get_storage), +): + """ + Check consistency between database records and S3 storage. + + Reports: + - Orphaned S3 objects (in S3 but not in database) + - Missing S3 objects (in database but not in S3) + - Size mismatches (database size != S3 size) + + This is a read-only operation. Use garbage-collect to clean up issues. 
+ """ + orphaned_s3_keys = [] + missing_s3_keys = [] + size_mismatches = [] + + # Get all artifacts from database + artifacts = db.query(Artifact).all() + total_checked = len(artifacts) + + # Check each artifact exists in S3 and sizes match + for artifact in artifacts: + try: + s3_info = storage.get_object_info(artifact.s3_key) + if s3_info is None: + if len(missing_s3_keys) < limit: + missing_s3_keys.append(artifact.s3_key) + else: + s3_size = s3_info.get("ContentLength", 0) + if s3_size != artifact.size: + if len(size_mismatches) < limit: + size_mismatches.append( + { + "artifact_id": artifact.id, + "s3_key": artifact.s3_key, + "db_size": artifact.size, + "s3_size": s3_size, + } + ) + except Exception as e: + logger.error(f"Error checking S3 object {artifact.s3_key}: {e}") + if len(missing_s3_keys) < limit: + missing_s3_keys.append(artifact.s3_key) + + # Check for orphaned S3 objects (objects in S3 bucket but not in database) + # Note: This is expensive for large buckets, so we limit the scan + try: + # List objects in the fruits/ prefix (where artifacts are stored) + paginator = storage.client.get_paginator("list_objects_v2") + artifact_ids_in_db = {a.id for a in artifacts} + + objects_checked = 0 + for page in paginator.paginate( + Bucket=storage.bucket, Prefix="fruits/", MaxKeys=1000 + ): + if "Contents" not in page: + break + for obj in page["Contents"]: + objects_checked += 1 + # Extract hash from key: fruits/ab/cd/abcdef... + key = obj["Key"] + parts = key.split("/") + if len(parts) == 4 and parts[0] == "fruits": + sha256_hash = parts[3] + if sha256_hash not in artifact_ids_in_db: + if len(orphaned_s3_keys) < limit: + orphaned_s3_keys.append(key) + + # Limit total objects checked + if objects_checked >= 10000: + break + if objects_checked >= 10000: + break + except Exception as e: + logger.error(f"Error listing S3 objects for consistency check: {e}") + + healthy = ( + len(orphaned_s3_keys) == 0 + and len(missing_s3_keys) == 0 + and len(size_mismatches) == 0 + ) + + return ConsistencyCheckResponse( + total_artifacts_checked=total_checked, + orphaned_s3_objects=len(orphaned_s3_keys), + missing_s3_objects=len(missing_s3_keys), + size_mismatches=len(size_mismatches), + healthy=healthy, + orphaned_s3_keys=orphaned_s3_keys, + missing_s3_keys=missing_s3_keys, + size_mismatch_artifacts=size_mismatches, + ) + + # ============================================================================= # Statistics Endpoints (ISSUE 34) # ============================================================================= diff --git a/backend/app/schemas.py b/backend/app/schemas.py index 89d24c2..2f66c49 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -548,6 +548,19 @@ class StorageStatsResponse(BaseModel): storage_saved_bytes: int # Bytes saved through deduplication +class ConsistencyCheckResponse(BaseModel): + """Result of S3/Database consistency check""" + + total_artifacts_checked: int + orphaned_s3_objects: int # Objects in S3 but not in DB + missing_s3_objects: int # Records in DB but not in S3 + size_mismatches: int # Records where DB size != S3 size + healthy: bool + orphaned_s3_keys: List[str] = [] # Limited list of orphaned S3 keys + missing_s3_keys: List[str] = [] # Limited list of missing S3 keys + size_mismatch_artifacts: List[Dict[str, Any]] = [] # Limited list of mismatches + + class DeduplicationStatsResponse(BaseModel): """Deduplication effectiveness statistics""" diff --git a/backend/app/storage.py b/backend/app/storage.py index 99b4783..440dbaf 100644 --- 
a/backend/app/storage.py +++ b/backend/app/storage.py @@ -202,6 +202,9 @@ class StorageResult(NamedTuple): md5: Optional[str] = None sha1: Optional[str] = None s3_etag: Optional[str] = None + already_existed: bool = ( + False # True if artifact was deduplicated (S3 object already existed) + ) class S3StorageUnavailableError(StorageError): @@ -354,6 +357,7 @@ class S3Storage: md5=md5_hash, sha1=sha1_hash, s3_etag=s3_etag, + already_existed=exists, ) def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult: @@ -433,6 +437,7 @@ class S3Storage: md5=md5_hash, sha1=sha1_hash, s3_etag=s3_etag, + already_existed=True, ) # Seek back to start for upload @@ -486,6 +491,7 @@ class S3Storage: md5=md5_hash, sha1=sha1_hash, s3_etag=s3_etag, + already_existed=False, ) except Exception as e: @@ -535,6 +541,7 @@ class S3Storage: md5=md5_hash, sha1=sha1_hash, s3_etag=s3_etag, + already_existed=True, ) # Upload based on size @@ -615,6 +622,7 @@ class S3Storage: md5=md5_hash, sha1=sha1_hash, s3_etag=s3_etag, + already_existed=False, ) def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]: diff --git a/migrations/005_upload_enhancements.sql b/migrations/005_upload_enhancements.sql new file mode 100644 index 0000000..b1706e6 --- /dev/null +++ b/migrations/005_upload_enhancements.sql @@ -0,0 +1,83 @@ +-- Migration 005: Upload Workflow Enhancements +-- Adds status tracking and error handling for uploads + +-- ============================================ +-- Add status column to uploads table +-- ============================================ +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'uploads' AND column_name = 'status') THEN + ALTER TABLE uploads ADD COLUMN status VARCHAR(20) DEFAULT 'completed' NOT NULL; + END IF; +END $$; + +-- ============================================ +-- Add error_message column for failed uploads +-- ============================================ +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'uploads' AND column_name = 'error_message') THEN + ALTER TABLE uploads ADD COLUMN error_message TEXT; + END IF; +END $$; + +-- ============================================ +-- Add client_checksum column for verification +-- ============================================ +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.columns + WHERE table_name = 'uploads' AND column_name = 'client_checksum') THEN + ALTER TABLE uploads ADD COLUMN client_checksum VARCHAR(64); + END IF; +END $$; + +-- ============================================ +-- Add indexes for upload status queries +-- ============================================ +CREATE INDEX IF NOT EXISTS idx_uploads_status ON uploads(status); +CREATE INDEX IF NOT EXISTS idx_uploads_status_uploaded_at ON uploads(status, uploaded_at); + +-- ============================================ +-- Add constraint to validate status values +-- ============================================ +DO $$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM information_schema.constraint_column_usage + WHERE constraint_name = 'check_upload_status') THEN + ALTER TABLE uploads ADD CONSTRAINT check_upload_status + CHECK (status IN ('pending', 'completed', 'failed')); + END IF; +END $$; + +-- ============================================ +-- Create table for tracking in-progress uploads (for 409 conflict detection) +-- ============================================ +CREATE TABLE IF NOT EXISTS upload_locks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + 
sha256_hash VARCHAR(64) NOT NULL, + package_id UUID NOT NULL REFERENCES packages(id) ON DELETE CASCADE, + locked_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + locked_by VARCHAR(255) NOT NULL, + expires_at TIMESTAMP WITH TIME ZONE NOT NULL, + UNIQUE(sha256_hash, package_id) +); + +CREATE INDEX IF NOT EXISTS idx_upload_locks_expires_at ON upload_locks(expires_at); +CREATE INDEX IF NOT EXISTS idx_upload_locks_hash_package ON upload_locks(sha256_hash, package_id); + +-- ============================================ +-- Function to clean up expired upload locks +-- ============================================ +CREATE OR REPLACE FUNCTION cleanup_expired_upload_locks() +RETURNS INTEGER AS $$ +DECLARE + deleted_count INTEGER; +BEGIN + DELETE FROM upload_locks WHERE expires_at < NOW(); + GET DIAGNOSTICS deleted_count = ROW_COUNT; + RETURN deleted_count; +END; +$$ LANGUAGE plpgsql;
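
Example client usage (illustrative sketch): the snippet below computes a SHA256 locally and sends it in the new X-Checksum-SHA256 header so the server can verify the upload before committing. The upload URL, file path, and bearer-token auth are assumptions for illustration, not defined by this patch.

    import hashlib

    import requests

    def upload_with_checksum(file_path: str, upload_url: str, token: str) -> dict:
        """Upload a file and ask the server to verify our SHA256 via X-Checksum-SHA256."""
        with open(file_path, "rb") as f:
            data = f.read()
        checksum = hashlib.sha256(data).hexdigest()  # 64 lowercase hex chars, as the server expects
        response = requests.post(
            upload_url,  # assumed: the package upload endpoint served by upload_artifact
            files={"file": (file_path, data)},
            headers={
                "X-Checksum-SHA256": checksum,       # mismatches are rejected with 422
                "Authorization": f"Bearer {token}",  # auth scheme is an assumption
            },
        )
        response.raise_for_status()
        return response.json()

On a mismatch the server deletes the newly written S3 object (unless it was deduplicated against an existing one) and returns 422, so the client can simply retry the upload.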
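
The admin consistency check is read-only and caps each reported category with the limit query parameter (1-1000, default 100). A minimal sketch of calling it from an admin script, assuming the same base URL and auth scheme as above:

    import requests

    def check_consistency(base_url: str, token: str, limit: int = 500) -> bool:
        """Return True if the database and S3 agree; print discrepancy counts otherwise."""
        response = requests.get(
            f"{base_url}/api/v1/admin/consistency-check",
            params={"limit": limit},
            headers={"Authorization": f"Bearer {token}"},  # auth scheme is an assumption
        )
        response.raise_for_status()
        report = response.json()
        if not report["healthy"]:
            print(
                f"{report['orphaned_s3_objects']} orphaned S3 objects, "
                f"{report['missing_s3_objects']} missing S3 objects, "
                f"{report['size_mismatches']} size mismatches "
                f"({report['total_artifacts_checked']} artifacts checked)"
            )
        return report["healthy"]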
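
The upload_locks table is not yet consulted by the upload path (it is reserved for future 409 conflict detection), but expired rows can already be purged with cleanup_expired_upload_locks() from migration 005. A sketch of invoking it from a scheduled job, assuming a SQLAlchemy engine pointed at the same database (the DSN below is a placeholder):

    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql+psycopg2://user:password@localhost/fruits")  # placeholder DSN

    def purge_expired_upload_locks() -> int:
        """Delete expired upload_locks rows and return how many were removed."""
        with engine.begin() as conn:
            deleted = conn.execute(text("SELECT cleanup_expired_upload_locks()")).scalar()
        return deleted or 0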