"""
Integration tests for duplicate uploads and storage verification.

These tests require the full stack to be running (docker-compose.local.yml).

Tests cover:
- Duplicate upload scenarios across packages and projects
- Storage verification (single S3 object, single artifact row)
- Upload table tracking
- Content integrity verification
- Concurrent upload handling
- Failure cleanup
"""

import io
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

import pytest

from tests.conftest import (
    compute_sha256,
    upload_test_file,
    list_s3_objects_by_hash,
    s3_object_exists,
    delete_s3_object_by_hash,
)


def _assert_created(response, what):
    """Fail fast with a readable message when a setup request was rejected.

    Without this check a rejected project/package creation only shows up
    later as an unrelated-looking upload failure.
    """
    assert response.status_code < 400, (
        f"Failed to create {what}: {response.status_code} {response.text}"
    )


class TestDuplicateUploadScenarios:
    """Integration tests for duplicate upload behavior."""

    @pytest.mark.integration
    def test_same_file_twice_returns_same_artifact_id(
        self, integration_client, test_package
    ):
        """Test uploading same file twice returns same artifact_id."""
        project, package = test_package
        content = b"content uploaded twice for same artifact test"
        expected_hash = compute_sha256(content)

        # First upload
        result1 = upload_test_file(
            integration_client, project, package, content, tag="first"
        )
        assert result1["artifact_id"] == expected_hash

        # Second upload
        result2 = upload_test_file(
            integration_client, project, package, content, tag="second"
        )
        assert result2["artifact_id"] == expected_hash
        assert result1["artifact_id"] == result2["artifact_id"]

    @pytest.mark.integration
    def test_same_file_twice_increments_ref_count(
        self, integration_client, test_package, unique_test_id
    ):
        """Test uploading same file twice increments ref_count to 2."""
        project, package = test_package
        # Unique content: the exact ref_count values asserted below only hold
        # when no earlier run has already stored the same bytes.
        content = f"content for ref count increment test {unique_test_id}".encode()

        # First upload
        result1 = upload_test_file(
            integration_client, project, package, content, tag="v1"
        )
        assert result1["ref_count"] == 1

        # Second upload
        result2 = upload_test_file(
            integration_client, project, package, content, tag="v2"
        )
        assert result2["ref_count"] == 2

    @pytest.mark.integration
    def test_same_file_different_packages_shares_artifact(
        self, integration_client, test_project, unique_test_id
    ):
        """Test uploading same file to different packages shares artifact."""
        project = test_project
        content = f"content shared across packages {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        # Create two packages
        pkg1 = f"package-a-{unique_test_id}"
        pkg2 = f"package-b-{unique_test_id}"

        for pkg_name, description in ((pkg1, "Package A"), (pkg2, "Package B")):
            response = integration_client.post(
                f"/api/v1/project/{project}/packages",
                json={"name": pkg_name, "description": description},
            )
            _assert_created(response, f"package {pkg_name}")

        # Upload to first package
        result1 = upload_test_file(integration_client, project, pkg1, content, tag="v1")
        assert result1["artifact_id"] == expected_hash
        assert result1["deduplicated"] is False

        # Upload to second package
        result2 = upload_test_file(integration_client, project, pkg2, content, tag="v1")
        assert result2["artifact_id"] == expected_hash
        assert result2["deduplicated"] is True

    @pytest.mark.integration
    def test_same_file_different_projects_shares_artifact(
        self, integration_client, unique_test_id
    ):
        """Test uploading same file to different projects shares artifact."""
        content = f"content shared across projects {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        # Create two projects with packages
        proj1 = f"project-x-{unique_test_id}"
        proj2 = f"project-y-{unique_test_id}"
        pkg_name = "shared-pkg"

        try:
            # Create projects and packages
            for proj, description in ((proj1, "Project X"), (proj2, "Project Y")):
                response = integration_client.post(
                    "/api/v1/projects",
                    json={"name": proj, "description": description, "is_public": True},
                )
                _assert_created(response, f"project {proj}")

            for proj in (proj1, proj2):
                response = integration_client.post(
                    f"/api/v1/project/{proj}/packages",
                    json={"name": pkg_name, "description": "Package"},
                )
                _assert_created(response, f"package {proj}/{pkg_name}")

            # Upload to first project
            result1 = upload_test_file(
                integration_client, proj1, pkg_name, content, tag="v1"
            )
            assert result1["artifact_id"] == expected_hash
            assert result1["deduplicated"] is False

            # Upload to second project
            result2 = upload_test_file(
                integration_client, proj2, pkg_name, content, tag="v1"
            )
            assert result2["artifact_id"] == expected_hash
            assert result2["deduplicated"] is True

        finally:
            # Cleanup (best effort: responses are intentionally not checked)
            integration_client.delete(f"/api/v1/projects/{proj1}")
            integration_client.delete(f"/api/v1/projects/{proj2}")

    @pytest.mark.integration
    def test_same_file_different_filenames_shares_artifact(
        self, integration_client, test_package
    ):
        """Test uploading same file with different original filenames shares artifact."""
        project, package = test_package
        content = b"content with different filenames"
        expected_hash = compute_sha256(content)

        # Upload with filename1
        result1 = upload_test_file(
            integration_client,
            project,
            package,
            content,
            filename="file1.bin",
            tag="v1",
        )
        assert result1["artifact_id"] == expected_hash

        # Upload with filename2
        result2 = upload_test_file(
            integration_client,
            project,
            package,
            content,
            filename="file2.bin",
            tag="v2",
        )
        assert result2["artifact_id"] == expected_hash
        assert result2["deduplicated"] is True

    @pytest.mark.integration
    def test_same_file_different_tags_shares_artifact(
        self, integration_client, test_package, unique_test_id
    ):
        """Test uploading same file with different tags shares artifact."""
        project, package = test_package
        content = f"content with different tags {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        tags = ["latest", "stable", "v1.0.0", "release"]
        for i, tag in enumerate(tags):
            result = upload_test_file(
                integration_client, project, package, content, tag=tag
            )
            assert result["artifact_id"] == expected_hash
            # Only the very first upload stores new bytes; the rest are dedup hits.
            if i == 0:
                assert result["deduplicated"] is False
            else:
                assert result["deduplicated"] is True


