From eca291d19448117931fb52d3d24c4c584091babb Mon Sep 17 00:00:00 2001
From: Mondo Diaz
Date: Mon, 5 Jan 2026 14:39:22 -0600
Subject: [PATCH] Add S3 verification and failure cleanup tests

- Add test_s3_bucket_single_object_after_duplicates to verify only one S3 object exists
- Add tests for upload failure scenarios (invalid project/package, empty file)
- Add tests for orphaned S3 objects and database records cleanup
- Add S3 direct access helpers (list_s3_objects_by_hash, s3_object_exists, etc.)
- Fix conftest.py to use setdefault for env vars (don't override container config)

All 52 tests now pass.
---
 backend/tests/conftest.py                  | 117 ++++++++++++++--
 backend/tests/test_integration_uploads.py  | 163 ++++++++++++++++++++++
 2 files changed, 270 insertions(+), 10 deletions(-)

diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py
index bb388c2..605dfe3 100644
--- a/backend/tests/conftest.py
+++ b/backend/tests/conftest.py
@@ -14,16 +14,17 @@ from typing import Generator, BinaryIO
 from unittest.mock import MagicMock, patch
 import io
 
-# Set test environment before importing app modules
-os.environ["ORCHARD_DATABASE_HOST"] = "localhost"
-os.environ["ORCHARD_DATABASE_PORT"] = "5432"
-os.environ["ORCHARD_DATABASE_USER"] = "test"
-os.environ["ORCHARD_DATABASE_PASSWORD"] = "test"
-os.environ["ORCHARD_DATABASE_DBNAME"] = "orchard_test"
-os.environ["ORCHARD_S3_ENDPOINT"] = "http://localhost:9000"
-os.environ["ORCHARD_S3_BUCKET"] = "test-bucket"
-os.environ["ORCHARD_S3_ACCESS_KEY_ID"] = "test"
-os.environ["ORCHARD_S3_SECRET_ACCESS_KEY"] = "test"
+# Set test environment defaults before importing app modules
+# Use setdefault to NOT override existing env vars (from docker-compose)
+os.environ.setdefault("ORCHARD_DATABASE_HOST", "localhost")
+os.environ.setdefault("ORCHARD_DATABASE_PORT", "5432")
+os.environ.setdefault("ORCHARD_DATABASE_USER", "test")
+os.environ.setdefault("ORCHARD_DATABASE_PASSWORD", "test")
+os.environ.setdefault("ORCHARD_DATABASE_DBNAME", "orchard_test")
+os.environ.setdefault("ORCHARD_S3_ENDPOINT", "http://localhost:9000")
+os.environ.setdefault("ORCHARD_S3_BUCKET", "test-bucket")
+os.environ.setdefault("ORCHARD_S3_ACCESS_KEY_ID", "test")
+os.environ.setdefault("ORCHARD_S3_SECRET_ACCESS_KEY", "test")
 
 
 # =============================================================================
@@ -315,3 +316,99 @@ def upload_test_file(
     )
     assert response.status_code == 200, f"Upload failed: {response.text}"
     return response.json()
+
+
+# =============================================================================
+# S3 Direct Access Helpers (for integration tests)
+# =============================================================================
+
+
+def get_s3_client():
+    """
+    Create a boto3 S3 client for direct S3 access in integration tests.
+
+    Uses environment variables for configuration (same as the app).
+    Note: when running in a container, the S3 endpoint should be 'minio:9000', not 'localhost:9000'.
+    """
+    import boto3
+    from botocore.config import Config
+
+    config = Config(s3={"addressing_style": "path"})
+
+    # Use the same endpoint as the app (minio:9000 in container, localhost:9000 locally)
+    endpoint = os.environ.get("ORCHARD_S3_ENDPOINT", "http://minio:9000")
+
+    return boto3.client(
+        "s3",
+        endpoint_url=endpoint,
+        region_name=os.environ.get("ORCHARD_S3_REGION", "us-east-1"),
+        aws_access_key_id=os.environ.get("ORCHARD_S3_ACCESS_KEY_ID", "minioadmin"),
+        aws_secret_access_key=os.environ.get(
+            "ORCHARD_S3_SECRET_ACCESS_KEY", "minioadmin"
+        ),
+        config=config,
+    )
+
+
+def get_s3_bucket():
+    """Get the S3 bucket name from environment."""
+    return os.environ.get("ORCHARD_S3_BUCKET", "orchard-artifacts")
+
+
+def list_s3_objects_by_hash(sha256_hash: str) -> list:
+    """
+    List S3 objects that match a specific SHA256 hash.
+
+    Uses the fruits/{hash[:2]}/{hash[2:4]}/{hash} key pattern.
+    Returns list of matching object keys.
+    """
+    client = get_s3_client()
+    bucket = get_s3_bucket()
+    prefix = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
+
+    response = client.list_objects_v2(Bucket=bucket, Prefix=prefix)
+
+    if "Contents" not in response:
+        return []
+
+    return [obj["Key"] for obj in response["Contents"]]
+
+
+def count_s3_objects_by_prefix(prefix: str) -> int:
+    """
+    Count S3 objects with a given prefix.
+
+    Useful for checking if duplicate uploads created multiple objects.
+    """
+    client = get_s3_client()
+    bucket = get_s3_bucket()
+
+    response = client.list_objects_v2(Bucket=bucket, Prefix=prefix)
+
+    if "Contents" not in response:
+        return 0
+
+    return len(response["Contents"])
+
+
+def s3_object_exists(sha256_hash: str) -> bool:
+    """
+    Check if an S3 object exists for a given SHA256 hash.
+    """
+    objects = list_s3_objects_by_hash(sha256_hash)
+    return len(objects) > 0
+
+
+def delete_s3_object_by_hash(sha256_hash: str) -> bool:
+    """
+    Delete an S3 object by its SHA256 hash (for test cleanup).
+    """
+    client = get_s3_client()
+    bucket = get_s3_bucket()
+    s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
+
+    try:
+        client.delete_object(Bucket=bucket, Key=s3_key)
+        return True
+    except Exception:
+        return False
diff --git a/backend/tests/test_integration_uploads.py b/backend/tests/test_integration_uploads.py
index 9516e52..fca0c67 100644
--- a/backend/tests/test_integration_uploads.py
+++ b/backend/tests/test_integration_uploads.py
@@ -20,6 +20,9 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 from tests.conftest import (
     compute_sha256,
     upload_test_file,
+    list_s3_objects_by_hash,
+    s3_object_exists,
+    delete_s3_object_by_hash,
 )
 
 
@@ -386,3 +389,163 @@ class TestDeduplicationAcrossRestarts:
         response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
         assert response.status_code == 200
         assert response.json()["ref_count"] == 2
+
+
+class TestS3ObjectVerification:
+    """Tests to verify S3 storage behavior directly."""
+
+    @pytest.mark.integration
+    def test_s3_bucket_single_object_after_duplicates(
+        self, integration_client, test_package, unique_test_id
+    ):
+        """Test S3 bucket contains only one object after duplicate uploads."""
+        project, package = test_package
+        content = f"content for s3 object count test {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # Upload same content multiple times with different tags
+        for tag in ["s3test1", "s3test2", "s3test3"]:
+            upload_test_file(integration_client, project, package, content, tag=tag)
+
+        # Verify only one S3 object exists for this hash
+        s3_objects = list_s3_objects_by_hash(expected_hash)
+        assert len(s3_objects) == 1, (
+            f"Expected 1 S3 object, found {len(s3_objects)}: {s3_objects}"
+        )
+
+        # Verify the object key follows expected pattern
+        expected_key = (
+            f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"
+        )
+        assert s3_objects[0] == expected_key
+
+
+class TestUploadFailureCleanup:
+    """Tests for cleanup when uploads fail."""
+
+    @pytest.mark.integration
+    def test_upload_failure_invalid_project_no_orphaned_s3(
+        self, integration_client, unique_test_id
+    ):
+        """Test upload to non-existent project doesn't leave orphaned S3 objects."""
+        content = f"content for orphan s3 test {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # Attempt upload to non-existent project
+        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
+        response = integration_client.post(
+            f"/api/v1/project/nonexistent-project-{unique_test_id}/nonexistent-pkg/upload",
+            files=files,
+            data={"tag": "test"},
+        )
+
+        # Upload should fail
+        assert response.status_code == 404
+
+        # Verify no S3 object was created
+        assert not s3_object_exists(expected_hash), (
+            "Orphaned S3 object found after failed upload"
+        )
+
+    @pytest.mark.integration
+    def test_upload_failure_invalid_package_no_orphaned_s3(
+        self, integration_client, test_project, unique_test_id
+    ):
+        """Test upload to non-existent package doesn't leave orphaned S3 objects."""
+        content = f"content for orphan s3 test pkg {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # Attempt upload to non-existent package
+        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
+        response = integration_client.post(
+            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
+            files=files,
+            data={"tag": "test"},
+        )
+
+        # Upload should fail
+        assert response.status_code == 404
+
+        # Verify no S3 object was created
+        assert not s3_object_exists(expected_hash), (
+            "Orphaned S3 object found after failed upload"
+        )
+
+    @pytest.mark.integration
+    def test_upload_failure_empty_file_no_orphaned_s3(
+        self, integration_client, test_package, unique_test_id
+    ):
+        """Test that uploading an empty file is rejected."""
+        project, package = test_package
+        content = b""  # Empty content
+
+        # Attempt upload of empty file
+        files = {"file": ("empty.bin", io.BytesIO(content), "application/octet-stream")}
+        response = integration_client.post(
+            f"/api/v1/project/{project}/{package}/upload",
+            files=files,
+            data={"tag": f"empty-{unique_test_id}"},
+        )
+
+        # Upload should fail (empty files are rejected)
+        assert response.status_code in (400, 422), (
+            f"Expected 400/422, got {response.status_code}"
+        )
+
+    @pytest.mark.integration
+    def test_upload_failure_no_orphaned_database_records(
+        self, integration_client, test_project, unique_test_id
+    ):
+        """Test failed upload doesn't leave orphaned database records."""
+        content = f"content for db orphan test {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # Attempt upload to non-existent package (should fail before DB insert)
+        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
+        response = integration_client.post(
+            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
+            files=files,
+            data={"tag": "test"},
+        )
+
+        # Upload should fail
+        assert response.status_code == 404
+
+        # Verify no artifact record was created
+        artifact_response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert artifact_response.status_code == 404, (
+            "Orphaned artifact record found after failed upload"
+        )
+
+    @pytest.mark.integration
+    def test_duplicate_tag_upload_handles_gracefully(
+        self, integration_client, test_package, unique_test_id
+    ):
+        """Test uploading with duplicate tag is handled without orphaned data."""
+        project, package = test_package
+        content1 = f"content version 1 {unique_test_id}".encode()
+        content2 = f"content version 2 {unique_test_id}".encode()
+        tag = f"duplicate-tag-{unique_test_id}"
+
+        # First upload with tag
+        result1 = upload_test_file(
+            integration_client, project, package, content1, tag=tag
+        )
+        hash1 = result1["artifact_id"]
+
+        # Second upload with same tag (should update the tag to point to new artifact)
+        result2 = upload_test_file(
+            integration_client, project, package, content2, tag=tag
+        )
+        hash2 = result2["artifact_id"]
+
+        # Both artifacts should exist
+        assert integration_client.get(f"/api/v1/artifact/{hash1}").status_code == 200
+        assert integration_client.get(f"/api/v1/artifact/{hash2}").status_code == 200
+
+        # Tag should point to the second artifact
+        tag_response = integration_client.get(
+            f"/api/v1/project/{project}/{package}/tags/{tag}"
+        )
+        assert tag_response.status_code == 200
+        assert tag_response.json()["artifact_id"] == hash2
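For reviewers, a brief usage note: the sketch below shows how the new conftest helpers are intended to compose inside an integration test. It is a hypothetical illustration, not part of the patch; it assumes the existing integration_client, test_package, and unique_test_id fixtures and the compute_sha256/upload_test_file helpers behave as in the tests above, and the test name is invented for the example.

import pytest

from tests.conftest import (
    compute_sha256,
    upload_test_file,
    list_s3_objects_by_hash,
    delete_s3_object_by_hash,
)


@pytest.mark.integration
def test_upload_then_direct_s3_check(integration_client, test_package, unique_test_id):
    """Hypothetical sketch: upload once, inspect S3 directly, then clean up."""
    project, package = test_package
    content = f"helper usage sketch {unique_test_id}".encode()
    expected_hash = compute_sha256(content)

    # Upload through the API; deduplicated storage should produce exactly one
    # object under the fruits/{hash[:2]}/{hash[2:4]}/{hash} key pattern.
    upload_test_file(
        integration_client, project, package, content, tag=f"sketch-{unique_test_id}"
    )
    assert list_s3_objects_by_hash(expected_hash) == [
        f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"
    ]

    # delete_s3_object_by_hash exists for test cleanup, so repeated runs do not
    # accumulate objects in the bucket.
    assert delete_s3_object_by_hash(expected_hash)

Under these assumptions, s3_object_exists and count_s3_objects_by_prefix follow the same pattern for boolean and prefix-based checks.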