""" Integration tests for artifact integrity verification. Tests cover: - Round-trip verification (upload -> download -> verify hash) - Consistency check endpoint - Header-based verification - Integrity verification across file sizes - Client-side verification workflow """ import pytest import io import hashlib from tests.factories import ( compute_sha256, upload_test_file, generate_content_with_hash, s3_object_exists, get_s3_client, get_s3_bucket, ) from tests.conftest import ( SIZE_1KB, SIZE_10KB, SIZE_100KB, SIZE_1MB, SIZE_10MB, ) class TestRoundTripVerification: """Tests for complete round-trip integrity verification.""" @pytest.mark.integration def test_upload_download_hash_matches(self, integration_client, test_package): """Test that upload -> download round trip preserves content integrity.""" project, package = test_package content = b"Round trip integrity test content" expected_hash = compute_sha256(content) # Upload and capture returned hash result = upload_test_file( integration_client, project, package, content, tag="roundtrip" ) uploaded_hash = result["artifact_id"] # Verify upload returned correct hash assert uploaded_hash == expected_hash # Download artifact response = integration_client.get( f"/api/v1/project/{project}/{package}/+/roundtrip", params={"mode": "proxy"}, ) assert response.status_code == 200 # Compute hash of downloaded content downloaded_hash = compute_sha256(response.content) # All three hashes should match assert downloaded_hash == expected_hash assert downloaded_hash == uploaded_hash @pytest.mark.integration def test_upload_response_contains_hash(self, integration_client, test_package): """Test upload response contains artifact_id which is the SHA256 hash.""" project, package = test_package content = b"Upload response hash test" expected_hash = compute_sha256(content) result = upload_test_file(integration_client, project, package, content) assert "artifact_id" in result assert result["artifact_id"] == expected_hash assert len(result["artifact_id"]) == 64 assert all(c in "0123456789abcdef" for c in result["artifact_id"]) @pytest.mark.integration def test_download_header_matches_artifact_id(self, integration_client, test_package): """Test X-Checksum-SHA256 header matches artifact ID.""" project, package = test_package content = b"Header verification test" expected_hash = compute_sha256(content) upload_test_file( integration_client, project, package, content, tag="header-check" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/header-check", params={"mode": "proxy"}, ) assert response.status_code == 200 assert response.headers.get("X-Checksum-SHA256") == expected_hash @pytest.mark.integration def test_etag_matches_artifact_id(self, integration_client, test_package): """Test ETag header matches artifact ID.""" project, package = test_package content = b"ETag verification test" expected_hash = compute_sha256(content) upload_test_file( integration_client, project, package, content, tag="etag-check" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/etag-check", params={"mode": "proxy"}, ) assert response.status_code == 200 etag = response.headers.get("ETag", "").strip('"') assert etag == expected_hash @pytest.mark.integration def test_artifact_endpoint_returns_correct_hash(self, integration_client, test_package): """Test artifact endpoint returns correct hash/ID.""" project, package = test_package content = b"Artifact endpoint hash test" expected_hash = compute_sha256(content) 
    @pytest.mark.integration
    def test_artifact_endpoint_returns_correct_hash(self, integration_client, test_package):
        """Test artifact endpoint returns correct hash/ID."""
        project, package = test_package
        content = b"Artifact endpoint hash test"
        expected_hash = compute_sha256(content)

        upload_test_file(integration_client, project, package, content)

        # Query artifact directly
        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert response.status_code == 200
        data = response.json()
        assert data["id"] == expected_hash
        assert data.get("sha256") == expected_hash


class TestClientSideVerificationWorkflow:
    """Tests for client-side verification workflow."""

    @pytest.mark.integration
    def test_client_can_verify_before_upload(self, integration_client, test_package):
        """Test client can compute hash before upload and verify response matches."""
        project, package = test_package
        content = b"Client pre-upload verification test"

        # Client computes hash locally before upload
        client_hash = compute_sha256(content)

        # Upload
        result = upload_test_file(integration_client, project, package, content)

        # Client verifies server returned the same hash
        assert result["artifact_id"] == client_hash

    @pytest.mark.integration
    def test_client_can_provide_checksum_header(self, integration_client, test_package):
        """Test client can provide X-Checksum-SHA256 header for verification."""
        project, package = test_package
        content = b"Client checksum header test"
        client_hash = compute_sha256(content)

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": client_hash},
        )
        assert response.status_code == 200
        assert response.json()["artifact_id"] == client_hash

    @pytest.mark.integration
    def test_checksum_mismatch_rejected(self, integration_client, test_package):
        """Test upload with wrong client checksum is rejected."""
        project, package = test_package
        content = b"Checksum mismatch test"
        wrong_hash = "0" * 64

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": wrong_hash},
        )
        assert response.status_code == 422

    @pytest.mark.integration
    def test_client_can_verify_after_download(self, integration_client, test_package):
        """Test client can verify downloaded content matches header hash."""
        project, package = test_package
        content = b"Client post-download verification"

        upload_test_file(
            integration_client, project, package, content, tag="verify-after"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/verify-after",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        # Client gets hash from header
        header_hash = response.headers.get("X-Checksum-SHA256")

        # Client computes hash of downloaded content
        downloaded_hash = compute_sha256(response.content)

        # Client verifies they match
        assert downloaded_hash == header_hash


class TestIntegritySizeVariants:
    """Tests for integrity verification across different file sizes."""

    @pytest.mark.integration
    def test_integrity_1kb(self, integration_client, test_package, sized_content):
        """Test integrity verification for 1KB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_1KB, seed=100)

        result = upload_test_file(
            integration_client, project, package, content, tag="int-1kb"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/int-1kb",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert compute_sha256(response.content) == expected_hash
        assert response.headers.get("X-Checksum-SHA256") == expected_hash
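    # sized_content is assumed to be a fixture returning deterministic,
    # seed-derived bytes of the requested size plus their SHA-256, roughly:
    #
    #     def sized_content(size: int, seed: int):
    #         data = random.Random(seed).randbytes(size)
    #         return data, hashlib.sha256(data).hexdigest()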
    @pytest.mark.integration
    def test_integrity_100kb(self, integration_client, test_package, sized_content):
        """Test integrity verification for 100KB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_100KB, seed=101)

        result = upload_test_file(
            integration_client, project, package, content, tag="int-100kb"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/int-100kb",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert compute_sha256(response.content) == expected_hash
        assert response.headers.get("X-Checksum-SHA256") == expected_hash

    @pytest.mark.integration
    def test_integrity_1mb(self, integration_client, test_package, sized_content):
        """Test integrity verification for 1MB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_1MB, seed=102)

        result = upload_test_file(
            integration_client, project, package, content, tag="int-1mb"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/int-1mb",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert compute_sha256(response.content) == expected_hash
        assert response.headers.get("X-Checksum-SHA256") == expected_hash

    @pytest.mark.integration
    @pytest.mark.slow
    def test_integrity_10mb(self, integration_client, test_package, sized_content):
        """Test integrity verification for 10MB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_10MB, seed=103)

        result = upload_test_file(
            integration_client, project, package, content, tag="int-10mb"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/int-10mb",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert compute_sha256(response.content) == expected_hash
        assert response.headers.get("X-Checksum-SHA256") == expected_hash


class TestConsistencyCheck:
    """Tests for the admin consistency check endpoint."""

    @pytest.mark.integration
    def test_consistency_check_returns_200(self, integration_client):
        """Test consistency check endpoint returns 200."""
        response = integration_client.get("/api/v1/admin/consistency-check")
        assert response.status_code == 200

    @pytest.mark.integration
    def test_consistency_check_response_format(self, integration_client):
        """Test consistency check returns expected response format."""
        response = integration_client.get("/api/v1/admin/consistency-check")
        assert response.status_code == 200
        data = response.json()

        # Check expected fields
        assert "total_artifacts_checked" in data
        assert "orphaned_s3_objects" in data
        assert "missing_s3_objects" in data
        assert "size_mismatches" in data
        assert "healthy" in data
        assert "orphaned_s3_keys" in data
        assert "missing_s3_keys" in data
        assert "size_mismatch_artifacts" in data

        # Verify types
        assert isinstance(data["total_artifacts_checked"], int)
        assert isinstance(data["orphaned_s3_objects"], int)
        assert isinstance(data["missing_s3_objects"], int)
        assert isinstance(data["size_mismatches"], int)
        assert isinstance(data["healthy"], bool)
        assert isinstance(data["orphaned_s3_keys"], list)
        assert isinstance(data["missing_s3_keys"], list)
        assert isinstance(data["size_mismatch_artifacts"], list)
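    # For a healthy store the report is expected to look roughly like this
    # (illustrative values, not captured from a real run):
    #
    #     {
    #         "total_artifacts_checked": 42,
    #         "orphaned_s3_objects": 0,  "orphaned_s3_keys": [],
    #         "missing_s3_objects": 0,   "missing_s3_keys": [],
    #         "size_mismatches": 0,      "size_mismatch_artifacts": [],
    #         "healthy": true
    #     }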
    @pytest.mark.integration
    def test_consistency_check_after_upload(self, integration_client, test_package):
        """Test consistency check passes after valid upload."""
        project, package = test_package
        content = b"Consistency check test content"

        # Upload artifact
        upload_test_file(integration_client, project, package, content)

        # Run consistency check
        response = integration_client.get("/api/v1/admin/consistency-check")
        assert response.status_code == 200
        data = response.json()

        # Verify check ran and no issues
        assert data["total_artifacts_checked"] >= 1
        assert data["healthy"] is True

    @pytest.mark.integration
    def test_consistency_check_limit_parameter(self, integration_client):
        """Test consistency check respects limit parameter."""
        response = integration_client.get(
            "/api/v1/admin/consistency-check", params={"limit": 10}
        )
        assert response.status_code == 200
        data = response.json()

        # Lists should not exceed limit
        assert len(data["orphaned_s3_keys"]) <= 10
        assert len(data["missing_s3_keys"]) <= 10
        assert len(data["size_mismatch_artifacts"]) <= 10


class TestDigestHeader:
    """Tests for RFC 3230 Digest header."""

    @pytest.mark.integration
    def test_download_includes_digest_header(self, integration_client, test_package):
        """Test download includes Digest header in RFC 3230 format."""
        project, package = test_package
        content = b"Digest header test"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, tag="digest-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/digest-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert "Digest" in response.headers

        # Verify Digest format (sha-256=base64hash)
        digest = response.headers["Digest"]
        assert digest.startswith("sha-256=")

    @pytest.mark.integration
    def test_digest_header_base64_valid(self, integration_client, test_package):
        """Test Digest header contains valid base64 encoding."""
        import base64

        project, package = test_package
        content = b"Digest base64 test"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, tag="digest-b64"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/digest-b64",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        digest = response.headers["Digest"]
        base64_part = digest.split("=", 1)[1]

        # Should be valid base64
        try:
            decoded = base64.b64decode(base64_part)
            assert len(decoded) == 32  # SHA256 is 32 bytes
        except Exception as e:
            pytest.fail(f"Invalid base64 in Digest header: {e}")


class TestVerificationModes:
    """Tests for download verification modes."""

    @pytest.mark.integration
    def test_pre_verification_mode(self, integration_client, test_package):
        """Test pre-verification mode verifies before streaming."""
        project, package = test_package
        content = b"Pre-verification mode test"

        upload_test_file(
            integration_client, project, package, content, tag="pre-verify"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/pre-verify",
            params={"mode": "proxy", "verify": "true", "verify_mode": "pre"},
        )
        assert response.status_code == 200
        assert response.content == content
        # X-Verified header should be true
        assert response.headers.get("X-Verified") == "true"

    @pytest.mark.integration
    def test_stream_verification_mode(self, integration_client, test_package):
        """Test streaming verification mode."""
        project, package = test_package
        content = b"Stream verification mode test"

        upload_test_file(
            integration_client, project, package, content, tag="stream-verify"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/stream-verify",
            params={"mode": "proxy", "verify": "true", "verify_mode": "stream"},
        )
        assert response.status_code == 200
        assert response.content == content
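# A minimal sketch of how a client could compute the value that
# TestDigestHeader above expects: RFC 3230 uses "sha-256=" followed by the
# base64 encoding of the raw 32-byte digest (not the hex digest). The helper
# name is illustrative and is not part of the application under test.
def _expected_digest_header(content: bytes) -> str:
    import base64

    return "sha-256=" + base64.b64encode(hashlib.sha256(content).digest()).decode("ascii")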
class TestArtifactIntegrityEndpoint:
    """Tests for artifact-specific integrity operations."""

    @pytest.mark.integration
    def test_artifact_size_matches(self, integration_client, test_package):
        """Test artifact endpoint returns correct size."""
        project, package = test_package
        content = b"Artifact size test content"
        expected_size = len(content)

        result = upload_test_file(integration_client, project, package, content)
        artifact_id = result["artifact_id"]

        response = integration_client.get(f"/api/v1/artifact/{artifact_id}")
        assert response.status_code == 200
        data = response.json()
        assert data["size"] == expected_size

    @pytest.mark.integration
    def test_content_length_header_matches_size(self, integration_client, test_package):
        """Test Content-Length header matches artifact size."""
        project, package = test_package
        content = b"Content-Length header test"
        expected_size = len(content)

        upload_test_file(
            integration_client, project, package, content, tag="content-len"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/content-len",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert int(response.headers.get("Content-Length", 0)) == expected_size
        assert len(response.content) == expected_size


@pytest.mark.requires_direct_s3
class TestCorruptionDetection:
    """Tests for detecting corrupted S3 objects.

    These tests directly manipulate S3 objects to simulate corruption and
    verify that the system can detect hash mismatches.

    Note: These tests require direct S3/MinIO access and are skipped in CI
    where S3 is not directly accessible from the test runner.
    """

    @pytest.mark.integration
    def test_detection_of_corrupted_content(self, integration_client, test_package):
        """Test that corrupted S3 content is detected via hash mismatch.

        Uploads content, then directly modifies the S3 object, then verifies
        that the downloaded content hash doesn't match.
        """
        project, package = test_package
        content = b"Original content for corruption test"
        expected_hash = compute_sha256(content)

        # Upload original content
        result = upload_test_file(
            integration_client, project, package, content, tag="corrupt-test"
        )
        assert result["artifact_id"] == expected_hash

        # Get the S3 object and corrupt it
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"

        # Replace with corrupted content
        corrupted_content = b"Corrupted content - different from original!"
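        # Overwriting the object in place simulates silent corruption at
        # rest: the database row, and therefore the expected hash the server
        # reports, is left untouched.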
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted_content)

        # Download via proxy (bypasses hash verification)
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/corrupt-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        # Verify the downloaded content doesn't match original hash
        downloaded_hash = compute_sha256(response.content)
        assert downloaded_hash != expected_hash, "Corruption was not detected - hashes match"
        assert response.content == corrupted_content

        # The X-Checksum-SHA256 header should still show the original hash
        # (from DB) but the actual content hash is different
        header_hash = response.headers.get("X-Checksum-SHA256")
        assert header_hash == expected_hash  # Header shows expected hash
        assert downloaded_hash != header_hash  # But content is corrupted

        # Restore original content for cleanup
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_detection_of_single_bit_flip(self, integration_client, test_package):
        """Test detection of a single bit flip in S3 object content."""
        project, package = test_package
        content = b"Content for single bit flip detection test"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content, tag="bitflip-test"
        )
        assert result["artifact_id"] == expected_hash

        # Get S3 object and flip a single bit
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"

        # Flip the first bit of the first byte
        corrupted_content = bytearray(content)
        corrupted_content[0] ^= 0x01
        corrupted_content = bytes(corrupted_content)
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted_content)

        # Download and verify hash mismatch
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/bitflip-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        downloaded_hash = compute_sha256(response.content)
        assert downloaded_hash != expected_hash, "Single bit flip not detected"

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_detection_of_truncated_content(self, integration_client, test_package):
        """Test detection of truncated S3 object."""
        project, package = test_package
        content = b"This is content that will be truncated for testing purposes"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content, tag="truncate-test"
        )
        assert result["artifact_id"] == expected_hash

        # Get S3 object and truncate it
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"

        # Truncate to half the original size
        truncated_content = content[: len(content) // 2]
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=truncated_content)

        # Download and verify hash mismatch
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/truncate-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        downloaded_hash = compute_sha256(response.content)
        assert downloaded_hash != expected_hash, "Truncation not detected"
        assert len(response.content) < len(content), "Content was not truncated"

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_detection_of_appended_content(self, integration_client, test_package):
        """Test detection of content with extra bytes appended."""
        project, package = test_package
        content = b"Original content"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content, tag="append-test"
        )
        assert result["artifact_id"] == expected_hash

        # Get S3 object and append extra bytes
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"

        appended_content = content + b" - extra bytes appended"
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=appended_content)

        # Download and verify hash mismatch
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/append-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        downloaded_hash = compute_sha256(response.content)
        assert downloaded_hash != expected_hash, "Appended content not detected"
        assert len(response.content) > len(content), "Content was not extended"

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_client_detects_hash_mismatch_post_download(
        self, integration_client, test_package
    ):
        """Test that a client can detect hash mismatch after downloading corrupted content.

        This simulates the full client verification workflow:
        1. Download content
        2. Get expected hash from header
        3. Compute actual hash of content
        4. Verify they match (or detect corruption)
        """
        project, package = test_package
        content = b"Content for client-side corruption detection"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content, tag="client-detect"
        )

        # Corrupt the S3 object
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"
        corrupted = b"This is completely different content"
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted)

        # Simulate client download and verification
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/client-detect",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        # Client gets expected hash from header
        header_hash = response.headers.get("X-Checksum-SHA256")

        # Client computes hash of downloaded content
        actual_hash = compute_sha256(response.content)

        # Client detects the mismatch
        corruption_detected = actual_hash != header_hash
        assert corruption_detected, "Client should detect hash mismatch"

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)
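    # The same workflow as a client library might implement it (sketch;
    # `session`, `url`, and IntegrityError are placeholders, not part of the
    # application under test):
    #
    #     resp = session.get(url, params={"mode": "proxy"})
    #     expected = resp.headers["X-Checksum-SHA256"]
    #     actual = hashlib.sha256(resp.content).hexdigest()
    #     if actual != expected:
    #         raise IntegrityError(f"expected {expected}, got {actual}")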
""" project, package = test_package content = b"Content for size mismatch consistency check test " + unique_test_id.encode() expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project, package, content, tag="size-mismatch" ) # Modify S3 object to have different size s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" different_size_content = content + b"extra extra extra" s3_client.put_object(Bucket=bucket, Key=s3_key, Body=different_size_content) # Run consistency check response = integration_client.get("/api/v1/admin/consistency-check") assert response.status_code == 200 data = response.json() # Should detect the size mismatch assert data["size_mismatches"] >= 1 or len(data["size_mismatch_artifacts"]) >= 1 # Restore original s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content) @pytest.mark.integration def test_consistency_check_detects_missing_s3_object( self, integration_client, test_package, unique_test_id ): """Test that consistency check detects missing S3 objects. Uploads content, deletes S3 object, then runs consistency check. """ project, package = test_package content = b"Content for missing S3 object test " + unique_test_id.encode() expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project, package, content, tag="missing-s3" ) # Delete the S3 object s3_client = get_s3_client() bucket = get_s3_bucket() s3_key = f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" s3_client.delete_object(Bucket=bucket, Key=s3_key) # Run consistency check response = integration_client.get("/api/v1/admin/consistency-check") assert response.status_code == 200 data = response.json() # Should detect the missing S3 object assert data["missing_s3_objects"] >= 1 or len(data["missing_s3_keys"]) >= 1 # Restore the object for cleanup s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)