class TestStorageVerification:
    """Tests to verify storage behavior after duplicate uploads."""

    @pytest.mark.integration
    def test_artifact_table_single_row_after_duplicates(
        self, integration_client, test_package, unique_test_id
    ):
        """Test artifact table contains only one row after duplicate uploads."""
        project, package = test_package
        # Unique content so the exact ref_count below is not inflated by
        # references left behind by earlier runs.
        content = f"content for single row test {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        # Upload same content multiple times with different tags
        for tag in ["v1", "v2", "v3"]:
            upload_test_file(integration_client, project, package, content, tag=tag)

        # Query artifact - should exist and be unique
        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert response.status_code == 200
        artifact = response.json()
        assert artifact["id"] == expected_hash
        assert artifact["ref_count"] == 3

    @pytest.mark.integration
    def test_upload_table_multiple_rows_for_duplicates(
        self, integration_client, test_package
    ):
        """Test upload table contains multiple rows for duplicate uploads (event tracking)."""
        project, package = test_package
        content = b"content for upload tracking test"

        # Upload same content 3 times
        for tag in ["upload1", "upload2", "upload3"]:
            upload_test_file(integration_client, project, package, content, tag=tag)

        # Check package stats - should show 3 uploads but fewer unique artifacts
        response = integration_client.get(
            f"/api/v1/project/{project}/packages/{package}"
        )
        assert response.status_code == 200
        pkg_info = response.json()
        assert pkg_info["tag_count"] == 3

    @pytest.mark.integration
    def test_artifact_content_matches_original(self, integration_client, test_package):
        """Test artifact content retrieved matches original content exactly."""
        project, package = test_package
        original_content = b"exact content verification test data 12345"

        # Upload
        upload_test_file(
            integration_client, project, package, original_content, tag="verify"
        )

        # Download and compare
        download_response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/verify", params={"mode": "proxy"}
        )
        assert download_response.status_code == 200
        downloaded_content = download_response.content
        assert downloaded_content == original_content

    @pytest.mark.integration
    def test_storage_stats_reflect_deduplication(
        self, integration_client, test_package
    ):
        """Test global stats report deduplication savings after duplicate uploads."""
        project, package = test_package
        content = b"content for storage stats test - should only count once"

        # Upload same content 5 times
        for tag in ["a", "b", "c", "d", "e"]:
            upload_test_file(integration_client, project, package, content, tag=tag)

        # Check global stats
        response = integration_client.get("/api/v1/stats")
        assert response.status_code == 200
        stats = response.json()
        # Deduplication should show savings
        assert stats["deduplicated_uploads"] > 0
        assert stats["storage_saved_bytes"] > 0


