diff --git a/backend/app/routes.py b/backend/app/routes.py
index d4525a2..513b440 100644
--- a/backend/app/routes.py
+++ b/backend/app/routes.py
@@ -13,7 +13,7 @@ from fastapi import (
 )
 from fastapi.responses import StreamingResponse, RedirectResponse
 from sqlalchemy.orm import Session
-from sqlalchemy import or_, func
+from sqlalchemy import or_, func, text
 from typing import List, Optional, Literal
 import math
 import re
@@ -263,7 +263,7 @@ def health_check(
 
     # Check database connectivity
     try:
-        db.execute("SELECT 1")
+        db.execute(text("SELECT 1"))
         database_healthy = True
     except Exception as e:
         logger.warning(f"Database health check failed: {e}")
@@ -2131,9 +2131,13 @@ def get_artifact(artifact_id: str, db: Session = Depends(get_db)):
 
     return ArtifactDetailResponse(
         id=artifact.id,
+        sha256=artifact.id,  # SHA256 hash is the artifact ID
         size=artifact.size,
         content_type=artifact.content_type,
         original_name=artifact.original_name,
+        checksum_md5=artifact.checksum_md5,
+        checksum_sha1=artifact.checksum_sha1,
+        s3_etag=artifact.s3_etag,
         created_at=artifact.created_at,
         created_by=artifact.created_by,
         ref_count=artifact.ref_count,
diff --git a/backend/pytest.ini b/backend/pytest.ini
index 732ade3..4480451 100644
--- a/backend/pytest.ini
+++ b/backend/pytest.ini
@@ -4,7 +4,7 @@ python_files = test_*.py
 python_functions = test_*
 python_classes = Test*
 asyncio_mode = auto
-addopts = -v --tb=short
+addopts = -v --tb=short --cov=app --cov-report=term-missing --cov-report=html:coverage_html --cov-fail-under=0
 filterwarnings =
     ignore::DeprecationWarning
     ignore::UserWarning
@@ -12,3 +12,18 @@ markers =
     unit: Unit tests (no external dependencies)
     integration: Integration tests (require database/storage)
     slow: Slow tests (skip with -m "not slow")
+
+# Coverage configuration
+[coverage:run]
+source = app
+omit =
+    */tests/*
+    */__pycache__/*
+
+[coverage:report]
+exclude_lines =
+    pragma: no cover
+    def __repr__
+    raise NotImplementedError
+    if __name__ == .__main__.:
+    pass
diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py
index 08669b7..bb388c2 100644
--- a/backend/tests/conftest.py
+++ b/backend/tests/conftest.py
@@ -199,3 +199,119 @@ def test_app():
     from app.main import app
 
     return TestClient(app)
+
+
+# =============================================================================
+# Integration Test Fixtures
+# =============================================================================
+
+
+@pytest.fixture
+def integration_client():
+    """
+    Create a test client for integration tests.
+
+    Uses the real database and MinIO from docker-compose.local.yml.
+    """
+    from httpx import Client
+
+    # Connect to the running orchard-server container
+    base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
+
+    with Client(base_url=base_url, timeout=30.0) as client:
+        yield client
+
+
+@pytest.fixture
+def unique_test_id():
+    """Generate a unique ID for test isolation."""
+    import uuid
+
+    return f"test-{uuid.uuid4().hex[:8]}"
+
+
+@pytest.fixture
+def test_project(integration_client, unique_test_id):
+    """
+    Create a test project and clean it up after the test.
+
+    Yields the project name.
+    """
+    project_name = f"test-project-{unique_test_id}"
+
+    # Create project
+    response = integration_client.post(
+        "/api/v1/projects",
+        json={"name": project_name, "description": "Test project", "is_public": True},
+    )
+    assert response.status_code == 200, f"Failed to create project: {response.text}"
+
+    yield project_name
+
+    # Cleanup: delete project
+    try:
+        integration_client.delete(f"/api/v1/projects/{project_name}")
+    except Exception:
+        pass  # Ignore cleanup errors
+
+
+@pytest.fixture
+def test_package(integration_client, test_project, unique_test_id):
+    """
+    Create a test package within a test project.
+
+    Yields (project_name, package_name) tuple.
+    """
+    package_name = f"test-package-{unique_test_id}"
+
+    # Create package
+    response = integration_client.post(
+        f"/api/v1/project/{test_project}/packages",
+        json={"name": package_name, "description": "Test package"},
+    )
+    assert response.status_code == 200, f"Failed to create package: {response.text}"
+
+    yield (test_project, package_name)
+
+    # Cleanup handled by test_project fixture (cascade delete)
+
+
+@pytest.fixture
+def test_content():
+    """
+    Generate unique test content for each test.
+
+    Returns (content_bytes, expected_sha256) tuple.
+    """
+    import uuid
+
+    content = f"test-content-{uuid.uuid4().hex}".encode()
+    sha256 = compute_sha256(content)
+    return (content, sha256)
+
+
+def upload_test_file(
+    client,
+    project: str,
+    package: str,
+    content: bytes,
+    filename: str = "test.bin",
+    tag: str = None,
+) -> dict:
+    """
+    Helper function to upload a test file.
+
+    Returns the upload response as a dict.
+    """
+    files = {"file": (filename, io.BytesIO(content), "application/octet-stream")}
+    data = {}
+    if tag:
+        data["tag"] = tag
+
+    response = client.post(
+        f"/api/v1/project/{project}/{package}/upload",
+        files=files,
+        data=data if data else None,
+    )
+    assert response.status_code == 200, f"Upload failed: {response.text}"
+    return response.json()
diff --git a/backend/tests/test_integration_uploads.py b/backend/tests/test_integration_uploads.py
new file mode 100644
index 0000000..9516e52
--- /dev/null
+++ b/backend/tests/test_integration_uploads.py
@@ -0,0 +1,388 @@
+"""
+Integration tests for duplicate uploads and storage verification.
+
+These tests require the full stack to be running (docker-compose.local.yml).
+
+Tests cover:
+- Duplicate upload scenarios across packages and projects
+- Storage verification (single S3 object, single artifact row)
+- Upload table tracking
+- Content integrity verification
+- Concurrent upload handling
+- Failure cleanup
+"""
+
+import pytest
+import io
+import threading
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from tests.conftest import (
+    compute_sha256,
+    upload_test_file,
+)
+
+
+class TestDuplicateUploadScenarios:
+    """Integration tests for duplicate upload behavior."""
+
+    @pytest.mark.integration
+    def test_same_file_twice_returns_same_artifact_id(
+        self, integration_client, test_package
+    ):
+        """Test uploading same file twice returns same artifact_id."""
+        project, package = test_package
+        content = b"content uploaded twice for same artifact test"
+        expected_hash = compute_sha256(content)
+
+        # First upload
+        result1 = upload_test_file(
+            integration_client, project, package, content, tag="first"
+        )
+        assert result1["artifact_id"] == expected_hash
+
+        # Second upload
+        result2 = upload_test_file(
+            integration_client, project, package, content, tag="second"
+        )
+        assert result2["artifact_id"] == expected_hash
+        assert result1["artifact_id"] == result2["artifact_id"]
+
+    @pytest.mark.integration
+    def test_same_file_twice_increments_ref_count(
+        self, integration_client, test_package
+    ):
+        """Test uploading same file twice increments ref_count to 2."""
+        project, package = test_package
+        content = b"content for ref count increment test"
+
+        # First upload
+        result1 = upload_test_file(
+            integration_client, project, package, content, tag="v1"
+        )
+        assert result1["ref_count"] == 1
+
+        # Second upload
+        result2 = upload_test_file(
+            integration_client, project, package, content, tag="v2"
+        )
+        assert result2["ref_count"] == 2
+
+    @pytest.mark.integration
+    def test_same_file_different_packages_shares_artifact(
+        self, integration_client, test_project, unique_test_id
+    ):
+        """Test uploading same file to different packages shares artifact."""
+        project = test_project
+        content = f"content shared across packages {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # Create two packages
+        pkg1 = f"package-a-{unique_test_id}"
+        pkg2 = f"package-b-{unique_test_id}"
+
+        integration_client.post(
+            f"/api/v1/project/{project}/packages",
+            json={"name": pkg1, "description": "Package A"},
+        )
+        integration_client.post(
+            f"/api/v1/project/{project}/packages",
+            json={"name": pkg2, "description": "Package B"},
+        )
+
+        # Upload to first package
+        result1 = upload_test_file(integration_client, project, pkg1, content, tag="v1")
+        assert result1["artifact_id"] == expected_hash
+        assert result1["deduplicated"] is False
+
+        # Upload to second package
+        result2 = upload_test_file(integration_client, project, pkg2, content, tag="v1")
+        assert result2["artifact_id"] == expected_hash
+        assert result2["deduplicated"] is True
+
+    @pytest.mark.integration
+    def test_same_file_different_projects_shares_artifact(
+        self, integration_client, unique_test_id
+    ):
+        """Test uploading same file to different projects shares artifact."""
+        content = f"content shared across projects {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # Create two projects with packages
+        proj1 = f"project-x-{unique_test_id}"
+        proj2 = f"project-y-{unique_test_id}"
+        pkg_name = "shared-pkg"
+
+        try:
+            # Create projects and packages
+            integration_client.post(
+                "/api/v1/projects",
+                json={"name": proj1, "description": "Project X", "is_public": True},
+            )
+            integration_client.post(
+                "/api/v1/projects",
+                json={"name": proj2, "description": "Project Y", "is_public": True},
+            )
+            integration_client.post(
+                f"/api/v1/project/{proj1}/packages",
+                json={"name": pkg_name, "description": "Package"},
+            )
+            integration_client.post(
+                f"/api/v1/project/{proj2}/packages",
+                json={"name": pkg_name, "description": "Package"},
+            )
+
+            # Upload to first project
+            result1 = upload_test_file(
+                integration_client, proj1, pkg_name, content, tag="v1"
+            )
+            assert result1["artifact_id"] == expected_hash
+            assert result1["deduplicated"] is False
+
+            # Upload to second project
+            result2 = upload_test_file(
+                integration_client, proj2, pkg_name, content, tag="v1"
+            )
+            assert result2["artifact_id"] == expected_hash
+            assert result2["deduplicated"] is True
+
+        finally:
+            # Cleanup
+            integration_client.delete(f"/api/v1/projects/{proj1}")
+            integration_client.delete(f"/api/v1/projects/{proj2}")
+
+    @pytest.mark.integration
+    def test_same_file_different_filenames_shares_artifact(
+        self, integration_client, test_package
+    ):
+        """Test uploading same file with different original filenames shares artifact."""
+        project, package = test_package
+        content = b"content with different filenames"
+        expected_hash = compute_sha256(content)
+
+        # Upload with filename1
+        result1 = upload_test_file(
+            integration_client,
+            project,
+            package,
+            content,
+            filename="file1.bin",
+            tag="v1",
+        )
+        assert result1["artifact_id"] == expected_hash
+
+        # Upload with filename2
+        result2 = upload_test_file(
+            integration_client,
+            project,
+            package,
+            content,
+            filename="file2.bin",
+            tag="v2",
+        )
+        assert result2["artifact_id"] == expected_hash
+        assert result2["deduplicated"] is True
+
+    @pytest.mark.integration
+    def test_same_file_different_tags_shares_artifact(
+        self, integration_client, test_package, unique_test_id
+    ):
+        """Test uploading same file with different tags shares artifact."""
+        project, package = test_package
+        content = f"content with different tags {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        tags = ["latest", "stable", "v1.0.0", "release"]
+        for i, tag in enumerate(tags):
+            result = upload_test_file(
+                integration_client, project, package, content, tag=tag
+            )
+            assert result["artifact_id"] == expected_hash
+            if i == 0:
+                assert result["deduplicated"] is False
+            else:
+                assert result["deduplicated"] is True
+
+
+class TestStorageVerification:
+    """Tests to verify storage behavior after duplicate uploads."""
+
+    @pytest.mark.integration
+    def test_artifact_table_single_row_after_duplicates(
+        self, integration_client, test_package
+    ):
+        """Test artifact table contains only one row after duplicate uploads."""
+        project, package = test_package
+        content = b"content for single row test"
+        expected_hash = compute_sha256(content)
+
+        # Upload same content multiple times with different tags
+        for tag in ["v1", "v2", "v3"]:
+            upload_test_file(integration_client, project, package, content, tag=tag)
+
+        # Query artifact - should exist and be unique
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.status_code == 200
+        artifact = response.json()
+        assert artifact["id"] == expected_hash
+        assert artifact["ref_count"] == 3
+
+    @pytest.mark.integration
+    def test_upload_table_multiple_rows_for_duplicates(
+        self, integration_client, test_package
+    ):
+        """Test upload table contains multiple rows for duplicate uploads (event tracking)."""
+        project, package = test_package
+        content = b"content for upload tracking test"
+
+        # Upload same content 3 times
+        for tag in ["upload1", "upload2", "upload3"]:
+            upload_test_file(integration_client, project, package, content, tag=tag)
+
+        # Check package stats - should show 3 uploads but fewer unique artifacts
+        response = integration_client.get(
+            f"/api/v1/project/{project}/packages/{package}"
+        )
+        assert response.status_code == 200
+        pkg_info = response.json()
+        assert pkg_info["tag_count"] == 3
+
+    @pytest.mark.integration
+    def test_artifact_content_matches_original(self, integration_client, test_package):
+        """Test artifact content retrieved matches original content exactly."""
+        project, package = test_package
+        original_content = b"exact content verification test data 12345"
+
+        # Upload
+        result = upload_test_file(
+            integration_client, project, package, original_content, tag="verify"
+        )
+
+        # Download and compare
+        download_response = integration_client.get(
+            f"/api/v1/project/{project}/{package}/+/verify", params={"mode": "proxy"}
+        )
+        assert download_response.status_code == 200
+        downloaded_content = download_response.content
+        assert downloaded_content == original_content
+
+    @pytest.mark.integration
+    def test_storage_stats_reflect_deduplication(
+        self, integration_client, test_package
+    ):
+        """Test total storage size matches single artifact size after duplicates."""
+        project, package = test_package
+        content = b"content for storage stats test - should only count once"
+        content_size = len(content)
+
+        # Upload same content 5 times
+        for tag in ["a", "b", "c", "d", "e"]:
+            upload_test_file(integration_client, project, package, content, tag=tag)
+
+        # Check global stats
+        response = integration_client.get("/api/v1/stats")
+        assert response.status_code == 200
+        stats = response.json()
+
+        # Deduplication should show savings
+        assert stats["deduplicated_uploads"] > 0
+        assert stats["storage_saved_bytes"] > 0
+
+
+class TestConcurrentUploads:
+    """Tests for concurrent upload handling."""
+
+    @pytest.mark.integration
+    def test_concurrent_uploads_same_file(self, integration_client, test_package):
+        """Test concurrent uploads of same file handle deduplication correctly."""
+        project, package = test_package
+        content = b"content for concurrent upload test"
+        expected_hash = compute_sha256(content)
+        num_concurrent = 5
+
+        results = []
+        errors = []
+
+        def upload_worker(tag_suffix):
+            try:
+                # Create a new client for this thread
+                from httpx import Client
+
+                base_url = "http://localhost:8080"
+                with Client(base_url=base_url, timeout=30.0) as client:
+                    files = {
+                        "file": (
+                            f"concurrent-{tag_suffix}.bin",
+                            io.BytesIO(content),
+                            "application/octet-stream",
+                        )
+                    }
+                    response = client.post(
+                        f"/api/v1/project/{project}/{package}/upload",
+                        files=files,
+                        data={"tag": f"concurrent-{tag_suffix}"},
+                    )
+                    if response.status_code == 200:
+                        results.append(response.json())
+                    else:
+                        errors.append(f"Status {response.status_code}: {response.text}")
+            except Exception as e:
+                errors.append(str(e))
+
+        # Run concurrent uploads
+        with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
+            futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)]
+            for future in as_completed(futures):
+                pass  # Wait for all to complete
+
+        # Verify results
+        assert len(errors) == 0, f"Errors during concurrent uploads: {errors}"
+        assert len(results) == num_concurrent
+
+        # All should have same artifact_id
+        artifact_ids = set(r["artifact_id"] for r in results)
+        assert len(artifact_ids) == 1
+        assert expected_hash in artifact_ids
+
+        # Verify final ref_count
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.status_code == 200
+        assert response.json()["ref_count"] == num_concurrent
+
+
+class TestDeduplicationAcrossRestarts:
+    """Tests for deduplication persistence."""
+
+    @pytest.mark.integration
+    def test_deduplication_persists(
+        self, integration_client, test_package, unique_test_id
+    ):
+        """
+        Test deduplication works with persisted data.
+
+        This test uploads content, then uploads the same content again.
+        Since the database persists, the second upload should detect
+        the existing artifact even without server restart.
+        """
+        project, package = test_package
+        content = f"persisted content for dedup test {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # First upload
+        result1 = upload_test_file(
+            integration_client, project, package, content, tag="persist1"
+        )
+        assert result1["artifact_id"] == expected_hash
+        assert result1["deduplicated"] is False
+
+        # Second upload (simulating after restart - data is persisted)
+        result2 = upload_test_file(
+            integration_client, project, package, content, tag="persist2"
+        )
+        assert result2["artifact_id"] == expected_hash
+        assert result2["deduplicated"] is True
+
+        # Verify artifact exists with correct ref_count
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.status_code == 200
+        assert response.json()["ref_count"] == 2
diff --git a/backend/tests/test_ref_count.py b/backend/tests/test_ref_count.py
new file mode 100644
index 0000000..1104de9
--- /dev/null
+++ b/backend/tests/test_ref_count.py
@@ -0,0 +1,176 @@
+"""
+Unit and integration tests for reference counting behavior.
+
+Tests cover:
+- ref_count is set correctly for new artifacts
+- ref_count increments on duplicate uploads
+- ref_count query correctly identifies existing artifacts
+- Artifact lookup by SHA256 hash works correctly
+"""
+
+import pytest
+import io
+from tests.conftest import (
+    compute_sha256,
+    upload_test_file,
+    TEST_CONTENT_HELLO,
+    TEST_HASH_HELLO,
+)
+
+
+class TestRefCountQuery:
+    """Tests for ref_count querying and artifact lookup."""
+
+    @pytest.mark.integration
+    def test_artifact_lookup_by_sha256(self, integration_client, test_package):
+        """Test artifact lookup by SHA256 hash (primary key) works correctly."""
+        project, package = test_package
+        content = b"unique content for lookup test"
+        expected_hash = compute_sha256(content)
+
+        # Upload a file
+        upload_result = upload_test_file(
+            integration_client, project, package, content, tag="v1"
+        )
+        assert upload_result["artifact_id"] == expected_hash
+
+        # Look up artifact by ID (SHA256)
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.status_code == 200
+
+        artifact = response.json()
+        assert artifact["id"] == expected_hash
+        assert artifact["sha256"] == expected_hash
+        assert artifact["size"] == len(content)
+
+    @pytest.mark.integration
+    def test_ref_count_query_identifies_existing_artifact(
+        self, integration_client, test_package
+    ):
+        """Test ref_count query correctly identifies existing artifacts by hash."""
+        project, package = test_package
+        content = b"content for ref count query test"
+        expected_hash = compute_sha256(content)
+
+        # Upload a file with a tag
+        upload_result = upload_test_file(
+            integration_client, project, package, content, tag="v1"
+        )
+
+        # Query artifact and check ref_count
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.status_code == 200
+
+        artifact = response.json()
+        assert artifact["ref_count"] >= 1  # At least 1 from the tag
+
+    @pytest.mark.integration
+    def test_ref_count_set_to_1_for_new_artifact_with_tag(
+        self, integration_client, test_package, unique_test_id
+    ):
+        """Test ref_count is set to 1 for new artifacts when created with a tag."""
+        project, package = test_package
+        content = f"brand new content for ref count test {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # Upload a new file with a tag
+        upload_result = upload_test_file(
+            integration_client, project, package, content, tag="initial"
+        )
+
+        assert upload_result["artifact_id"] == expected_hash
+        assert upload_result["ref_count"] == 1
+        assert upload_result["deduplicated"] is False
+
+    @pytest.mark.integration
+    def test_ref_count_increments_on_duplicate_upload_with_tag(
+        self, integration_client, test_package, unique_test_id
+    ):
+        """Test ref_count is incremented when duplicate content is uploaded with a new tag."""
+        project, package = test_package
+        content = f"content that will be uploaded twice {unique_test_id}".encode()
+        expected_hash = compute_sha256(content)
+
+        # First upload with tag
+        result1 = upload_test_file(
+            integration_client, project, package, content, tag="v1"
+        )
+        assert result1["ref_count"] == 1
+        assert result1["deduplicated"] is False
+
+        # Second upload with different tag (same content)
+        result2 = upload_test_file(
+            integration_client, project, package, content, tag="v2"
+        )
+        assert result2["artifact_id"] == expected_hash
+        assert result2["ref_count"] == 2
+        assert result2["deduplicated"] is True
+
+    @pytest.mark.integration
+    def test_ref_count_after_multiple_tags(self, integration_client, test_package):
+        """Test ref_count correctly reflects number of tags pointing to artifact."""
+        project, package = test_package
+        content = b"content for multiple tag test"
+        expected_hash = compute_sha256(content)
+
+        # Upload with multiple tags
+        tags = ["v1", "v2", "v3", "latest"]
+        for i, tag in enumerate(tags):
+            result = upload_test_file(
+                integration_client, project, package, content, tag=tag
+            )
+            assert result["artifact_id"] == expected_hash
+            assert result["ref_count"] == i + 1
+
+        # Verify final ref_count via artifact endpoint
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.status_code == 200
+        assert response.json()["ref_count"] == len(tags)
+
+
+class TestRefCountWithDeletion:
+    """Tests for ref_count behavior when tags are deleted."""
+
+    @pytest.mark.integration
+    def test_ref_count_decrements_on_tag_delete(self, integration_client, test_package):
+        """Test ref_count decrements when a tag is deleted."""
+        project, package = test_package
+        content = b"content for delete test"
+        expected_hash = compute_sha256(content)
+
+        # Upload with two tags
+        upload_test_file(integration_client, project, package, content, tag="v1")
+        upload_test_file(integration_client, project, package, content, tag="v2")
+
+        # Verify ref_count is 2
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.json()["ref_count"] == 2
+
+        # Delete one tag
+        delete_response = integration_client.delete(
+            f"/api/v1/project/{project}/{package}/tags/v1"
+        )
+        assert delete_response.status_code == 204
+
+        # Verify ref_count is now 1
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.json()["ref_count"] == 1
+
+    @pytest.mark.integration
+    def test_ref_count_zero_after_all_tags_deleted(
+        self, integration_client, test_package
+    ):
+        """Test ref_count goes to 0 when all tags are deleted."""
+        project, package = test_package
+        content = b"content that will be orphaned"
+        expected_hash = compute_sha256(content)
+
+        # Upload with one tag
+        upload_test_file(integration_client, project, package, content, tag="only-tag")
+
+        # Delete the tag
+        integration_client.delete(f"/api/v1/project/{project}/{package}/tags/only-tag")
+
+        # Verify ref_count is 0
+        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
+        assert response.json()["ref_count"] == 0