Add S3 verification and failure cleanup tests
- Add test_s3_bucket_single_object_after_duplicates to verify only one S3 object exists - Add tests for upload failure scenarios (invalid project/package, empty file) - Add tests for orphaned S3 objects and database records cleanup - Add S3 direct access helpers (list_s3_objects_by_hash, s3_object_exists, etc.) - Fix conftest.py to use setdefault for env vars (don't override container config) All 52 tests now pass.
This commit is contained in:
@@ -20,6 +20,9 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from tests.conftest import (
|
||||
compute_sha256,
|
||||
upload_test_file,
|
||||
list_s3_objects_by_hash,
|
||||
s3_object_exists,
|
||||
delete_s3_object_by_hash,
|
||||
)
|
||||
|
||||
|
||||
@@ -386,3 +389,163 @@ class TestDeduplicationAcrossRestarts:
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.status_code == 200
|
||||
assert response.json()["ref_count"] == 2
|
||||
|
||||
|
||||
class TestS3ObjectVerification:
|
||||
"""Tests to verify S3 storage behavior directly."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_s3_bucket_single_object_after_duplicates(
|
||||
self, integration_client, test_package, unique_test_id
|
||||
):
|
||||
"""Test S3 bucket contains only one object after duplicate uploads."""
|
||||
project, package = test_package
|
||||
content = f"content for s3 object count test {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload same content multiple times with different tags
|
||||
for tag in ["s3test1", "s3test2", "s3test3"]:
|
||||
upload_test_file(integration_client, project, package, content, tag=tag)
|
||||
|
||||
# Verify only one S3 object exists for this hash
|
||||
s3_objects = list_s3_objects_by_hash(expected_hash)
|
||||
assert len(s3_objects) == 1, (
|
||||
f"Expected 1 S3 object, found {len(s3_objects)}: {s3_objects}"
|
||||
)
|
||||
|
||||
# Verify the object key follows expected pattern
|
||||
expected_key = (
|
||||
f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"
|
||||
)
|
||||
assert s3_objects[0] == expected_key
|
||||
|
||||
|
||||
class TestUploadFailureCleanup:
|
||||
"""Tests for cleanup when uploads fail."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_failure_invalid_project_no_orphaned_s3(
|
||||
self, integration_client, unique_test_id
|
||||
):
|
||||
"""Test upload to non-existent project doesn't leave orphaned S3 objects."""
|
||||
content = f"content for orphan s3 test {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Attempt upload to non-existent project
|
||||
files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
|
||||
response = integration_client.post(
|
||||
f"/api/v1/project/nonexistent-project-{unique_test_id}/nonexistent-pkg/upload",
|
||||
files=files,
|
||||
data={"tag": "test"},
|
||||
)
|
||||
|
||||
# Upload should fail
|
||||
assert response.status_code == 404
|
||||
|
||||
# Verify no S3 object was created
|
||||
assert not s3_object_exists(expected_hash), (
|
||||
"Orphaned S3 object found after failed upload"
|
||||
)
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_failure_invalid_package_no_orphaned_s3(
|
||||
self, integration_client, test_project, unique_test_id
|
||||
):
|
||||
"""Test upload to non-existent package doesn't leave orphaned S3 objects."""
|
||||
content = f"content for orphan s3 test pkg {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Attempt upload to non-existent package
|
||||
files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
|
||||
response = integration_client.post(
|
||||
f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
|
||||
files=files,
|
||||
data={"tag": "test"},
|
||||
)
|
||||
|
||||
# Upload should fail
|
||||
assert response.status_code == 404
|
||||
|
||||
# Verify no S3 object was created
|
||||
assert not s3_object_exists(expected_hash), (
|
||||
"Orphaned S3 object found after failed upload"
|
||||
)
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_failure_empty_file_no_orphaned_s3(
|
||||
self, integration_client, test_package, unique_test_id
|
||||
):
|
||||
"""Test upload of empty file doesn't leave orphaned S3 objects or DB records."""
|
||||
project, package = test_package
|
||||
content = b"" # Empty content
|
||||
|
||||
# Attempt upload of empty file
|
||||
files = {"file": ("empty.bin", io.BytesIO(content), "application/octet-stream")}
|
||||
response = integration_client.post(
|
||||
f"/api/v1/project/{project}/{package}/upload",
|
||||
files=files,
|
||||
data={"tag": f"empty-{unique_test_id}"},
|
||||
)
|
||||
|
||||
# Upload should fail (empty files are rejected)
|
||||
assert response.status_code in (400, 422), (
|
||||
f"Expected 400/422, got {response.status_code}"
|
||||
)
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_failure_no_orphaned_database_records(
|
||||
self, integration_client, test_project, unique_test_id
|
||||
):
|
||||
"""Test failed upload doesn't leave orphaned database records."""
|
||||
content = f"content for db orphan test {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Attempt upload to non-existent package (should fail before DB insert)
|
||||
files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
|
||||
response = integration_client.post(
|
||||
f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
|
||||
files=files,
|
||||
data={"tag": "test"},
|
||||
)
|
||||
|
||||
# Upload should fail
|
||||
assert response.status_code == 404
|
||||
|
||||
# Verify no artifact record was created
|
||||
artifact_response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert artifact_response.status_code == 404, (
|
||||
"Orphaned artifact record found after failed upload"
|
||||
)
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_duplicate_tag_upload_handles_gracefully(
|
||||
self, integration_client, test_package, unique_test_id
|
||||
):
|
||||
"""Test uploading with duplicate tag is handled without orphaned data."""
|
||||
project, package = test_package
|
||||
content1 = f"content version 1 {unique_test_id}".encode()
|
||||
content2 = f"content version 2 {unique_test_id}".encode()
|
||||
tag = f"duplicate-tag-{unique_test_id}"
|
||||
|
||||
# First upload with tag
|
||||
result1 = upload_test_file(
|
||||
integration_client, project, package, content1, tag=tag
|
||||
)
|
||||
hash1 = result1["artifact_id"]
|
||||
|
||||
# Second upload with same tag (should update the tag to point to new artifact)
|
||||
result2 = upload_test_file(
|
||||
integration_client, project, package, content2, tag=tag
|
||||
)
|
||||
hash2 = result2["artifact_id"]
|
||||
|
||||
# Both artifacts should exist
|
||||
assert integration_client.get(f"/api/v1/artifact/{hash1}").status_code == 200
|
||||
assert integration_client.get(f"/api/v1/artifact/{hash2}").status_code == 200
|
||||
|
||||
# Tag should point to the second artifact
|
||||
tag_response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/tags/{tag}"
|
||||
)
|
||||
assert tag_response.status_code == 200
|
||||
assert tag_response.json()["artifact_id"] == hash2
|
||||
|
||||
Reference in New Issue
Block a user