class TestConcurrentUploads:
    """Tests for concurrent upload handling."""

    @pytest.mark.integration
    def test_concurrent_uploads_same_file(
        self, integration_client, test_package, unique_test_id
    ):
        """Test concurrent uploads of same file handle deduplication correctly."""
        # Imported once here rather than inside every worker thread.
        from httpx import Client

        project, package = test_package
        # Unique content so the final ref_count equals exactly the number of
        # uploads made by this test.
        content = f"content for concurrent upload test {unique_test_id}".encode()
        expected_hash = compute_sha256(content)
        num_concurrent = 5

        # NOTE(review): hard-coded address; presumably the same server that
        # integration_client targets - confirm against the fixture.
        base_url = "http://localhost:8080"

        def upload_worker(tag_suffix):
            """Upload `content` with a dedicated client.

            Returns a ``(payload, error)`` pair where exactly one item is
            ``None``, so the main thread does all the bookkeeping and no list
            is mutated from several threads at once.
            """
            try:
                # Create a new client for this thread
                with Client(base_url=base_url, timeout=30.0) as client:
                    files = {
                        "file": (
                            f"concurrent-{tag_suffix}.bin",
                            io.BytesIO(content),
                            "application/octet-stream",
                        )
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": f"concurrent-{tag_suffix}"},
                    )
                    if response.status_code == 200:
                        return response.json(), None
                    return None, f"Status {response.status_code}: {response.text}"
            except Exception as e:
                # Reported through the errors list so the assertion below shows
                # every failed upload instead of only the first one.
                return None, str(e)

        results = []
        errors = []

        # Run concurrent uploads
        with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
            futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)]
            for future in as_completed(futures):
                payload, error = future.result()
                if error is None:
                    results.append(payload)
                else:
                    errors.append(error)

        # Verify results
        assert len(errors) == 0, f"Errors during concurrent uploads: {errors}"
        assert len(results) == num_concurrent

        # All should have same artifact_id
        artifact_ids = {r["artifact_id"] for r in results}
        assert len(artifact_ids) == 1
        assert expected_hash in artifact_ids

        # Verify final ref_count
        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert response.status_code == 200
        assert response.json()["ref_count"] == num_concurrent


class TestDeduplicationAcrossRestarts:
    """Tests for deduplication persistence."""

    @pytest.mark.integration
    def test_deduplication_persists(
        self, integration_client, test_package, unique_test_id
    ):
        """
        Test deduplication works with persisted data.

        This test uploads content, then uploads the same content again.
        Since the database persists, the second upload should detect
        the existing artifact even without server restart.
        """
        project, package = test_package
        content = f"persisted content for dedup test {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        # First upload
        result1 = upload_test_file(
            integration_client, project, package, content, tag="persist1"
        )
        assert result1["artifact_id"] == expected_hash
        assert result1["deduplicated"] is False

        # Second upload (simulating after restart - data is persisted)
        result2 = upload_test_file(
            integration_client, project, package, content, tag="persist2"
        )
        assert result2["artifact_id"] == expected_hash
        assert result2["deduplicated"] is True

        # Verify artifact exists with correct ref_count
        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert response.status_code == 200
        assert response.json()["ref_count"] == 2


class TestS3ObjectVerification:
    """Tests to verify S3 storage behavior directly."""

    @pytest.mark.integration
    def test_s3_bucket_single_object_after_duplicates(
        self, integration_client, test_package, unique_test_id
    ):
        """Test S3 bucket contains only one object after duplicate uploads."""
        project, package = test_package
        content = f"content for s3 object count test {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        # Upload same content multiple times with different tags
        for tag in ["s3test1", "s3test2", "s3test3"]:
            upload_test_file(integration_client, project, package, content, tag=tag)

        # Verify only one S3 object exists for this hash
        s3_objects = list_s3_objects_by_hash(expected_hash)
        assert len(s3_objects) == 1, (
            f"Expected 1 S3 object, found {len(s3_objects)}: {s3_objects}"
        )

        # Verify the object key follows expected pattern
        expected_key = (
            f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"
        )
        assert s3_objects[0] == expected_key


