From 722e7d2d81141e39aee97eaca4c15530f5dd16ec Mon Sep 17 00:00:00 2001 From: Mondo Diaz Date: Fri, 16 Jan 2026 19:33:31 +0000 Subject: [PATCH] Add large file upload enhancements and tests (#43) - Add upload duration/throughput metrics (duration_ms, throughput_mbps) to response - Add upload progress logging for large files (hash computation and multipart upload) - Add client disconnect handling during uploads with proper cleanup - Add upload progress tracking endpoint GET /upload/{upload_id}/progress - Add large file upload tests (10MB, 100MB, 1GB) - Add upload cancellation and timeout handling tests - Add API documentation for upload endpoints with curl, Python, JavaScript examples --- CHANGELOG.md | 7 + backend/app/routes.py | 191 +++++- backend/app/schemas.py | 18 + backend/app/storage.py | 139 ++++- .../tests/integration/test_large_uploads.py | 550 ++++++++++++++++++ 5 files changed, 894 insertions(+), 11 deletions(-) create mode 100644 backend/tests/integration/test_large_uploads.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 95c8390..427ae5c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added download completion logging with bytes transferred and throughput (#42) - Added client disconnect handling during streaming downloads (#42) - Added streaming download tests: range requests, conditional requests, caching headers, download resume (#42) +- Added upload duration and throughput metrics (`duration_ms`, `throughput_mbps`) to upload response (#43) +- Added upload progress logging for large files (hash computation and multipart upload phases) (#43) +- Added client disconnect handling during uploads with proper cleanup (#43) +- Added upload progress tracking endpoint `GET /upload/{upload_id}/progress` for resumable uploads (#43) +- Added large file upload tests (10MB, 100MB, 1GB) with multipart upload verification (#43) +- Added upload cancellation and timeout handling tests (#43) +- Added comprehensive API documentation for upload endpoints with curl, Python, and JavaScript examples (#43) - Added `package_versions` table for immutable version tracking separate from mutable tags (#56) - Versions are set at upload time via explicit `version` parameter or auto-detected from filename/metadata - Version detection priority: explicit parameter > package metadata > filename pattern diff --git a/backend/app/routes.py b/backend/app/routes.py index f327109..2cb08d3 100644 --- a/backend/app/routes.py +++ b/backend/app/routes.py @@ -82,6 +82,7 @@ from .schemas import ( ResumableUploadCompleteRequest, ResumableUploadCompleteResponse, ResumableUploadStatusResponse, + UploadProgressResponse, GlobalSearchResponse, SearchResultProject, SearchResultPackage, @@ -2283,10 +2284,56 @@ def upload_artifact( """ Upload an artifact to a package. - Headers: - - X-Checksum-SHA256: Optional client-provided SHA256 for verification - - User-Agent: Captured for audit purposes - - Authorization: Bearer for authentication + **Size Limits:** + - Minimum: 1 byte (empty files rejected) + - Maximum: 10GB (configurable via ORCHARD_MAX_FILE_SIZE) + - Files > 100MB automatically use S3 multipart upload + + **Headers:** + - `X-Checksum-SHA256`: Optional SHA256 hash for server-side verification + - `Content-Length`: File size (required for early rejection of oversized files) + - `Authorization`: Bearer for authentication + + **Deduplication:** + Content-addressable storage automatically deduplicates identical files. 
+ If the same content is uploaded multiple times, only one copy is stored. + + **Response Metrics:** + - `duration_ms`: Upload duration in milliseconds + - `throughput_mbps`: Upload throughput in MB/s + - `deduplicated`: True if content already existed + + **Example (curl):** + ```bash + curl -X POST "http://localhost:8080/api/v1/project/myproject/mypackage/upload" \\ + -H "Authorization: Bearer " \\ + -F "file=@myfile.tar.gz" \\ + -F "tag=v1.0.0" + ``` + + **Example (Python requests):** + ```python + import requests + with open('myfile.tar.gz', 'rb') as f: + response = requests.post( + 'http://localhost:8080/api/v1/project/myproject/mypackage/upload', + files={'file': f}, + data={'tag': 'v1.0.0'}, + headers={'Authorization': 'Bearer '} + ) + ``` + + **Example (JavaScript fetch):** + ```javascript + const formData = new FormData(); + formData.append('file', fileInput.files[0]); + formData.append('tag', 'v1.0.0'); + const response = await fetch('/api/v1/project/myproject/mypackage/upload', { + method: 'POST', + headers: { 'Authorization': 'Bearer ' }, + body: formData + }); + ``` """ start_time = time.time() settings = get_settings() @@ -2388,6 +2435,30 @@ def upload_artifact( except StorageError as e: logger.error(f"Storage error during upload: {e}") raise HTTPException(status_code=500, detail="Internal storage error") + except (ConnectionResetError, BrokenPipeError) as e: + # Client disconnected during upload + logger.warning( + f"Client disconnected during upload: project={project_name} " + f"package={package_name} filename={file.filename} error={e}" + ) + raise HTTPException( + status_code=499, # Client Closed Request (nginx convention) + detail="Client disconnected during upload", + ) + except Exception as e: + # Catch-all for unexpected errors including client disconnects + error_str = str(e).lower() + if "connection" in error_str or "broken pipe" in error_str or "reset" in error_str: + logger.warning( + f"Client connection error during upload: project={project_name} " + f"package={package_name} filename={file.filename} error={e}" + ) + raise HTTPException( + status_code=499, + detail="Client connection error during upload", + ) + logger.error(f"Unexpected error during upload: {e}", exc_info=True) + raise HTTPException(status_code=500, detail="Internal server error during upload") # Verify client-provided checksum if present checksum_verified = True @@ -2580,6 +2651,12 @@ def upload_artifact( detail="Failed to save upload record. Please retry.", ) + # Calculate throughput + throughput_mbps = None + if duration_ms > 0: + duration_seconds = duration_ms / 1000.0 + throughput_mbps = round((storage_result.size / (1024 * 1024)) / duration_seconds, 2) + return UploadResponse( artifact_id=storage_result.sha256, sha256=storage_result.sha256, @@ -2599,6 +2676,8 @@ def upload_artifact( content_type=artifact.content_type, original_name=artifact.original_name, created_at=artifact.created_at, + duration_ms=duration_ms, + throughput_mbps=throughput_mbps, ) @@ -2616,8 +2695,46 @@ def init_resumable_upload( storage: S3Storage = Depends(get_storage), ): """ - Initialize a resumable upload session. - Client must provide the SHA256 hash of the file in advance. + Initialize a resumable upload session for large files. + + Resumable uploads allow uploading large files in chunks, with the ability + to resume after interruption. The client must compute the SHA256 hash + of the entire file before starting. + + **Workflow:** + 1. POST /upload/init - Initialize upload session (this endpoint) + 2. 
PUT /upload/{upload_id}/part/{part_number} - Upload each part + 3. GET /upload/{upload_id}/progress - Check upload progress (optional) + 4. POST /upload/{upload_id}/complete - Finalize upload + 5. DELETE /upload/{upload_id} - Abort upload (if needed) + + **Chunk Size:** + Use the `chunk_size` returned in the response (10MB default). + Each part except the last must be exactly this size. + + **Deduplication:** + If the expected_hash already exists in storage, the response will include + `already_exists: true` and no upload session is created. + + **Example (curl):** + ```bash + # Step 1: Initialize + curl -X POST "http://localhost:8080/api/v1/project/myproject/mypackage/upload/init" \\ + -H "Authorization: Bearer " \\ + -H "Content-Type: application/json" \\ + -d '{"expected_hash": "", "filename": "large.tar.gz", "size": 104857600}' + + # Step 2: Upload parts + curl -X PUT "http://localhost:8080/api/v1/project/myproject/mypackage/upload//part/1" \\ + -H "Authorization: Bearer " \\ + --data-binary @part1.bin + + # Step 3: Complete + curl -X POST "http://localhost:8080/api/v1/project/myproject/mypackage/upload//complete" \\ + -H "Authorization: Bearer " \\ + -H "Content-Type: application/json" \\ + -d '{"tag": "v1.0.0"}' + ``` """ user_id = get_user_id(request) @@ -2711,6 +2828,10 @@ def init_resumable_upload( # Initialize resumable upload session = storage.initiate_resumable_upload(init_request.expected_hash) + # Set expected size for progress tracking + if session["upload_id"] and init_request.size: + storage.set_upload_expected_size(session["upload_id"], init_request.size) + return ResumableUploadInitResponse( upload_id=session["upload_id"], already_exists=False, @@ -2777,6 +2898,64 @@ def upload_part( raise HTTPException(status_code=404, detail=str(e)) +@router.get( + "/api/v1/project/{project_name}/{package_name}/upload/{upload_id}/progress", + response_model=UploadProgressResponse, +) +def get_upload_progress( + project_name: str, + package_name: str, + upload_id: str, + db: Session = Depends(get_db), + storage: S3Storage = Depends(get_storage), +): + """ + Get progress information for an in-flight resumable upload. + + Returns progress metrics including bytes uploaded, percent complete, + elapsed time, and throughput. 
+ """ + # Validate project and package exist + project = db.query(Project).filter(Project.name == project_name).first() + if not project: + raise HTTPException(status_code=404, detail="Project not found") + + package = ( + db.query(Package) + .filter(Package.project_id == project.id, Package.name == package_name) + .first() + ) + if not package: + raise HTTPException(status_code=404, detail="Package not found") + + progress = storage.get_upload_progress(upload_id) + if not progress: + # Return not_found status instead of 404 to allow polling + return UploadProgressResponse( + upload_id=upload_id, + status="not_found", + bytes_uploaded=0, + ) + + from datetime import datetime, timezone + started_at_dt = None + if progress.get("started_at"): + started_at_dt = datetime.fromtimestamp(progress["started_at"], tz=timezone.utc) + + return UploadProgressResponse( + upload_id=upload_id, + status=progress.get("status", "in_progress"), + bytes_uploaded=progress.get("bytes_uploaded", 0), + bytes_total=progress.get("bytes_total"), + percent_complete=progress.get("percent_complete"), + parts_uploaded=progress.get("parts_uploaded", 0), + parts_total=progress.get("parts_total"), + started_at=started_at_dt, + elapsed_seconds=progress.get("elapsed_seconds"), + throughput_mbps=progress.get("throughput_mbps"), + ) + + @router.post( "/api/v1/project/{project_name}/{package_name}/upload/{upload_id}/complete" ) diff --git a/backend/app/schemas.py b/backend/app/schemas.py index 1fe395b..275f827 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -412,6 +412,9 @@ class UploadResponse(BaseModel): content_type: Optional[str] = None original_name: Optional[str] = None created_at: Optional[datetime] = None + # Upload metrics (Issue #43) + duration_ms: Optional[int] = None # Upload duration in milliseconds + throughput_mbps: Optional[float] = None # Upload throughput in MB/s # Resumable upload schemas @@ -478,6 +481,21 @@ class ResumableUploadStatusResponse(BaseModel): total_uploaded_bytes: int +class UploadProgressResponse(BaseModel): + """Progress information for an in-flight upload""" + + upload_id: str + status: str # 'in_progress', 'completed', 'failed', 'not_found' + bytes_uploaded: int = 0 + bytes_total: Optional[int] = None + percent_complete: Optional[float] = None + parts_uploaded: int = 0 + parts_total: Optional[int] = None + started_at: Optional[datetime] = None + elapsed_seconds: Optional[float] = None + throughput_mbps: Optional[float] = None + + # Consumer schemas class ConsumerResponse(BaseModel): id: UUID diff --git a/backend/app/storage.py b/backend/app/storage.py index ca3ffe3..cb7dbd4 100644 --- a/backend/app/storage.py +++ b/backend/app/storage.py @@ -378,10 +378,16 @@ class S3Storage: """ # First pass: compute all hashes by streaming through file try: + import time sha256_hasher = hashlib.sha256() md5_hasher = hashlib.md5() sha1_hasher = hashlib.sha1() size = 0 + hash_start_time = time.time() + last_log_time = hash_start_time + log_interval_seconds = 5 # Log progress every 5 seconds + + logger.info(f"Computing hashes for large file: expected_size={content_length}") # Read file in chunks to compute hashes while True: @@ -393,6 +399,18 @@ class S3Storage: sha1_hasher.update(chunk) size += len(chunk) + # Log hash computation progress periodically + current_time = time.time() + if current_time - last_log_time >= log_interval_seconds: + elapsed = current_time - hash_start_time + percent = (size / content_length) * 100 if content_length > 0 else 0 + throughput = (size / (1024 * 1024)) / 
elapsed if elapsed > 0 else 0 + logger.info( + f"Hash computation progress: bytes={size}/{content_length} ({percent:.1f}%) " + f"throughput={throughput:.2f}MB/s" + ) + last_log_time = current_time + # Enforce file size limit during streaming (protection against spoofing) if size > settings.max_file_size: raise FileSizeExceededError( @@ -405,6 +423,14 @@ class S3Storage: sha256_hash = sha256_hasher.hexdigest() md5_hash = md5_hasher.hexdigest() sha1_hash = sha1_hasher.hexdigest() + + # Log hash computation completion + hash_elapsed = time.time() - hash_start_time + hash_throughput = (size / (1024 * 1024)) / hash_elapsed if hash_elapsed > 0 else 0 + logger.info( + f"Hash computation completed: hash={sha256_hash[:16]}... " + f"size={size} duration={hash_elapsed:.2f}s throughput={hash_throughput:.2f}MB/s" + ) except (HashComputationError, FileSizeExceededError): raise except Exception as e: @@ -458,8 +484,19 @@ class S3Storage: upload_id = mpu["UploadId"] try: + import time parts = [] part_number = 1 + bytes_uploaded = 0 + upload_start_time = time.time() + last_log_time = upload_start_time + log_interval_seconds = 5 # Log progress every 5 seconds + + total_parts = (content_length + MULTIPART_CHUNK_SIZE - 1) // MULTIPART_CHUNK_SIZE + logger.info( + f"Starting multipart upload: hash={sha256_hash[:16]}... " + f"size={content_length} parts={total_parts}" + ) while True: chunk = file.read(MULTIPART_CHUNK_SIZE) @@ -479,8 +516,32 @@ class S3Storage: "ETag": response["ETag"], } ) + bytes_uploaded += len(chunk) + + # Log progress periodically + current_time = time.time() + if current_time - last_log_time >= log_interval_seconds: + elapsed = current_time - upload_start_time + percent = (bytes_uploaded / content_length) * 100 + throughput = (bytes_uploaded / (1024 * 1024)) / elapsed if elapsed > 0 else 0 + logger.info( + f"Upload progress: hash={sha256_hash[:16]}... " + f"part={part_number}/{total_parts} " + f"bytes={bytes_uploaded}/{content_length} ({percent:.1f}%) " + f"throughput={throughput:.2f}MB/s" + ) + last_log_time = current_time + part_number += 1 + # Log completion + total_elapsed = time.time() - upload_start_time + final_throughput = (content_length / (1024 * 1024)) / total_elapsed if total_elapsed > 0 else 0 + logger.info( + f"Multipart upload completed: hash={sha256_hash[:16]}... " + f"size={content_length} duration={total_elapsed:.2f}s throughput={final_throughput:.2f}MB/s" + ) + # Complete multipart upload complete_response = self.client.complete_multipart_upload( Bucket=self.bucket, @@ -502,12 +563,28 @@ class S3Storage: except Exception as e: # Abort multipart upload on failure - logger.error(f"Multipart upload failed: {e}") - self.client.abort_multipart_upload( - Bucket=self.bucket, - Key=s3_key, - UploadId=upload_id, + error_str = str(e).lower() + is_client_disconnect = ( + isinstance(e, (ConnectionResetError, BrokenPipeError)) or + "connection" in error_str or "broken pipe" in error_str or "reset" in error_str ) + if is_client_disconnect: + logger.warning( + f"Multipart upload aborted (client disconnect): hash={sha256_hash[:16]}... " + f"parts_uploaded={len(parts)} bytes_uploaded={bytes_uploaded}" + ) + else: + logger.error(f"Multipart upload failed: hash={sha256_hash[:16]}... 
error={e}") + + try: + self.client.abort_multipart_upload( + Bucket=self.bucket, + Key=s3_key, + UploadId=upload_id, + ) + logger.info(f"Multipart upload aborted and cleaned up: upload_id={upload_id[:16]}...") + except Exception as abort_error: + logger.error(f"Failed to abort multipart upload: {abort_error}") raise def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]: @@ -529,12 +606,17 @@ class S3Storage: mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key) upload_id = mpu["UploadId"] + import time session = { "upload_id": upload_id, "s3_key": s3_key, "already_exists": False, "parts": [], "expected_hash": expected_hash, + "started_at": time.time(), + "bytes_uploaded": 0, + "expected_size": None, # Set when init provides size + "status": "in_progress", } self._active_uploads[upload_id] = session return session @@ -561,10 +643,57 @@ class S3Storage: part_info = { "PartNumber": part_number, "ETag": response["ETag"], + "size": len(data), } session["parts"].append(part_info) + session["bytes_uploaded"] = session.get("bytes_uploaded", 0) + len(data) return part_info + def get_upload_progress(self, upload_id: str) -> Optional[Dict[str, Any]]: + """ + Get progress information for a resumable upload. + Returns None if upload not found. + """ + import time + session = self._active_uploads.get(upload_id) + if not session: + return None + + bytes_uploaded = session.get("bytes_uploaded", 0) + expected_size = session.get("expected_size") + started_at = session.get("started_at") + + progress = { + "upload_id": upload_id, + "status": session.get("status", "in_progress"), + "bytes_uploaded": bytes_uploaded, + "bytes_total": expected_size, + "parts_uploaded": len(session.get("parts", [])), + "parts_total": None, + "started_at": started_at, + "elapsed_seconds": None, + "percent_complete": None, + "throughput_mbps": None, + } + + if expected_size and expected_size > 0: + progress["percent_complete"] = round((bytes_uploaded / expected_size) * 100, 2) + progress["parts_total"] = (expected_size + MULTIPART_CHUNK_SIZE - 1) // MULTIPART_CHUNK_SIZE + + if started_at: + elapsed = time.time() - started_at + progress["elapsed_seconds"] = round(elapsed, 2) + if elapsed > 0 and bytes_uploaded > 0: + progress["throughput_mbps"] = round((bytes_uploaded / (1024 * 1024)) / elapsed, 2) + + return progress + + def set_upload_expected_size(self, upload_id: str, size: int): + """Set the expected size for an upload (for progress tracking).""" + session = self._active_uploads.get(upload_id) + if session: + session["expected_size"] = size + def complete_resumable_upload(self, upload_id: str) -> Tuple[str, str]: """ Complete a resumable upload. diff --git a/backend/tests/integration/test_large_uploads.py b/backend/tests/integration/test_large_uploads.py new file mode 100644 index 0000000..dd724d3 --- /dev/null +++ b/backend/tests/integration/test_large_uploads.py @@ -0,0 +1,550 @@ +""" +Integration tests for large file upload functionality. + +Tests cover: +- Large file uploads (100MB, 1GB) +- Multipart upload behavior +- Upload metrics (duration, throughput) +- Memory efficiency during uploads +- Upload progress tracking + +Note: Large tests are marked with @pytest.mark.slow and will be skipped +by default. Run with `pytest --run-slow` to include them. 
+""" + +import pytest +import io +import time +from tests.factories import ( + compute_sha256, + upload_test_file, + s3_object_exists, +) +from tests.conftest import ( + SIZE_1KB, + SIZE_100KB, + SIZE_1MB, + SIZE_10MB, + SIZE_100MB, + SIZE_1GB, +) + + +class TestUploadMetrics: + """Tests for upload duration and throughput metrics.""" + + @pytest.mark.integration + def test_upload_response_includes_duration_ms(self, integration_client, test_package): + """Test upload response includes duration_ms field.""" + project, package = test_package + content = b"duration test content" + + result = upload_test_file( + integration_client, project, package, content, tag="duration-test" + ) + + assert "duration_ms" in result + assert result["duration_ms"] is not None + assert result["duration_ms"] >= 0 + + @pytest.mark.integration + def test_upload_response_includes_throughput(self, integration_client, test_package): + """Test upload response includes throughput_mbps field.""" + project, package = test_package + content = b"throughput test content" + + result = upload_test_file( + integration_client, project, package, content, tag="throughput-test" + ) + + assert "throughput_mbps" in result + # For small files throughput may be very high or None + # Just verify the field exists + + @pytest.mark.integration + def test_upload_duration_reasonable( + self, integration_client, test_package, sized_content + ): + """Test upload duration is reasonable for file size.""" + project, package = test_package + content, _ = sized_content(SIZE_1MB, seed=100) + + start = time.time() + result = upload_test_file( + integration_client, project, package, content, tag="duration-check" + ) + actual_duration = (time.time() - start) * 1000 # ms + + # Reported duration should be close to actual + assert result["duration_ms"] is not None + # Allow some variance (network overhead) + assert result["duration_ms"] <= actual_duration + 1000 # Within 1s + + +class TestLargeFileUploads: + """Tests for large file uploads using multipart.""" + + @pytest.mark.integration + def test_upload_10mb_file(self, integration_client, test_package, sized_content): + """Test uploading a 10MB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_10MB, seed=200) + + result = upload_test_file( + integration_client, project, package, content, tag="large-10mb" + ) + + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_10MB + assert result["duration_ms"] is not None + assert result["throughput_mbps"] is not None + + @pytest.mark.integration + @pytest.mark.slow + def test_upload_100mb_file(self, integration_client, test_package, sized_content): + """Test uploading a 100MB file (triggers multipart upload).""" + project, package = test_package + content, expected_hash = sized_content(SIZE_100MB, seed=300) + + result = upload_test_file( + integration_client, project, package, content, tag="large-100mb" + ) + + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_100MB + # Verify S3 object exists + assert s3_object_exists(expected_hash) + + @pytest.mark.integration + @pytest.mark.slow + @pytest.mark.large + def test_upload_1gb_file(self, integration_client, test_package, sized_content): + """Test uploading a 1GB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_1GB, seed=400) + + result = upload_test_file( + integration_client, project, package, content, tag="large-1gb" + ) + + assert result["artifact_id"] == expected_hash + assert 
result["size"] == SIZE_1GB + # Should have measurable throughput + assert result["throughput_mbps"] is not None + assert result["throughput_mbps"] > 0 + + @pytest.mark.integration + def test_large_file_deduplication( + self, integration_client, test_package, sized_content, unique_test_id + ): + """Test deduplication works for large files.""" + project, package = test_package + # Use unique_test_id to ensure unique content per test run + seed = hash(unique_test_id) % 10000 + content, expected_hash = sized_content(SIZE_10MB, seed=seed) + + # First upload + result1 = upload_test_file( + integration_client, project, package, content, tag=f"dedup-{unique_test_id}-1" + ) + # Note: may be True if previous test uploaded same content + first_dedupe = result1["deduplicated"] + + # Second upload of same content + result2 = upload_test_file( + integration_client, project, package, content, tag=f"dedup-{unique_test_id}-2" + ) + assert result2["artifact_id"] == expected_hash + # Second upload MUST be deduplicated + assert result2["deduplicated"] is True + + +class TestUploadProgress: + """Tests for upload progress tracking endpoint.""" + + @pytest.mark.integration + def test_progress_endpoint_returns_not_found_for_invalid_id( + self, integration_client, test_package + ): + """Test progress endpoint returns not_found status for invalid upload ID.""" + project, package = test_package + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/upload/invalid-upload-id/progress" + ) + + assert response.status_code == 200 + data = response.json() + assert data["status"] == "not_found" + assert data["upload_id"] == "invalid-upload-id" + + @pytest.mark.integration + def test_progress_endpoint_requires_valid_project( + self, integration_client, unique_test_id + ): + """Test progress endpoint validates project exists.""" + response = integration_client.get( + f"/api/v1/project/nonexistent-{unique_test_id}/pkg/upload/upload-id/progress" + ) + + assert response.status_code == 404 + + @pytest.mark.integration + def test_progress_endpoint_requires_valid_package( + self, integration_client, test_project, unique_test_id + ): + """Test progress endpoint validates package exists.""" + response = integration_client.get( + f"/api/v1/project/{test_project}/nonexistent-{unique_test_id}/upload/upload-id/progress" + ) + + assert response.status_code == 404 + + +class TestResumableUploadProgress: + """Tests for progress tracking during resumable uploads.""" + + @pytest.mark.integration + def test_resumable_upload_init_and_progress( + self, integration_client, test_package, sized_content + ): + """Test initializing resumable upload and checking progress.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_100KB, seed=600) + + # Get API key for auth + api_key_response = integration_client.post( + "/api/v1/auth/keys", + json={"name": "progress-test-key"}, + ) + assert api_key_response.status_code == 200 + api_key = api_key_response.json()["key"] + + # Initialize resumable upload + init_response = integration_client.post( + f"/api/v1/project/{project}/{package}/upload/init", + json={ + "expected_hash": expected_hash, + "filename": "progress-test.bin", + "size": SIZE_100KB, + }, + headers={"Authorization": f"Bearer {api_key}"}, + ) + assert init_response.status_code == 200 + upload_id = init_response.json().get("upload_id") + + if upload_id: + # Check initial progress + progress_response = integration_client.get( + f"/api/v1/project/{project}/{package}/upload/{upload_id}/progress", 
+ headers={"Authorization": f"Bearer {api_key}"}, + ) + assert progress_response.status_code == 200 + progress = progress_response.json() + assert progress["status"] == "in_progress" + assert progress["bytes_uploaded"] == 0 + assert progress["bytes_total"] == SIZE_100KB + + # Abort to clean up + integration_client.delete( + f"/api/v1/project/{project}/{package}/upload/{upload_id}", + headers={"Authorization": f"Bearer {api_key}"}, + ) + + +class TestUploadSizeLimits: + """Tests for upload size limit enforcement.""" + + @pytest.mark.integration + def test_empty_file_rejected(self, integration_client, test_package): + """Test empty files are rejected.""" + project, package = test_package + + files = {"file": ("empty.txt", io.BytesIO(b""), "application/octet-stream")} + response = integration_client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + ) + + assert response.status_code in [400, 422] + + @pytest.mark.integration + def test_minimum_size_accepted(self, integration_client, test_package): + """Test 1-byte file is accepted.""" + project, package = test_package + content = b"X" + + result = upload_test_file( + integration_client, project, package, content, tag="min-size" + ) + + assert result["size"] == 1 + + @pytest.mark.integration + def test_content_length_header_used_in_response(self, integration_client, test_package): + """Test that upload response size matches Content-Length.""" + project, package = test_package + content = b"content length verification test" + + result = upload_test_file( + integration_client, project, package, content, tag="content-length-test" + ) + + # Size in response should match actual content length + assert result["size"] == len(content) + + +class TestUploadErrorHandling: + """Tests for upload error handling.""" + + @pytest.mark.integration + def test_upload_to_nonexistent_project_returns_404( + self, integration_client, unique_test_id + ): + """Test upload to nonexistent project returns 404.""" + content = b"test content" + files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} + + response = integration_client.post( + f"/api/v1/project/nonexistent-{unique_test_id}/pkg/upload", + files=files, + ) + + assert response.status_code == 404 + + @pytest.mark.integration + def test_upload_to_nonexistent_package_returns_404( + self, integration_client, test_project, unique_test_id + ): + """Test upload to nonexistent package returns 404.""" + content = b"test content" + files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} + + response = integration_client.post( + f"/api/v1/project/{test_project}/nonexistent-{unique_test_id}/upload", + files=files, + ) + + assert response.status_code == 404 + + @pytest.mark.integration + def test_upload_without_file_returns_422(self, integration_client, test_package): + """Test upload without file field returns 422.""" + project, package = test_package + + response = integration_client.post( + f"/api/v1/project/{project}/{package}/upload", + data={"tag": "no-file"}, + ) + + assert response.status_code == 422 + + @pytest.mark.integration + def test_upload_with_invalid_checksum_rejected( + self, integration_client, test_package + ): + """Test upload with invalid checksum header format is rejected.""" + project, package = test_package + content = b"checksum test" + + files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} + response = integration_client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + 
headers={"X-Checksum-SHA256": "invalid-checksum"}, + ) + + assert response.status_code == 400 + + @pytest.mark.integration + def test_upload_with_mismatched_checksum_rejected( + self, integration_client, test_package + ): + """Test upload with wrong checksum is rejected.""" + project, package = test_package + content = b"mismatch test" + wrong_hash = "0" * 64 + + files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} + response = integration_client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + headers={"X-Checksum-SHA256": wrong_hash}, + ) + + assert response.status_code == 422 + assert "verification failed" in response.json().get("detail", "").lower() + + +class TestResumableUploadCancellation: + """Tests for resumable upload cancellation.""" + + @pytest.mark.integration + def test_abort_resumable_upload(self, integration_client, test_package, sized_content): + """Test aborting a resumable upload cleans up properly.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_100KB, seed=700) + + # Get API key for auth + api_key_response = integration_client.post( + "/api/v1/auth/keys", + json={"name": "abort-test-key"}, + ) + assert api_key_response.status_code == 200 + api_key = api_key_response.json()["key"] + + # Initialize resumable upload + init_response = integration_client.post( + f"/api/v1/project/{project}/{package}/upload/init", + json={ + "expected_hash": expected_hash, + "filename": "abort-test.bin", + "size": SIZE_100KB, + }, + headers={"Authorization": f"Bearer {api_key}"}, + ) + assert init_response.status_code == 200 + upload_id = init_response.json().get("upload_id") + + if upload_id: + # Abort the upload (without uploading any parts) + abort_response = integration_client.delete( + f"/api/v1/project/{project}/{package}/upload/{upload_id}", + headers={"Authorization": f"Bearer {api_key}"}, + ) + assert abort_response.status_code in [200, 204] + + # Verify progress shows not_found after abort + progress_response = integration_client.get( + f"/api/v1/project/{project}/{package}/upload/{upload_id}/progress", + headers={"Authorization": f"Bearer {api_key}"}, + ) + assert progress_response.status_code == 200 + assert progress_response.json()["status"] == "not_found" + + @pytest.mark.integration + def test_abort_nonexistent_upload(self, integration_client, test_package): + """Test aborting nonexistent upload returns appropriate error.""" + project, package = test_package + + # Get API key for auth + api_key_response = integration_client.post( + "/api/v1/auth/keys", + json={"name": "abort-nonexistent-key"}, + ) + assert api_key_response.status_code == 200 + api_key = api_key_response.json()["key"] + + response = integration_client.delete( + f"/api/v1/project/{project}/{package}/upload/nonexistent-upload-id", + headers={"Authorization": f"Bearer {api_key}"}, + ) + + # Should return 404 or 200 (idempotent delete) + assert response.status_code in [200, 204, 404] + + +class TestUploadTimeout: + """Tests for upload timeout handling.""" + + @pytest.mark.integration + def test_upload_with_short_timeout_succeeds_for_small_file( + self, integration_client, test_package + ): + """Test small file upload succeeds with reasonable timeout.""" + project, package = test_package + content = b"small timeout test" + + # httpx client should handle this quickly + result = upload_test_file( + integration_client, project, package, content, tag="timeout-small" + ) + + assert result["artifact_id"] is not None + + 
@pytest.mark.integration + def test_upload_response_duration_under_timeout( + self, integration_client, test_package, sized_content + ): + """Test upload completes within reasonable time.""" + project, package = test_package + content, _ = sized_content(SIZE_1MB, seed=800) + + start = time.time() + result = upload_test_file( + integration_client, project, package, content, tag="timeout-check" + ) + duration = time.time() - start + + # 1MB should upload in well under 60 seconds on local + assert duration < 60 + assert result["artifact_id"] is not None + + +class TestConcurrentUploads: + """Tests for concurrent upload handling.""" + + @pytest.mark.integration + def test_concurrent_different_files( + self, integration_client, test_package, sized_content + ): + """Test concurrent uploads of different files succeed.""" + from concurrent.futures import ThreadPoolExecutor, as_completed + + project, package = test_package + + # Get API key for auth + api_key_response = integration_client.post( + "/api/v1/auth/keys", + json={"name": "concurrent-diff-key"}, + ) + assert api_key_response.status_code == 200 + api_key = api_key_response.json()["key"] + + num_uploads = 3 + results = [] + errors = [] + + def upload_unique_file(idx): + try: + from httpx import Client + + content, expected_hash = sized_content(SIZE_100KB, seed=900 + idx) + + base_url = "http://localhost:8080" + with Client(base_url=base_url, timeout=30.0) as client: + files = { + "file": ( + f"concurrent-{idx}.bin", + io.BytesIO(content), + "application/octet-stream", + ) + } + response = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": f"concurrent-diff-{idx}"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + results.append((idx, response.json(), expected_hash)) + else: + errors.append(f"Upload {idx}: {response.status_code} - {response.text}") + except Exception as e: + errors.append(f"Upload {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_uploads) as executor: + futures = [executor.submit(upload_unique_file, i) for i in range(num_uploads)] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Concurrent upload errors: {errors}" + assert len(results) == num_uploads + + # Each upload should have unique artifact ID + artifact_ids = set(r[1]["artifact_id"] for r in results) + assert len(artifact_ids) == num_uploads + + # Each should match expected hash + for idx, result, expected_hash in results: + assert result["artifact_id"] == expected_hash
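
---

An illustrative client sketch (not part of the diff above) showing how the resumable-upload endpoints introduced in this patch fit together end to end: init, part upload, polling the new `GET /upload/{upload_id}/progress` endpoint, and complete. Assumptions are marked in the comments: the base URL and `<token>` are placeholders taken from the docstring examples, the 10MB chunk-size fallback mirrors the default documented in the init docstring, and error handling is reduced to `raise_for_status()` for brevity.

```python
# Illustrative resumable-upload client sketch. Assumptions: the API runs at
# http://localhost:8080 (as in the docstring examples above), "<token>" is a
# valid API key, and /upload/init returns `chunk_size` (10MB is used as a
# fallback, matching the documented default).
import hashlib
import os

import requests

BASE = "http://localhost:8080/api/v1/project/myproject/mypackage"
HEADERS = {"Authorization": "Bearer <token>"}  # placeholder token


def resumable_upload(path: str, tag: str) -> None:
    # The init endpoint requires the SHA256 of the whole file up front.
    sha256 = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            sha256.update(chunk)

    # Step 1: initialize the upload session.
    init = requests.post(
        f"{BASE}/upload/init",
        json={
            "expected_hash": sha256.hexdigest(),
            "filename": os.path.basename(path),
            "size": os.path.getsize(path),
        },
        headers=HEADERS,
    )
    init.raise_for_status()
    session = init.json()
    if session.get("already_exists"):
        return  # content already stored (deduplicated); nothing to upload

    upload_id = session["upload_id"]
    chunk_size = session.get("chunk_size", 10 * 1024 * 1024)

    # Step 2: upload parts, checking the progress endpoint between parts.
    part_number = 1
    with open(path, "rb") as f:
        while True:
            data = f.read(chunk_size)
            if not data:
                break
            requests.put(
                f"{BASE}/upload/{upload_id}/part/{part_number}",
                data=data,
                headers=HEADERS,
            ).raise_for_status()
            progress = requests.get(
                f"{BASE}/upload/{upload_id}/progress", headers=HEADERS
            ).json()
            print(
                f"part {part_number}: {progress.get('percent_complete')}% "
                f"({progress.get('throughput_mbps')} MB/s)"
            )
            part_number += 1

    # Step 3: finalize the upload and tag it.
    requests.post(
        f"{BASE}/upload/{upload_id}/complete", json={"tag": tag}, headers=HEADERS
    ).raise_for_status()
```

If the transfer is interrupted, the session can be discarded with `DELETE {BASE}/upload/{upload_id}`, matching the abort step in the documented workflow; a subsequent retry simply starts again from `/upload/init`.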