""" Integration tests for upload and download API endpoints. Tests cover: - Upload functionality and deduplication - Download by tag and artifact ID - Concurrent upload handling - File size validation - Upload failure cleanup - S3 storage verification """ import os import pytest import io import threading from concurrent.futures import ThreadPoolExecutor, as_completed from tests.factories import ( compute_sha256, upload_test_file, list_s3_objects_by_hash, s3_object_exists, ) class TestUploadBasics: """Tests for basic upload functionality.""" @pytest.mark.integration def test_upload_returns_200(self, integration_client, test_package): """Test upload with valid file returns 200.""" project, package = test_package content = b"valid file upload test" files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, ) assert response.status_code == 200 @pytest.mark.integration def test_upload_returns_artifact_id(self, integration_client, test_package): """Test upload returns the artifact ID (SHA256 hash).""" project_name, package_name = test_package content = b"basic upload test" expected_hash = compute_sha256(content) result = upload_test_file( integration_client, project_name, package_name, content, version="v1" ) assert result["artifact_id"] == expected_hash @pytest.mark.integration def test_upload_response_has_upload_id(self, integration_client, test_package): """Test upload response includes upload_id.""" project_name, package_name = test_package result = upload_test_file( integration_client, project_name, package_name, b"upload id test", "uploadid.txt", ) assert "upload_id" in result assert result["upload_id"] is not None @pytest.mark.integration def test_upload_response_has_content_type(self, integration_client, test_package): """Test upload response includes content_type.""" project_name, package_name = test_package result = upload_test_file( integration_client, project_name, package_name, b"content type test", "content.txt", ) assert "content_type" in result @pytest.mark.integration def test_upload_response_has_original_name(self, integration_client, test_package): """Test upload response includes original_name.""" project_name, package_name = test_package result = upload_test_file( integration_client, project_name, package_name, b"original name test", "originalname.txt", ) assert "original_name" in result assert result["original_name"] == "originalname.txt" @pytest.mark.integration def test_upload_response_has_created_at(self, integration_client, test_package): """Test upload response includes created_at.""" project_name, package_name = test_package result = upload_test_file( integration_client, project_name, package_name, b"created at test", "createdat.txt", ) assert "created_at" in result assert result["created_at"] is not None @pytest.mark.integration def test_upload_without_version_succeeds(self, integration_client, test_package): """Test upload without version succeeds (no version created).""" project, package = test_package content = b"upload without version test" expected_hash = compute_sha256(content) files = {"file": ("no_version.bin", io.BytesIO(content), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, # No version parameter ) assert response.status_code == 200 result = response.json() assert result["artifact_id"] == expected_hash # Version should be None when not specified assert result.get("version") is None @pytest.mark.integration def test_upload_creates_artifact_in_database(self, integration_client, test_package): """Test upload creates artifact record in database.""" project, package = test_package content = b"database artifact test" expected_hash = compute_sha256(content) upload_test_file(integration_client, project, package, content) # Verify artifact exists via API response = integration_client.get(f"/api/v1/artifact/{expected_hash}") assert response.status_code == 200 artifact = response.json() assert artifact["id"] == expected_hash assert artifact["size"] == len(content) @pytest.mark.integration @pytest.mark.requires_direct_s3 def test_upload_creates_object_in_s3(self, integration_client, test_package): """Test upload creates object in S3 storage.""" project, package = test_package content = b"s3 object creation test" expected_hash = compute_sha256(content) upload_test_file(integration_client, project, package, content) # Verify S3 object exists assert s3_object_exists(expected_hash), "S3 object should exist after upload" @pytest.mark.integration def test_upload_with_version_creates_version_record(self, integration_client, test_package): """Test upload with version creates version record.""" project, package = test_package content = b"version creation test" expected_hash = compute_sha256(content) version_name = "1.0.0" result = upload_test_file( integration_client, project, package, content, version=version_name ) # Verify version was created assert result.get("version") == version_name assert result["artifact_id"] == expected_hash # Verify version exists in versions list versions_response = integration_client.get( f"/api/v1/project/{project}/{package}/versions" ) assert versions_response.status_code == 200 versions = versions_response.json() version_names = [v["version"] for v in versions.get("items", [])] assert version_name in version_names class TestDuplicateUploads: """Tests for duplicate upload deduplication behavior.""" @pytest.mark.integration def test_same_file_twice_returns_same_artifact_id( self, integration_client, test_package ): """Test uploading same file twice returns same artifact_id.""" project, package = test_package content = b"content uploaded twice for same artifact test" expected_hash = compute_sha256(content) # First upload result1 = upload_test_file( integration_client, project, package, content, version="first" ) assert result1["artifact_id"] == expected_hash # Second upload result2 = upload_test_file( integration_client, project, package, content, version="second" ) assert result2["artifact_id"] == expected_hash assert result1["artifact_id"] == result2["artifact_id"] @pytest.mark.integration def test_same_file_twice_returns_existing_version( self, integration_client, test_package ): """Test uploading same file twice in same package returns existing version. Same artifact can only have one version per package. Uploading the same content with a different version name returns the existing version, not a new one. ref_count stays at 1 because there's still only one PackageVersion reference. """ project, package = test_package content = b"content for ref count increment test" # First upload result1 = upload_test_file( integration_client, project, package, content, version="v1" ) assert result1["ref_count"] == 1 # Second upload with different version name returns existing version result2 = upload_test_file( integration_client, project, package, content, version="v2" ) # Same artifact, same package = same version returned, ref_count stays 1 assert result2["ref_count"] == 1 assert result2["deduplicated"] is True assert result1["version"] == result2["version"] # Both return "v1" @pytest.mark.integration def test_same_file_different_packages_shares_artifact( self, integration_client, test_project, unique_test_id ): """Test uploading same file to different packages shares artifact.""" project = test_project content = f"content shared across packages {unique_test_id}".encode() expected_hash = compute_sha256(content) # Create two packages pkg1 = f"package-a-{unique_test_id}" pkg2 = f"package-b-{unique_test_id}" integration_client.post( f"/api/v1/project/{project}/packages", json={"name": pkg1, "description": "Package A"}, ) integration_client.post( f"/api/v1/project/{project}/packages", json={"name": pkg2, "description": "Package B"}, ) # Upload to first package result1 = upload_test_file(integration_client, project, pkg1, content, version="v1") assert result1["artifact_id"] == expected_hash assert result1["deduplicated"] is False # Upload to second package result2 = upload_test_file(integration_client, project, pkg2, content, version="v1") assert result2["artifact_id"] == expected_hash assert result2["deduplicated"] is True @pytest.mark.integration def test_same_file_different_filenames_shares_artifact( self, integration_client, test_package ): """Test uploading same file with different filenames shares artifact.""" project, package = test_package content = b"content with different filenames" expected_hash = compute_sha256(content) # Upload with filename1 result1 = upload_test_file( integration_client, project, package, content, filename="file1.bin", version="v1", ) assert result1["artifact_id"] == expected_hash # Upload with filename2 result2 = upload_test_file( integration_client, project, package, content, filename="file2.bin", version="v2", ) assert result2["artifact_id"] == expected_hash assert result2["deduplicated"] is True class TestDownload: """Tests for download functionality.""" @pytest.mark.integration def test_download_by_version(self, integration_client, test_package): """Test downloading artifact by version.""" project, package = test_package original_content = b"download by version test" upload_test_file( integration_client, project, package, original_content, version="1.0.0" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/1.0.0", params={"mode": "proxy"}, ) assert response.status_code == 200 assert response.content == original_content @pytest.mark.integration def test_download_by_artifact_id(self, integration_client, test_package): """Test downloading artifact by artifact ID.""" project, package = test_package original_content = b"download by id test" expected_hash = compute_sha256(original_content) upload_test_file(integration_client, project, package, original_content) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/artifact:{expected_hash}", params={"mode": "proxy"}, ) assert response.status_code == 200 assert response.content == original_content @pytest.mark.integration def test_download_by_version_prefix(self, integration_client, test_package): """Test downloading artifact using version: prefix.""" project, package = test_package original_content = b"download by version prefix test" upload_test_file( integration_client, project, package, original_content, version="2.0.0" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/version:2.0.0", params={"mode": "proxy"}, ) assert response.status_code == 200 assert response.content == original_content @pytest.mark.integration def test_download_nonexistent_version(self, integration_client, test_package): """Test downloading nonexistent version returns 404.""" project, package = test_package response = integration_client.get( f"/api/v1/project/{project}/{package}/+/nonexistent-version" ) assert response.status_code == 404 @pytest.mark.integration def test_download_nonexistent_artifact(self, integration_client, test_package): """Test downloading nonexistent artifact ID returns 404.""" project, package = test_package fake_hash = "0" * 64 response = integration_client.get( f"/api/v1/project/{project}/{package}/+/artifact:{fake_hash}" ) assert response.status_code == 404 @pytest.mark.integration def test_download_from_nonexistent_project(self, integration_client, unique_test_id): """Test downloading from nonexistent project returns 404.""" response = integration_client.get( f"/api/v1/project/nonexistent-project-{unique_test_id}/somepackage/+/sometag" ) assert response.status_code == 404 @pytest.mark.integration def test_download_from_nonexistent_package(self, integration_client, test_project, unique_test_id): """Test downloading from nonexistent package returns 404.""" response = integration_client.get( f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/+/sometag" ) assert response.status_code == 404 @pytest.mark.integration def test_content_matches_original(self, integration_client, test_package): """Test downloaded content matches original exactly.""" project, package = test_package original_content = b"exact content verification test data 12345" upload_test_file( integration_client, project, package, original_content, version="verify" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/verify", params={"mode": "proxy"} ) assert response.status_code == 200 assert response.content == original_content class TestDownloadHeaders: """Tests for download response headers.""" @pytest.mark.integration def test_download_content_type_header(self, integration_client, test_package): """Test download returns correct Content-Type header.""" project, package = test_package content = b"content type header test" upload_test_file( integration_client, project, package, content, filename="test.txt", version="content-type-test" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/content-type-test", params={"mode": "proxy"}, ) assert response.status_code == 200 # Content-Type should be set (either text/plain or application/octet-stream) assert "content-type" in response.headers @pytest.mark.integration def test_download_content_length_header(self, integration_client, test_package): """Test download returns correct Content-Length header.""" project, package = test_package content = b"content length header test - exactly 41 bytes!" expected_length = len(content) upload_test_file( integration_client, project, package, content, version="content-length-test" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/content-length-test", params={"mode": "proxy"}, ) assert response.status_code == 200 assert "content-length" in response.headers assert int(response.headers["content-length"]) == expected_length @pytest.mark.integration def test_download_content_disposition_header(self, integration_client, test_package): """Test download returns correct Content-Disposition header.""" project, package = test_package content = b"content disposition test" filename = "my-test-file.bin" upload_test_file( integration_client, project, package, content, filename=filename, version="disposition-test" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/disposition-test", params={"mode": "proxy"}, ) assert response.status_code == 200 assert "content-disposition" in response.headers disposition = response.headers["content-disposition"] assert "attachment" in disposition assert filename in disposition @pytest.mark.integration def test_download_checksum_headers(self, integration_client, test_package): """Test download returns checksum headers.""" project, package = test_package content = b"checksum header test content" expected_hash = compute_sha256(content) upload_test_file( integration_client, project, package, content, version="checksum-headers" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/checksum-headers", params={"mode": "proxy"}, ) assert response.status_code == 200 # Check for checksum headers assert "x-checksum-sha256" in response.headers assert response.headers["x-checksum-sha256"] == expected_hash @pytest.mark.integration def test_download_etag_header(self, integration_client, test_package): """Test download returns ETag header (artifact ID).""" project, package = test_package content = b"etag header test" expected_hash = compute_sha256(content) upload_test_file( integration_client, project, package, content, version="etag-test" ) response = integration_client.get( f"/api/v1/project/{project}/{package}/+/etag-test", params={"mode": "proxy"}, ) assert response.status_code == 200 assert "etag" in response.headers # ETag should contain the artifact ID (hash) etag = response.headers["etag"].strip('"') assert etag == expected_hash class TestConcurrentUploads: """Tests for concurrent upload handling.""" @pytest.mark.integration def test_concurrent_uploads_same_file(self, integration_client, test_project, unique_test_id): """Test concurrent uploads of same file to different packages handle deduplication correctly. Same artifact can only have one version per package, so we create multiple packages to test that concurrent uploads to different packages correctly increment ref_count. """ content = b"content for concurrent upload test" expected_hash = compute_sha256(content) num_concurrent = 5 # Create packages for each concurrent upload packages = [] for i in range(num_concurrent): pkg_name = f"concurrent-pkg-{unique_test_id}-{i}" response = integration_client.post( f"/api/v1/project/{test_project}/packages", json={"name": pkg_name}, ) assert response.status_code == 200 packages.append(pkg_name) # Create an API key for worker threads api_key_response = integration_client.post( "/api/v1/auth/keys", json={"name": f"concurrent-test-key-{unique_test_id}"}, ) assert api_key_response.status_code == 200, f"Failed to create API key: {api_key_response.text}" api_key = api_key_response.json()["key"] results = [] errors = [] def upload_worker(idx): try: from httpx import Client base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") with Client(base_url=base_url, timeout=30.0) as client: files = { "file": ( f"concurrent-{idx}.bin", io.BytesIO(content), "application/octet-stream", ) } response = client.post( f"/api/v1/project/{test_project}/{packages[idx]}/upload", files=files, data={"version": "1.0.0"}, headers={"Authorization": f"Bearer {api_key}"}, ) if response.status_code == 200: results.append(response.json()) else: errors.append(f"Status {response.status_code}: {response.text}") except Exception as e: errors.append(str(e)) with ThreadPoolExecutor(max_workers=num_concurrent) as executor: futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)] for future in as_completed(futures): pass assert len(errors) == 0, f"Errors during concurrent uploads: {errors}" assert len(results) == num_concurrent # All should have same artifact_id artifact_ids = set(r["artifact_id"] for r in results) assert len(artifact_ids) == 1 assert expected_hash in artifact_ids # Verify final ref_count equals number of packages response = integration_client.get(f"/api/v1/artifact/{expected_hash}") assert response.status_code == 200 assert response.json()["ref_count"] == num_concurrent class TestFileSizeValidation: """Tests for file size limits and empty file rejection.""" @pytest.mark.integration def test_empty_file_rejected(self, integration_client, test_package): """Test empty files are rejected with appropriate error.""" project, package = test_package files = {"file": ("empty.txt", io.BytesIO(b""), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, ) assert response.status_code in [422, 400] @pytest.mark.integration def test_small_valid_file_accepted(self, integration_client, test_package): """Test small (1 byte) files are accepted.""" project, package = test_package content = b"X" result = upload_test_file( integration_client, project, package, content, version="tiny" ) assert result["artifact_id"] is not None assert result["size"] == 1 @pytest.mark.integration def test_file_size_reported_correctly( self, integration_client, test_package, unique_test_id ): """Test file size is correctly reported in response.""" project, package = test_package content = f"Test content for size check {unique_test_id}".encode() expected_size = len(content) result = upload_test_file( integration_client, project, package, content, version="size-test" ) assert result["size"] == expected_size # Also verify via artifact endpoint artifact_response = integration_client.get( f"/api/v1/artifact/{result['artifact_id']}" ) assert artifact_response.json()["size"] == expected_size class TestUploadFailureCleanup: """Tests for cleanup when uploads fail.""" @pytest.mark.integration @pytest.mark.requires_direct_s3 def test_upload_failure_invalid_project_no_orphaned_s3( self, integration_client, unique_test_id ): """Test upload to non-existent project doesn't leave orphaned S3 objects.""" content = f"content for orphan s3 test {unique_test_id}".encode() expected_hash = compute_sha256(content) files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/nonexistent-project-{unique_test_id}/nonexistent-pkg/upload", files=files, data={"version": "test"}, ) assert response.status_code == 404 # Verify no S3 object was created assert not s3_object_exists(expected_hash), ( "Orphaned S3 object found after failed upload" ) @pytest.mark.integration @pytest.mark.requires_direct_s3 def test_upload_failure_invalid_package_no_orphaned_s3( self, integration_client, test_project, unique_test_id ): """Test upload to non-existent package doesn't leave orphaned S3 objects.""" content = f"content for orphan s3 test pkg {unique_test_id}".encode() expected_hash = compute_sha256(content) files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload", files=files, data={"version": "test"}, ) assert response.status_code == 404 assert not s3_object_exists(expected_hash), ( "Orphaned S3 object found after failed upload" ) @pytest.mark.integration def test_upload_failure_no_orphaned_database_records( self, integration_client, test_project, unique_test_id ): """Test failed upload doesn't leave orphaned database records.""" content = f"content for db orphan test {unique_test_id}".encode() expected_hash = compute_sha256(content) files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload", files=files, data={"version": "test"}, ) assert response.status_code == 404 artifact_response = integration_client.get(f"/api/v1/artifact/{expected_hash}") assert artifact_response.status_code == 404, ( "Orphaned artifact record found after failed upload" ) class TestS3StorageVerification: """Tests to verify S3 storage behavior.""" @pytest.mark.integration @pytest.mark.requires_direct_s3 def test_s3_single_object_after_duplicates( self, integration_client, test_package, unique_test_id ): """Test S3 bucket contains only one object after duplicate uploads.""" project, package = test_package content = f"content for s3 object count test {unique_test_id}".encode() expected_hash = compute_sha256(content) # Upload same content multiple times for tag in ["s3test1", "s3test2", "s3test3"]: upload_test_file(integration_client, project, package, content, version=tag) # Verify only one S3 object exists s3_objects = list_s3_objects_by_hash(expected_hash) assert len(s3_objects) == 1, ( f"Expected 1 S3 object, found {len(s3_objects)}: {s3_objects}" ) # Verify object key follows expected pattern expected_key = ( f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}" ) assert s3_objects[0] == expected_key @pytest.mark.integration def test_artifact_table_single_row_after_duplicates( self, integration_client, test_project, unique_test_id ): """Test artifact table contains only one row after duplicate uploads to different packages. Same artifact can only have one version per package, so we create multiple packages to test deduplication across packages. """ content = b"content for single row test" expected_hash = compute_sha256(content) # Create 3 packages and upload same content to each for i in range(3): pkg_name = f"single-row-pkg-{unique_test_id}-{i}" integration_client.post( f"/api/v1/project/{test_project}/packages", json={"name": pkg_name}, ) upload_test_file( integration_client, test_project, pkg_name, content, version="1.0.0" ) # Query artifact response = integration_client.get(f"/api/v1/artifact/{expected_hash}") assert response.status_code == 200 artifact = response.json() assert artifact["id"] == expected_hash assert artifact["ref_count"] == 3 class TestSecurityPathTraversal: """Tests for path traversal attack prevention. Note: Orchard uses content-addressable storage where files are stored by SHA256 hash, not filename. Filenames are metadata only and never used in file path construction, so path traversal in filenames is not a security vulnerability. These tests verify the system handles unusual inputs safely. """ @pytest.mark.integration @pytest.mark.requires_direct_s3 def test_path_traversal_in_filename_stored_safely( self, integration_client, test_package ): """Test filenames with path traversal are stored safely (as metadata only).""" project, package = test_package content = b"path traversal test content" expected_hash = compute_sha256(content) files = { "file": ( "../../../etc/passwd", io.BytesIO(content), "application/octet-stream", ) } response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, data={"version": "traversal-test"}, ) assert response.status_code == 200 result = response.json() assert result["artifact_id"] == expected_hash s3_objects = list_s3_objects_by_hash(expected_hash) assert len(s3_objects) == 1 assert ".." not in s3_objects[0] @pytest.mark.integration def test_path_traversal_in_package_name(self, integration_client, test_project): """Test package names with path traversal sequences are rejected.""" response = integration_client.get( f"/api/v1/project/{test_project}/packages/../../../etc/passwd" ) assert response.status_code in [400, 404, 422] @pytest.mark.integration def test_path_traversal_in_version_name(self, integration_client, test_package): """Test version names with path traversal are handled safely.""" project, package = test_package content = b"version traversal test" files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")} response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, data={"version": "../../../etc/passwd"}, ) assert response.status_code in [200, 400, 422] @pytest.mark.integration def test_download_path_traversal_in_ref(self, integration_client, test_package): """Test download ref with path traversal is rejected.""" project, package = test_package response = integration_client.get( f"/api/v1/project/{project}/{package}/+/../../../etc/passwd" ) assert response.status_code in [400, 404, 422] class TestSecurityMalformedRequests: """Tests for malformed request handling.""" @pytest.mark.integration def test_upload_missing_file_field(self, integration_client, test_package): """Test upload without file field returns appropriate error.""" project, package = test_package response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", data={"version": "no-file"}, ) assert response.status_code == 422 @pytest.mark.integration def test_upload_null_bytes_in_filename(self, integration_client, test_package): """Test filename with null bytes is handled safely.""" project, package = test_package content = b"null byte test" files = { "file": ("test\x00.bin", io.BytesIO(content), "application/octet-stream") } response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, ) assert response.status_code in [200, 400, 422] @pytest.mark.integration def test_upload_very_long_filename(self, integration_client, test_package): """Test very long filename is handled (truncated or rejected).""" project, package = test_package content = b"long filename test" long_filename = "a" * 1000 + ".bin" files = { "file": (long_filename, io.BytesIO(content), "application/octet-stream") } response = integration_client.post( f"/api/v1/project/{project}/{package}/upload", files=files, ) assert response.status_code in [200, 400, 413, 422] @pytest.mark.integration def test_upload_special_characters_in_filename( self, integration_client, test_package ): """Test filenames with special characters are handled safely.""" project, package = test_package content = b"special char test" special_filenames = [ "test