class TestUploadFailureCleanup:
    """Tests for cleanup when uploads fail."""

    @pytest.mark.integration
    def test_upload_failure_invalid_project_no_orphaned_s3(
        self, integration_client, unique_test_id
    ):
        """Test upload to non-existent project doesn't leave orphaned S3 objects."""
        content = f"content for orphan s3 test {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        # Attempt upload to non-existent project
        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/nonexistent-project-{unique_test_id}/nonexistent-pkg/upload",
            files=files,
            data={"tag": "test"},
        )

        # Upload should fail
        assert response.status_code == 404

        # Verify no S3 object was created
        assert not s3_object_exists(expected_hash), (
            "Orphaned S3 object found after failed upload"
        )

    @pytest.mark.integration
    def test_upload_failure_invalid_package_no_orphaned_s3(
        self, integration_client, test_project, unique_test_id
    ):
        """Test upload to non-existent package doesn't leave orphaned S3 objects."""
        content = f"content for orphan s3 test pkg {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        # Attempt upload to non-existent package
        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
            files=files,
            data={"tag": "test"},
        )

        # Upload should fail
        assert response.status_code == 404

        # Verify no S3 object was created
        assert not s3_object_exists(expected_hash), (
            "Orphaned S3 object found after failed upload"
        )

    @pytest.mark.integration
    def test_upload_failure_empty_file_no_orphaned_s3(
        self, integration_client, test_package, unique_test_id
    ):
        """Test upload of an empty file is rejected.

        NOTE: only the rejection status is asserted; the absence of S3 objects
        or DB records for the empty payload is not checked directly.
        """
        project, package = test_package
        content = b""  # Empty content

        # Attempt upload of empty file
        files = {"file": ("empty.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            data={"tag": f"empty-{unique_test_id}"},
        )

        # Upload should fail (empty files are rejected)
        assert response.status_code in (400, 422), (
            f"Expected 400/422, got {response.status_code}"
        )

    @pytest.mark.integration
    def test_upload_failure_no_orphaned_database_records(
        self, integration_client, test_project, unique_test_id
    ):
        """Test failed upload doesn't leave orphaned database records."""
        content = f"content for db orphan test {unique_test_id}".encode()
        expected_hash = compute_sha256(content)

        # Attempt upload to non-existent package (should fail before DB insert)
        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
            files=files,
            data={"tag": "test"},
        )

        # Upload should fail
        assert response.status_code == 404

        # Verify no artifact record was created
        artifact_response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert artifact_response.status_code == 404, (
            "Orphaned artifact record found after failed upload"
        )

    @pytest.mark.integration
    def test_duplicate_tag_upload_handles_gracefully(
        self, integration_client, test_package, unique_test_id
    ):
        """Test uploading with duplicate tag is handled without orphaned data."""
        project, package = test_package
        content1 = f"content version 1 {unique_test_id}".encode()
        content2 = f"content version 2 {unique_test_id}".encode()
        tag = f"duplicate-tag-{unique_test_id}"

        # First upload with tag
        result1 = upload_test_file(
            integration_client, project, package, content1, tag=tag
        )
        hash1 = result1["artifact_id"]

        # Second upload with same tag (should update the tag to point to new artifact)
        result2 = upload_test_file(
            integration_client, project, package, content2, tag=tag
        )
        hash2 = result2["artifact_id"]

        # Both artifacts should exist
        assert integration_client.get(f"/api/v1/artifact/{hash1}").status_code == 200
        assert integration_client.get(f"/api/v1/artifact/{hash2}").status_code == 200

        # Tag should point to the second artifact
        tag_response = integration_client.get(
            f"/api/v1/project/{project}/{package}/tags/{tag}"
        )
        assert tag_response.status_code == 200
        assert tag_response.json()["artifact_id"] == hash2


class TestFileSizeValidation:
    """Tests for file size limits and empty file rejection."""

    @pytest.mark.integration
    def test_empty_file_rejected(self, integration_client, test_package):
        """Test that empty files are rejected with appropriate error."""
        project, package = test_package

        # Try to upload empty content
        files = {"file": ("empty.txt", io.BytesIO(b""), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
        )

        # Should be rejected (422 from storage layer or validation)
        assert response.status_code in [422, 400]

    @pytest.mark.integration
    def test_small_valid_file_accepted(self, integration_client, test_package):
        """Test that small (1 byte) files are accepted."""
        project, package = test_package
        content = b"X"  # Single byte

        result = upload_test_file(
            integration_client, project, package, content, tag="tiny"
        )

        assert result["artifact_id"] is not None
        assert result["size"] == 1

    @pytest.mark.integration
    def test_file_size_reported_correctly(
        self, integration_client, test_package, unique_test_id
    ):
        """Test that file size is correctly reported in response."""
        project, package = test_package
        content = f"Test content for size check {unique_test_id}".encode()
        expected_size = len(content)

        result = upload_test_file(
            integration_client, project, package, content, tag="size-test"
        )

        assert result["size"] == expected_size

        # Also verify via artifact endpoint
        artifact_response = integration_client.get(
            f"/api/v1/artifact/{result['artifact_id']}"
        )
        # Check the status first so a failed lookup is not reported as a KeyError.
        assert artifact_response.status_code == 200
        assert artifact_response.json()["size"] == expected_size