"""
|
|
Integration tests for artifact integrity verification.
|
|
|
|
Tests cover:
|
|
- Round-trip verification (upload -> download -> verify hash)
|
|
- Consistency check endpoint
|
|
- Header-based verification
|
|
- Integrity verification across file sizes
|
|
- Client-side verification workflow
|
|
"""

import base64
import io

import pytest

from tests.factories import (
    compute_sha256,
    upload_test_file,
    get_s3_client,
    get_s3_bucket,
)
from tests.conftest import (
    SIZE_1KB,
    SIZE_100KB,
    SIZE_1MB,
    SIZE_10MB,
)
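

# Minimal illustrative sketches (not invoked by any test) of the two
# client-side verification steps exercised below. The X-Checksum-SHA256
# header and the "artifact_id" response field mirror the real calls in
# these tests; the helper names themselves are local to this sketch.
def _client_hash_matches_upload(upload_response_json, content: bytes) -> bool:
    # Pre-upload verification: hash locally, then compare with the
    # server-reported artifact_id.
    return upload_response_json["artifact_id"] == compute_sha256(content)


def _client_verifies_download(download_response) -> bool:
    # Post-download verification: recompute the hash over the body and
    # compare with the checksum header the server sent.
    expected = download_response.headers.get("X-Checksum-SHA256")
    return expected is not None and compute_sha256(download_response.content) == expected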


class TestRoundTripVerification:
    """Tests for complete round-trip integrity verification."""

    @pytest.mark.integration
    def test_upload_download_hash_matches(self, integration_client, test_package):
        """Test that upload -> download round trip preserves content integrity."""
        project, package = test_package
        content = b"Round trip integrity test content"
        expected_hash = compute_sha256(content)

        # Upload and capture returned hash
        result = upload_test_file(
            integration_client, project, package, content, version="roundtrip"
        )
        uploaded_hash = result["artifact_id"]

        # Verify upload returned correct hash
        assert uploaded_hash == expected_hash

        # Download artifact
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/roundtrip",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        # Compute hash of downloaded content
        downloaded_hash = compute_sha256(response.content)

        # All three hashes should match
        assert downloaded_hash == expected_hash
        assert downloaded_hash == uploaded_hash

    @pytest.mark.integration
    def test_upload_response_contains_hash(self, integration_client, test_package):
        """Test upload response contains artifact_id, which is the SHA-256 hash."""
        project, package = test_package
        content = b"Upload response hash test"
        expected_hash = compute_sha256(content)

        result = upload_test_file(integration_client, project, package, content)

        assert "artifact_id" in result
        assert result["artifact_id"] == expected_hash
        assert len(result["artifact_id"]) == 64
        assert all(c in "0123456789abcdef" for c in result["artifact_id"])

    @pytest.mark.integration
    def test_download_header_matches_artifact_id(self, integration_client, test_package):
        """Test X-Checksum-SHA256 header matches the artifact ID."""
        project, package = test_package
        content = b"Header verification test"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, version="header-check"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/header-check",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.headers.get("X-Checksum-SHA256") == expected_hash

    @pytest.mark.integration
    def test_etag_matches_artifact_id(self, integration_client, test_package):
        """Test ETag header matches the artifact ID."""
        project, package = test_package
        content = b"ETag verification test"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, version="etag-check"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/etag-check",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        etag = response.headers.get("ETag", "").strip('"')
        assert etag == expected_hash

    @pytest.mark.integration
    def test_artifact_endpoint_returns_correct_hash(self, integration_client, test_package):
        """Test the artifact endpoint returns the correct hash/ID."""
        project, package = test_package
        content = b"Artifact endpoint hash test"
        expected_hash = compute_sha256(content)

        upload_test_file(integration_client, project, package, content)

        # Query artifact directly
        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert response.status_code == 200
        data = response.json()
        assert data["id"] == expected_hash
        assert data.get("sha256") == expected_hash


class TestClientSideVerificationWorkflow:
    """Tests for the client-side verification workflow."""

    @pytest.mark.integration
    def test_client_can_verify_before_upload(self, integration_client, test_package):
        """Test client can compute hash before upload and verify response matches."""
        project, package = test_package
        content = b"Client pre-upload verification test"

        # Client computes hash locally before upload
        client_hash = compute_sha256(content)

        # Upload
        result = upload_test_file(integration_client, project, package, content)

        # Client verifies server returned the same hash
        assert result["artifact_id"] == client_hash

    @pytest.mark.integration
    def test_client_can_provide_checksum_header(self, integration_client, test_package):
        """Test client can provide X-Checksum-SHA256 header for verification."""
        project, package = test_package
        content = b"Client checksum header test"
        client_hash = compute_sha256(content)

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": client_hash},
        )
        assert response.status_code == 200
        assert response.json()["artifact_id"] == client_hash

    @pytest.mark.integration
    def test_checksum_mismatch_rejected(self, integration_client, test_package):
        """Test upload with a wrong client checksum is rejected."""
        project, package = test_package
        content = b"Checksum mismatch test"
        wrong_hash = "0" * 64

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": wrong_hash},
        )
        assert response.status_code == 422

    @pytest.mark.integration
    def test_client_can_verify_after_download(self, integration_client, test_package):
        """Test client can verify downloaded content matches the header hash."""
        project, package = test_package
        content = b"Client post-download verification"

        upload_test_file(
            integration_client, project, package, content, version="verify-after"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/verify-after",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        # Client gets hash from header
        header_hash = response.headers.get("X-Checksum-SHA256")

        # Client computes hash of downloaded content
        downloaded_hash = compute_sha256(response.content)

        # Client verifies they match
        assert downloaded_hash == header_hash


class TestIntegritySizeVariants:
    """Tests for integrity verification across different file sizes."""

    @pytest.mark.integration
    def test_integrity_1kb(self, integration_client, test_package, sized_content):
        """Test integrity verification for a 1KB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_1KB, seed=100)

        result = upload_test_file(
            integration_client, project, package, content, version="int-1kb"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/int-1kb",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert compute_sha256(response.content) == expected_hash
        assert response.headers.get("X-Checksum-SHA256") == expected_hash

    @pytest.mark.integration
    def test_integrity_100kb(self, integration_client, test_package, sized_content):
        """Test integrity verification for a 100KB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_100KB, seed=101)

        result = upload_test_file(
            integration_client, project, package, content, version="int-100kb"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/int-100kb",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert compute_sha256(response.content) == expected_hash
        assert response.headers.get("X-Checksum-SHA256") == expected_hash

    @pytest.mark.integration
    def test_integrity_1mb(self, integration_client, test_package, sized_content):
        """Test integrity verification for a 1MB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_1MB, seed=102)

        result = upload_test_file(
            integration_client, project, package, content, version="int-1mb"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/int-1mb",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert compute_sha256(response.content) == expected_hash
        assert response.headers.get("X-Checksum-SHA256") == expected_hash

    @pytest.mark.integration
    @pytest.mark.slow
    def test_integrity_10mb(self, integration_client, test_package, sized_content):
        """Test integrity verification for a 10MB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_10MB, seed=103)

        result = upload_test_file(
            integration_client, project, package, content, version="int-10mb"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/int-10mb",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert compute_sha256(response.content) == expected_hash
        assert response.headers.get("X-Checksum-SHA256") == expected_hash
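

# Shape of a consistency-check response, as asserted in the tests below
# (illustrative values, not captured output):
# {
#     "total_artifacts_checked": 42,
#     "orphaned_s3_objects": 0, "missing_s3_objects": 0, "size_mismatches": 0,
#     "healthy": true,
#     "orphaned_s3_keys": [], "missing_s3_keys": [], "size_mismatch_artifacts": []
# }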


class TestConsistencyCheck:
    """Tests for the admin consistency check endpoint."""

    @pytest.mark.integration
    def test_consistency_check_returns_200(self, integration_client):
        """Test consistency check endpoint returns 200."""
        response = integration_client.get("/api/v1/admin/consistency-check")
        assert response.status_code == 200

    @pytest.mark.integration
    def test_consistency_check_response_format(self, integration_client):
        """Test consistency check returns the expected response format."""
        response = integration_client.get("/api/v1/admin/consistency-check")
        assert response.status_code == 200
        data = response.json()

        # Check expected fields
        assert "total_artifacts_checked" in data
        assert "orphaned_s3_objects" in data
        assert "missing_s3_objects" in data
        assert "size_mismatches" in data
        assert "healthy" in data
        assert "orphaned_s3_keys" in data
        assert "missing_s3_keys" in data
        assert "size_mismatch_artifacts" in data

        # Verify types
        assert isinstance(data["total_artifacts_checked"], int)
        assert isinstance(data["orphaned_s3_objects"], int)
        assert isinstance(data["missing_s3_objects"], int)
        assert isinstance(data["size_mismatches"], int)
        assert isinstance(data["healthy"], bool)
        assert isinstance(data["orphaned_s3_keys"], list)
        assert isinstance(data["missing_s3_keys"], list)
        assert isinstance(data["size_mismatch_artifacts"], list)

    @pytest.mark.integration
    def test_consistency_check_after_upload(self, integration_client, test_package):
        """Test consistency check runs successfully after a valid upload.

        Note: We don't assert healthy=True because other tests (especially
        the corruption detection tests) may leave orphaned S3 objects behind.
        This test validates that the consistency check endpoint works and
        that the uploaded artifact is included in the check count.
        """
        project, package = test_package
        content = b"Consistency check test content"

        # Upload artifact
        upload_test_file(integration_client, project, package, content)

        # Run consistency check
        response = integration_client.get("/api/v1/admin/consistency-check")
        assert response.status_code == 200
        data = response.json()

        # Verify the check ran - at least one artifact was checked
        assert data["total_artifacts_checked"] >= 1
        # Verify no missing S3 objects (the uploaded artifact should exist)
        assert data["missing_s3_objects"] == 0

    @pytest.mark.integration
    def test_consistency_check_limit_parameter(self, integration_client):
        """Test consistency check respects the limit parameter."""
        response = integration_client.get(
            "/api/v1/admin/consistency-check",
            params={"limit": 10},
        )
        assert response.status_code == 200
        data = response.json()

        # Lists should not exceed the limit
        assert len(data["orphaned_s3_keys"]) <= 10
        assert len(data["missing_s3_keys"]) <= 10
        assert len(data["size_mismatch_artifacts"]) <= 10
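

# For reference: RFC 3230 base64-encodes the *raw* 32-byte digest, not the hex
# string, so a hex artifact_id maps onto a Digest header value as sketched
# below. This helper is illustrative only and is not called by the tests.
def _digest_header_for(artifact_id_hex: str) -> str:
    return "sha-256=" + base64.b64encode(bytes.fromhex(artifact_id_hex)).decode("ascii")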


class TestDigestHeader:
    """Tests for the RFC 3230 Digest header."""

    @pytest.mark.integration
    def test_download_includes_digest_header(self, integration_client, test_package):
        """Test download includes a Digest header in RFC 3230 format."""
        project, package = test_package
        content = b"Digest header test"

        upload_test_file(
            integration_client, project, package, content, version="digest-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/digest-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert "Digest" in response.headers

        # Verify Digest format (sha-256=<base64 digest>)
        digest = response.headers["Digest"]
        assert digest.startswith("sha-256=")

    @pytest.mark.integration
    def test_digest_header_base64_valid(self, integration_client, test_package):
        """Test Digest header contains a valid base64 encoding of the digest."""
        project, package = test_package
        content = b"Digest base64 test"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, version="digest-b64"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/digest-b64",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        digest = response.headers["Digest"]
        base64_part = digest.split("=", 1)[1]

        # Should be valid base64 (asserts stay outside the try block so an
        # AssertionError isn't swallowed by the except clause)
        try:
            decoded = base64.b64decode(base64_part)
        except Exception as e:
            pytest.fail(f"Invalid base64 in Digest header: {e}")
        assert len(decoded) == 32  # SHA-256 digests are 32 bytes
        assert decoded.hex() == expected_hash
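

# Download verification modes exercised below (semantics as exercised by
# these tests): verify_mode=pre has the server verify the stored object
# before streaming and report X-Verified: true on success, while
# verify_mode=stream verifies while the response is being streamed.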


class TestVerificationModes:
    """Tests for download verification modes."""

    @pytest.mark.integration
    def test_pre_verification_mode(self, integration_client, test_package):
        """Test pre-verification mode verifies before streaming."""
        project, package = test_package
        content = b"Pre-verification mode test"

        upload_test_file(
            integration_client, project, package, content, version="pre-verify"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/pre-verify",
            params={"mode": "proxy", "verify": "true", "verify_mode": "pre"},
        )
        assert response.status_code == 200
        assert response.content == content

        # X-Verified header should be true
        assert response.headers.get("X-Verified") == "true"

    @pytest.mark.integration
    def test_stream_verification_mode(self, integration_client, test_package):
        """Test streaming verification mode."""
        project, package = test_package
        content = b"Stream verification mode test"

        upload_test_file(
            integration_client, project, package, content, version="stream-verify"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/stream-verify",
            params={"mode": "proxy", "verify": "true", "verify_mode": "stream"},
        )
        assert response.status_code == 200
        assert response.content == content


class TestArtifactIntegrityEndpoint:
    """Tests for artifact-specific integrity operations."""

    @pytest.mark.integration
    def test_artifact_size_matches(self, integration_client, test_package):
        """Test artifact endpoint returns the correct size."""
        project, package = test_package
        content = b"Artifact size test content"
        expected_size = len(content)

        result = upload_test_file(integration_client, project, package, content)
        artifact_id = result["artifact_id"]

        response = integration_client.get(f"/api/v1/artifact/{artifact_id}")
        assert response.status_code == 200
        data = response.json()
        assert data["size"] == expected_size

    @pytest.mark.integration
    def test_content_length_header_matches_size(self, integration_client, test_package):
        """Test Content-Length header matches the artifact size."""
        project, package = test_package
        content = b"Content-Length header test"
        expected_size = len(content)

        upload_test_file(
            integration_client, project, package, content, version="content-len"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/content-len",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert int(response.headers.get("Content-Length", 0)) == expected_size
        assert len(response.content) == expected_size
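

# Content-addressed S3 key layout assumed by the corruption tests below:
# objects fan out under "fruits/<aa>/<bb>/<full sha256>", where <aa> and <bb>
# are the first and second pairs of hex characters of the hash. This helper
# just centralizes the key pattern those tests construct.
def _s3_key_for(artifact_hash: str) -> str:
    return f"fruits/{artifact_hash[:2]}/{artifact_hash[2:4]}/{artifact_hash}"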


@pytest.mark.requires_direct_s3
class TestCorruptionDetection:
    """Tests for detecting corrupted S3 objects.

    These tests directly manipulate S3 objects to simulate corruption
    and verify that the system can detect hash mismatches.

    Note: These tests require direct S3/MinIO access and are skipped in CI,
    where S3 is not directly accessible from the test runner.
    """

    @pytest.mark.integration
    def test_detection_of_corrupted_content(self, integration_client, test_package):
        """Test that corrupted S3 content is detected via hash mismatch.

        Uploads content, then directly modifies the S3 object, then
        verifies that the downloaded content hash doesn't match.
        """
        project, package = test_package
        content = b"Original content for corruption test"
        expected_hash = compute_sha256(content)

        # Upload original content
        result = upload_test_file(
            integration_client, project, package, content, version="corrupt-test"
        )
        assert result["artifact_id"] == expected_hash

        # Get the S3 object and corrupt it
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = _s3_key_for(expected_hash)

        # Replace with corrupted content
        corrupted_content = b"Corrupted content - different from original!"
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted_content)

        # Download via proxy (bypasses hash verification)
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/corrupt-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        # Verify the downloaded content doesn't match the original hash
        downloaded_hash = compute_sha256(response.content)
        assert downloaded_hash != expected_hash, "Corruption was not detected - hashes match"
        assert response.content == corrupted_content

        # The X-Checksum-SHA256 header still shows the original hash (from
        # the database), but the actual content hash is different.
        header_hash = response.headers.get("X-Checksum-SHA256")
        assert header_hash == expected_hash  # Header shows the expected hash
        assert downloaded_hash != header_hash  # But the content is corrupted

        # Restore original content for cleanup
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_detection_of_single_bit_flip(self, integration_client, test_package):
        """Test detection of a single bit flip in S3 object content."""
        project, package = test_package
        content = b"Content for single bit flip detection test"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content, version="bitflip-test"
        )
        assert result["artifact_id"] == expected_hash

        # Get the S3 object and flip a single bit
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = _s3_key_for(expected_hash)

        # Flip the lowest bit of the first byte
        corrupted_content = bytearray(content)
        corrupted_content[0] ^= 0x01
        corrupted_content = bytes(corrupted_content)

        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted_content)

        # Download and verify hash mismatch
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/bitflip-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        downloaded_hash = compute_sha256(response.content)
        assert downloaded_hash != expected_hash, "Single bit flip not detected"

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_detection_of_truncated_content(self, integration_client, test_package):
        """Test detection of a truncated S3 object."""
        project, package = test_package
        content = b"This is content that will be truncated for testing purposes"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content, version="truncate-test"
        )
        assert result["artifact_id"] == expected_hash

        # Get the S3 object and truncate it
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = _s3_key_for(expected_hash)

        # Truncate to half the original size
        truncated_content = content[: len(content) // 2]
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=truncated_content)

        # Download and verify hash mismatch
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/truncate-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        downloaded_hash = compute_sha256(response.content)
        assert downloaded_hash != expected_hash, "Truncation not detected"
        assert len(response.content) < len(content), "Content was not truncated"

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_detection_of_appended_content(self, integration_client, test_package):
        """Test detection of content with extra bytes appended."""
        project, package = test_package
        content = b"Original content"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content, version="append-test"
        )
        assert result["artifact_id"] == expected_hash

        # Get the S3 object and append extra bytes
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = _s3_key_for(expected_hash)

        appended_content = content + b" - extra bytes appended"
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=appended_content)

        # Download and verify hash mismatch
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/append-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        downloaded_hash = compute_sha256(response.content)
        assert downloaded_hash != expected_hash, "Appended content not detected"
        assert len(response.content) > len(content), "Content was not extended"

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_client_detects_hash_mismatch_post_download(
        self, integration_client, test_package
    ):
        """Test that a client can detect a hash mismatch after downloading corrupted content.

        This simulates the full client verification workflow:
        1. Download content
        2. Get expected hash from header
        3. Compute actual hash of content
        4. Verify they match (or detect corruption)
        """
        project, package = test_package
        content = b"Content for client-side corruption detection"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, version="client-detect"
        )

        # Corrupt the S3 object
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = _s3_key_for(expected_hash)
        corrupted = b"This is completely different content"
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=corrupted)

        # Simulate client download and verification
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/client-detect",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200

        # Client gets the expected hash from the header
        header_hash = response.headers.get("X-Checksum-SHA256")

        # Client computes the hash of the downloaded content
        actual_hash = compute_sha256(response.content)

        # Client detects the mismatch
        corruption_detected = actual_hash != header_hash
        assert corruption_detected, "Client should detect hash mismatch"

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_consistency_check_detects_size_mismatch(
        self, integration_client, test_package, unique_test_id
    ):
        """Test that consistency check detects size mismatches.

        Uploads content, modifies the S3 object's size, then runs the
        consistency check.
        """
        project, package = test_package
        content = b"Content for size mismatch consistency check test " + unique_test_id.encode()
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, version="size-mismatch"
        )

        # Modify the S3 object to have a different size
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = _s3_key_for(expected_hash)
        different_size_content = content + b"extra extra extra"
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=different_size_content)

        # Run consistency check
        response = integration_client.get("/api/v1/admin/consistency-check")
        assert response.status_code == 200
        data = response.json()

        # Should detect the size mismatch
        assert data["size_mismatches"] >= 1 or len(data["size_mismatch_artifacts"]) >= 1

        # Restore original
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)

    @pytest.mark.integration
    def test_consistency_check_detects_missing_s3_object(
        self, integration_client, test_package, unique_test_id
    ):
        """Test that consistency check detects missing S3 objects.

        Uploads content, deletes the S3 object, then runs the consistency
        check.
        """
        project, package = test_package
        content = b"Content for missing S3 object test " + unique_test_id.encode()
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, version="missing-s3"
        )

        # Delete the S3 object
        s3_client = get_s3_client()
        bucket = get_s3_bucket()
        s3_key = _s3_key_for(expected_hash)
        s3_client.delete_object(Bucket=bucket, Key=s3_key)

        # Run consistency check
        response = integration_client.get("/api/v1/admin/consistency-check")
        assert response.status_code == 200
        data = response.json()

        # Should detect the missing S3 object
        assert data["missing_s3_objects"] >= 1 or len(data["missing_s3_keys"]) >= 1

        # Restore the object for cleanup
        s3_client.put_object(Bucket=bucket, Key=s3_key, Body=content)