""" Integration tests for artifact integrity verification. Tests cover: - Round-trip verification (upload -> download -> verify hash) - Consistency check endpoint - Header-based verification - Integrity verification across file sizes - Client-side verification workflow """ import pytest import io import hashlib from tests.factories import ( compute_sha256, upload_test_file, generate_content_with_hash, s3_object_exists, get_s3_client, get_s3_bucket, ) from tests.conftest import ( SIZE_1KB, SIZE_10KB, SIZE_100KB, SIZE_1MB, SIZE_10MB, ) class TestRoundTripVerification: """Tests for complete round-trip integrity verification.""" @pytest.mark.integration def test_upload_download_hash_matches(self, integration_client, test_package): """Test that upload -> download round trip preserves content integrity.""" project, package = test_package content = b"Round trip integrity test content" expected_hash = compute_sha256(content) # Upload and capture returned hash result = upload_test_file( integration_client, project, package, content, version="roundtrip" ) uploaded_hash = result["artifact_id"] # Verify upload returned correct hash assert uploaded_hash == expected_hash # Download artifact response = integration_client.get( f"/api/v1/project/{project}/{package}/+/roundtrip", params={"mode": "proxy"}, ) assert response.status_code == 200 # Compute hash of downloaded content downloaded_hash = compute_sha256(response.content) # All three hashes should match assert downloaded_hash == expected_hash assert downloaded_hash == uploaded_hash @pytest.mark.integration def test_upload_response_contains_hash(self, integration_client, test_package): """Test upload response contains artifact_id which is the SHA256 hash.""" project, package = test_package content = b"Upload response hash test" expected_hash = compute_sha256(content) result = upload_test_file(integration_client, project, package, content) assert "artifact_id" in result assert result["artifact_id"] == expected_hash assert len(result["artifact_id"]) == 64 assert all(c in "0123456789abcdef" for c in result["artifact_id"]) @pytest.mark.integration def test_download_header_matches_artifact_id(self, integration_client, test_package): """Test X-Checksum-SHA256 header matches artifact ID.""" project, package = test_package content = b"Header verification test" expected_hash = compute_sha256(content) upload_test_file( integration_client, project, package, content, version="header-check" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/header-check", params={"mode": "proxy"}, ) assert response.status_code == 200 assert response.headers.get("X-Checksum-SHA256") == expected_hash @pytest.mark.integration def test_etag_matches_artifact_id(self, integration_client, test_package): """Test ETag header matches artifact ID.""" project, package = test_package content = b"ETag verification test" expected_hash = compute_sha256(content) upload_test_file( integration_client, project, package, content, version="etag-check" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/etag-check", params={"mode": "proxy"}, ) assert response.status_code == 200 etag = response.headers.get("ETag", "").strip('"') assert etag == expected_hash @pytest.mark.integration def test_artifact_endpoint_returns_correct_hash(self, integration_client, test_package): """Test artifact endpoint returns correct hash/ID.""" project, package = test_package content = b"Artifact endpoint hash test" expected_hash = compute_sha256(content) upload_test_file(integration_client, project, package, content) # Query artifact directly response = integration_client.get(f"/api/v1/artifact/{expected_hash}") assert response.status_code == 200 data = response.json() assert data["id"] == expected_hash assert data.get("sha256") == expected_hash class TestClientSideVerificationWorkflow: """Tests for client-side verification workflow.""" @pytest.mark.integration def test_client_can_verify_before_upload(self, integration_client, test_package): """Test client can compute hash before upload and verify response matches.""" project, package = test_package content = b"Client pre-upload verification test" # Client computes hash locally before upload client_hash = compute_sha256(content) # Upload result = upload_test_file(integration_client, project, package, content) # Client verifies server returned the same hash assert result["artifact_id"] == client_hash @pytest.mark.integration def test_client_can_provide_checksum_header(self, integration_client, test_package): """Test client can provide X-Checksum-SHA256 header for verification.""" project, package = test_package content = b"Client checksum header test" client_hash = compute_sha256(content) files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, headers={"X-Checksum-SHA256": client_hash}, ) assert response.status_code == 200 assert response.json()["artifact_id"] == client_hash @pytest.mark.integration def test_checksum_mismatch_rejected(self, integration_client, test_package): """Test upload with wrong client checksum is rejected.""" project, package = test_package content = b"Checksum mismatch test" wrong_hash = "0" * 64 files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, headers={"X-Checksum-SHA256": wrong_hash}, ) assert response.status_code == 422 @pytest.mark.integration def test_client_can_verify_after_download(self, integration_client, test_package): """Test client can verify downloaded content matches header hash.""" project, package = test_package content = b"Client post-download verification" upload_test_file( integration_client, project, package, content, version="verify-after" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/verify-after", params={"mode": "proxy"}, ) assert response.status_code == 200 # Client gets hash from header header_hash = response.headers.get("X-Checksum-SHA256") # Client computes hash of downloaded content downloaded_hash = compute_sha256(response.content) # Client verifies they match assert downloaded_hash == header_hash class TestIntegritySizeVariants: """Tests for integrity verification across different file sizes.""" @pytest.mark.integration def test_integrity_1kb(self, integration_client, test_package, sized_content): """Test integrity verification for 1KB file.""" project, package = test_package content, expected_hash = sized_content(SIZE_1KB, seed=100) result = upload_test_file( integration_client, project, package, content, version="int-1kb" ) assert result["artifact_id"] == expected_hash response = integration_client.get( f"/api/v1/project/{project}/{package}/+/int-1kb", params={"mode": "proxy"}, ) assert response.status_code == 200 assert compute_sha256(response.content) == expected_hash assert response.headers.get("X-Checksum-SHA256") == expected_hash @pytest.mark.integration def test_integrity_100kb(self, integration_client, test_package, sized_content): """Test integrity verification for 100KB file.""" project, package = test_package content, expected_hash = sized_content(SIZE_100KB, seed=101) result = upload_test_file( integration_client, project, package, content, version="int-100kb" ) assert result["artifact_id"] == expected_hash response = integration_client.get( f"/api/v1/project/{project}/{package}/+/int-100kb", params={"mode": "proxy"}, ) assert response.status_code == 200 assert compute_sha256(response.content) == expected_hash assert response.headers.get("X-Checksum-SHA256") == expected_hash @pytest.mark.integration def test_integrity_1mb(self, integration_client, test_package, sized_content): """Test integrity verification for 1MB file.""" project, package = test_package content, expected_hash = sized_content(SIZE_1MB, seed=102) result = upload_test_file( integration_client, project, package, content, version="int-1mb" ) assert result["artifact_id"] == expected_hash response = integration_client.get( f"/api/v1/project/{project}/{package}/+/int-1mb", params={"mode": "proxy"}, ) assert response.status_code == 200 assert compute_sha256(response.content) == expected_hash assert response.headers.get("X-Checksum-SHA256") == expected_hash @pytest.mark.integration @pytest.mark.slow def test_integrity_10mb(self, integration_client, test_package, sized_content): """Test integrity verification for 10MB file.""" project, package = test_package content, expected_hash = sized_content(SIZE_10MB, seed=103) result = upload_test_file( integration_client, project, package, content, version="int-10mb" ) assert result["artifact_id"] == expected_hash response = integration_client.get( f"/api/v1/project/{project}/{package}/+/int-10mb", params={"mode": "proxy"}, ) assert response.status_code == 200 assert compute_sha256(response.content) == expected_hash assert response.headers.get("X-Checksum-SHA256") == expected_hash class TestConsistencyCheck: """Tests for the admin consistency check endpoint.""" @pytest.mark.integration def test_consistency_check_returns_200(self, integration_client): """Test consistency check endpoint returns 200.""" response = integration_client.get("/api/v1/admin/consistency-check") assert response.status_code == 200 @pytest.mark.integration def test_consistency_check_response_format(self, integration_client): """Test consistency check returns expected response format.""" response = integration_client.get("/api/v1/admin/consistency-check") assert response.status_code == 200 data = response.json() # Check expected fields assert "total_artifacts_checked" in data assert "orphaned_s3_objects" in data assert "missing_s3_objects" in data assert "size_mismatches" in data assert "healthy" in data assert "orphaned_s3_keys" in data assert "missing_s3_keys" in data assert "size_mismatch_artifacts" in data # Verify types assert isinstance(data["total_artifacts_checked"], int) assert isinstance(data["orphaned_s3_objects"], int) assert isinstance(data["missing_s3_objects"], int) assert isinstance(data["size_mismatches"], int) assert isinstance(data["healthy"], bool) assert isinstance(data["orphaned_s3_keys"], list) assert isinstance(data["missing_s3_keys"], list) assert isinstance(data["size_mismatch_artifacts"], list) @pytest.mark.integration def test_consistency_check_after_upload(self, integration_client, test_package): """Test consistency check runs successfully after a valid upload. Note: We don't assert healthy=True because other tests (especially corruption detection tests) may leave orphaned S3 objects behind. This test validates the consistency check endpoint works and the uploaded artifact is included in the check count. """ project, package = test_package content = b"Consistency check test content" # Upload artifact upload_test_file(integration_client, project, package, content) # Run consistency check response = integration_client.get("/api/v1/admin/consistency-check") assert response.status_code == 200 data = response.json() # Verify check ran - at least 1 artifact was checked assert data["total_artifacts_checked"] >= 1 # Verify no missing S3 objects (uploaded artifact should exist) assert data["missing_s3_objects"] == 0 @pytest.mark.integration def test_consistency_check_limit_parameter(self, integration_client): """Test consistency check respects limit parameter.""" response = integration_client.get( "/api/v1/admin/consistency-check", params={"limit": 10} ) assert response.status_code == 200 data = response.json() # Lists should not exceed limit assert len(data["orphaned_s3_keys"]) <= 10 assert len(data["missing_s3_keys"]) <= 10 assert len(data["size_mismatch_artifacts"]) <= 10 class TestDigestHeader: """Tests for RFC 3230 Digest header.""" @pytest.mark.integration def test_download_includes_digest_header(self, integration_client, test_package): """Test download includes Digest header in RFC 3230 format.""" project, package = test_package content = b"Digest header test" expected_hash = compute_sha256(content) upload_test_file( integration_client, project, package, content, version="digest-test" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/digest-test", params={"mode": "proxy"}, ) assert response.status_code == 200 assert "Digest" in response.headers # Verify Digest format (sha-256=base64hash) digest = response.headers["Digest"] assert digest.startswith("sha-256=") @pytest.mark.integration def test_digest_header_base64_valid(self, integration_client, test_package): """Test Digest header contains valid base64 encoding.""" import base64 project, package = test_package content = b"Digest base64 test" expected_hash = compute_sha256(content) upload_test_file( integration_client, project, package, content, version="digest-b64" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/digest-b64", params={"mode": "proxy"}, ) assert response.status_code == 200 digest = response.headers["Digest"] base64_part = digest.split("=", 1)[1] # Should be valid base64 try: decoded = base64.b64decode(base64_part) assert len(decoded) == 32 # SHA256 is 32 bytes except Exception as e: pytest.fail(f"Invalid base64 in Digest header: {e}") class TestVerificationModes: """Tests for download verification modes.""" @pytest.mark.integration def test_pre_verification_mode(self, integration_client, test_package): """Test pre-verification mode verifies before streaming.""" project, package = test_package content = b"Pre-verification mode test" upload_test_file( integration_client, project, package, content, version="pre-verify" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/pre-verify", params={"mode": "proxy", "verify": "true", "verify_mode": "pre"}, ) assert response.status_code == 200 assert response.content == content # X-Verified header should be true assert response.headers.get("X-Verified") == "true" @pytest.mark.integration def test_stream_verification_mode(self, integration_client, test_package): """Test streaming verification mode.""" project, package = test_package content = b"Stream verification mode test" upload_test_file( integration_client, project, package, content, version="stream-verify" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/stream-verify", params={"mode": "proxy", "verify": "true", "verify_mode": "stream"}, ) assert response.status_code == 200 assert response.content == content class TestArtifactIntegrityEndpoint: """Tests for artifact-specific integrity operations.""" @pytest.mark.integration def test_artifact_size_matches(self, integration_client, test_package): """Test artifact endpoint returns correct size.""" project, package = test_package content = b"Artifact size test content" expected_size = len(content) result = upload_test_file(integration_client, project, package, content) artifact_id = result["artifact_id"] response = integration_client.get(f"/api/v1/artifact/{artifact_id}") assert response.status_code == 200 data = response.json() assert data["size"] == expected_size @pytest.mark.integration def test_content_length_header_matches_size(self, integration_client, test_package): """Test Content-Length header matches artifact size.""" project, package = test_package content = b"Content-Length header test" expected_size = len(content) upload_test_file( integration_client, project, package, content, version="content-len" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/content-len", params={"mode": "proxy"}, ) assert response.status_code == 200 assert int(response.headers.get("Content-Length", 0)) == expected_size assert len(response.content) == expected_size @pytest.mark.requires_direct_s3 class TestCorruptionDetection: """Tests for detecting corrupted S3 objects. These tests directly manipulate S3 objects to simulate corruption and verify that the system can detect hash mismatches. Note: These tests require direct S3/MinIO access and are skipped in CI where S3 is not directly accessible from the test runner. """ @pytest.mark.integration def test_detection_of_corrupted_content(self, integration_client, test_package): """Test that corrupted S3 content is detected via hash mismatch. Uploads content, then directly modifies the S3 object, then verifies that the downloaded content hash doesn't match. """ project, package = test_package content = b"Original content for corruption test" expected_hash = compute_sha256(content) # Upload original content result = upload_test_file( integration_client, project, package, content, version="corrupt-test" ) assert result["artifact_id"] == expected_hash # Get the S3 object and corrupt it s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" # Replace with corrupted content corrupted_content = b"Corrupted content - different from original!" s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted_content) # Download via proxy (bypasses hash verification) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/corrupt-test", params={"mode": "proxy"}, ) assert response.status_code == 200 # Verify the downloaded content doesn't match original hash downloaded_hash = compute_sha256(response.content) assert downloaded_hash != expected_hash, "Corruption was not detected - hashes match" assert response.content == corrupted_content # The X-Checksum-SHA256 header should still show the original hash (from DB) # but the actual content hash is different header_hash = response.headers.get("X-Checksum-SHA256") assert header_hash == expected_hash # Header shows expected hash assert downloaded_hash != header_hash # But content is corrupted # Restore original content for cleanup s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content) @pytest.mark.integration def test_detection_of_single_bit_flip(self, integration_client, test_package): """Test detection of a single bit flip in S3 object content.""" project, package = test_package content = b"Content for single bit flip detection test" expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project, package, content, version="bitflip-test" ) assert result["artifact_id"] == expected_hash # Get S3 object and flip a single bit s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" # Flip the first bit of the first byte corrupted_content = bytearray(content) corrupted_content[0] ^= 0x01 corrupted_content = bytes(corrupted_content) s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted_content) # Download and verify hash mismatch response = integration_client.get( f"/api/v1/project/{project}/{package}/+/bitflip-test", params={"mode": "proxy"}, ) assert response.status_code == 200 downloaded_hash = compute_sha256(response.content) assert downloaded_hash != expected_hash, "Single bit flip not detected" # Restore original s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content) @pytest.mark.integration def test_detection_of_truncated_content(self, integration_client, test_package): """Test detection of truncated S3 object.""" project, package = test_package content = b"This is content that will be truncated for testing purposes" expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project, package, content, version="truncate-test" ) assert result["artifact_id"] == expected_hash # Get S3 object and truncate it s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" # Truncate to half the original size truncated_content = content[: len(content) // 2] s3_client.put_object(Bucket=bucket, Key=s3_key, Body=truncated_content) # Download and verify hash mismatch response = integration_client.get( f"/api/v1/project/{project}/{package}/+/truncate-test", params={"mode": "proxy"}, ) assert response.status_code == 200 downloaded_hash = compute_sha256(response.content) assert downloaded_hash != expected_hash, "Truncation not detected" assert len(response.content) < len(content), "Content was not truncated" # Restore original s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content) @pytest.mark.integration def test_detection_of_appended_content(self, integration_client, test_package): """Test detection of content with extra bytes appended.""" project, package = test_package content = b"Original content" expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project, package, content, version="append-test" ) assert result["artifact_id"] == expected_hash # Get S3 object and append extra bytes s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" appended_content = content + b" - extra bytes appended" s3_client.put_object(Bucket=bucket, Key=s3_key, Body=appended_content) # Download and verify hash mismatch response = integration_client.get( f"/api/v1/project/{project}/{package}/+/append-test", params={"mode": "proxy"}, ) assert response.status_code == 200 downloaded_hash = compute_sha256(response.content) assert downloaded_hash != expected_hash, "Appended content not detected" assert len(response.content) > len(content), "Content was not extended" # Restore original s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content) @pytest.mark.integration def test_client_detects_hash_mismatch_post_download( self, integration_client, test_package ): """Test that a client can detect hash mismatch after downloading corrupted content. This simulates the full client verification workflow: 1. Download content 2. Get expected hash from header 3. Compute actual hash of content 4. Verify they match (or detect corruption) """ project, package = test_package content = b"Content for client-side corruption detection" expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project, package, content, version="client-detect" ) # Corrupt the S3 object s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" corrupted = b"This is completely different content" s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted) # Simulate client download and verification response = integration_client.get( f"/api/v1/project/{project}/{package}/+/client-detect", params={"mode": "proxy"}, ) assert response.status_code == 200 # Client gets expected hash from header header_hash = response.headers.get("X-Checksum-SHA256") # Client computes hash of downloaded content actual_hash = compute_sha256(response.content) # Client detects the mismatch corruption_detected = actual_hash != header_hash assert corruption_detected, "Client should detect hash mismatch" # Restore original s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content) @pytest.mark.integration def test_consistency_check_detects_size_mismatch( self, integration_client, test_package, unique_test_id ): """Test that consistency check detects size mismatches. Uploads content, modifies S3 object size, then runs consistency check. """ project, package = test_package content = b"Content for size mismatch consistency check test " + unique_test_id.encode() expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project, package, content, version="size-mismatch" ) # Modify S3 object to have different size s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" different_size_content = content + b"extra extra extra" s3_client.put_object(Bucket=bucket, Key=s3_key, Body=different_size_content) # Run consistency check response = integration_client.get("/api/v1/admin/consistency-check") assert response.status_code == 200 data = response.json() # Should detect the size mismatch assert data["size_mismatches"] >= 1 or len(data["size_mismatch_artifacts"]) >= 1 # Restore original s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content) @pytest.mark.integration def test_consistency_check_detects_missing_s3_object( self, integration_client, test_package, unique_test_id ): """Test that consistency check detects missing S3 objects. Uploads content, deletes S3 object, then runs consistency check. """ project, package = test_package content = b"Content for missing S3 object test " + unique_test_id.encode() expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project, package, content, version="missing-s3" ) # Delete the S3 object s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" s3_client.delete_object(Bucket=bucket, Key=s3_key) # Run consistency check response = integration_client.get("/api/v1/admin/consistency-check") assert response.status_code == 200 data = response.json() # Should detect the missing S3 object assert data["missing_s3_objects"] >= 1 or len(data["missing_s3_keys"]) >= 1 # Restore the object for cleanup s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)