diff --git a/CHANGELOG.md b/CHANGELOG.md index 8aca96b..6fa8545 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Added comprehensive upload/download tests for size boundaries (1B to 1GB) +- Added concurrent upload/download tests (2, 5, 10 parallel operations) +- Added data integrity tests (binary, text, unicode, compressed content) +- Added chunk boundary tests for edge cases +- Added `@pytest.mark.large` and `@pytest.mark.concurrent` test markers +- Added `generate_content()` and `generate_content_with_hash()` test helpers +- Added `sized_content` fixture for generating test content of specific sizes - Added production deployment job triggered by semantic version tags (v1.0.0) with manual approval gate (#63) - Added production Helm values file with persistence enabled (20Gi PostgreSQL, 100Gi MinIO) (#63) - Added integration tests for production deployment (#63) @@ -31,6 +38,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Improved pod naming: Orchard pods now named `orchard-{env}-server-*` for clarity (#51) ### Fixed +- Fixed Content-Disposition header encoding for non-ASCII filenames using RFC 5987 - Fixed deploy jobs running even when tests or security scans fail (changed rules from `when: always` to `when: on_success`) (#63) - Fixed python_tests job not using internal PyPI proxy (#63) - Fixed `cleanup_feature` job failing when branch is deleted (`GIT_STRATEGY: none`) (#51) diff --git a/backend/app/routes.py b/backend/app/routes.py index 5c9e821..239363d 100644 --- a/backend/app/routes.py +++ b/backend/app/routes.py @@ -140,6 +140,31 @@ def sanitize_filename(filename: str) -> str: return re.sub(r'[\r\n"]', "", filename) +def build_content_disposition(filename: str) -> str: + """Build a Content-Disposition header value with proper encoding. + + For ASCII filenames, uses simple: attachment; filename="name" + For non-ASCII filenames, uses RFC 5987 encoding with UTF-8. 
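+
+    Illustrative example (assuming sanitize_filename leaves the name unchanged):
+        build_content_disposition("résumé.pdf")
+        -> attachment; filename="r?sum?.pdf"; filename*=UTF-8''r%C3%A9sum%C3%A9.pdf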
+ """ + from urllib.parse import quote + + sanitized = sanitize_filename(filename) + + # Check if filename is pure ASCII + try: + sanitized.encode('ascii') + # Pure ASCII - simple format + return f'attachment; filename="{sanitized}"' + except UnicodeEncodeError: + # Non-ASCII - use RFC 5987 encoding + # Provide both filename (ASCII fallback) and filename* (UTF-8 encoded) + ascii_fallback = sanitized.encode('ascii', errors='replace').decode('ascii') + # RFC 5987: filename*=charset'language'encoded_value + # We use UTF-8 encoding and percent-encode non-ASCII chars + encoded = quote(sanitized, safe='') + return f'attachment; filename="{ascii_fallback}"; filename*=UTF-8\'\'{encoded}' + + def get_user_id_from_request( request: Request, db: Session, @@ -2924,7 +2949,7 @@ def download_artifact( ) headers = { - "Content-Disposition": f'attachment; filename="{filename}"', + "Content-Disposition": build_content_disposition(filename), "Accept-Ranges": "bytes", "Content-Length": str(content_length), **checksum_headers, @@ -2942,7 +2967,7 @@ def download_artifact( # Full download with optional verification base_headers = { - "Content-Disposition": f'attachment; filename="{filename}"', + "Content-Disposition": build_content_disposition(filename), "Accept-Ranges": "bytes", **checksum_headers, } @@ -3124,7 +3149,7 @@ def head_artifact( # Build headers with checksum information headers = { - "Content-Disposition": f'attachment; filename="{filename}"', + "Content-Disposition": build_content_disposition(filename), "Accept-Ranges": "bytes", "Content-Length": str(artifact.size), "X-Artifact-Id": artifact.id, diff --git a/backend/pytest.ini b/backend/pytest.ini index 4480451..13f1367 100644 --- a/backend/pytest.ini +++ b/backend/pytest.ini @@ -12,6 +12,8 @@ markers = unit: Unit tests (no external dependencies) integration: Integration tests (require database/storage) slow: Slow tests (skip with -m "not slow") + large: Large file tests (100MB+, skip with -m "not large") + concurrent: Concurrent operation tests # Coverage configuration [coverage:run] diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index 9064602..abfaa3f 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -32,6 +32,8 @@ from tests.factories import ( compute_md5, compute_sha1, upload_test_file, + generate_content, + generate_content_with_hash, TEST_CONTENT_HELLO, TEST_HASH_HELLO, TEST_MD5_HELLO, @@ -271,3 +273,41 @@ def test_content(): content = f"test-content-{uuid.uuid4().hex}".encode() sha256 = compute_sha256(content) return (content, sha256) + + +@pytest.fixture +def sized_content(): + """ + Factory fixture for generating content of specific sizes. 
+ + Usage: + def test_example(sized_content): + content, hash = sized_content(1024) # 1KB + content, hash = sized_content(1024 * 1024) # 1MB + """ + def _generate(size: int, seed: int = None): + return generate_content_with_hash(size, seed) + return _generate + + +# ============================================================================= +# Size Constants for Tests +# ============================================================================= + +# Common file sizes for boundary testing +SIZE_1B = 1 +SIZE_1KB = 1024 +SIZE_10KB = 10 * 1024 +SIZE_100KB = 100 * 1024 +SIZE_1MB = 1024 * 1024 +SIZE_5MB = 5 * 1024 * 1024 +SIZE_10MB = 10 * 1024 * 1024 +SIZE_50MB = 50 * 1024 * 1024 +SIZE_100MB = 100 * 1024 * 1024 +SIZE_250MB = 250 * 1024 * 1024 +SIZE_500MB = 500 * 1024 * 1024 +SIZE_1GB = 1024 * 1024 * 1024 + +# Chunk size boundaries (based on typical S3 multipart chunk sizes) +CHUNK_SIZE = 64 * 1024 # 64KB typical chunk +MULTIPART_THRESHOLD = 100 * 1024 * 1024 # 100MB multipart threshold diff --git a/backend/tests/factories.py b/backend/tests/factories.py index cd58f2a..50112ea 100644 --- a/backend/tests/factories.py +++ b/backend/tests/factories.py @@ -97,6 +97,7 @@ def upload_test_file( content: bytes, filename: str = "test.bin", tag: Optional[str] = None, + version: Optional[str] = None, ) -> dict: """ Helper function to upload a test file via the API. @@ -108,6 +109,7 @@ def upload_test_file( content: File content as bytes filename: Original filename tag: Optional tag to assign + version: Optional version to assign Returns: The upload response as a dict @@ -116,6 +118,8 @@ def upload_test_file( data = {} if tag: data["tag"] = tag + if version: + data["version"] = version response = client.post( f"/api/v1/project/{project}/{package}/upload", @@ -126,6 +130,41 @@ def upload_test_file( return response.json() +def generate_content(size: int, seed: Optional[int] = None) -> bytes: + """ + Generate deterministic or random content of a specified size. + + Args: + size: Size of content in bytes + seed: Optional seed for reproducible content (None for random) + + Returns: + Bytes of the specified size + """ + if size == 0: + return b"" + if seed is not None: + import random + rng = random.Random(seed) + return bytes(rng.randint(0, 255) for _ in range(size)) + return os.urandom(size) + + +def generate_content_with_hash(size: int, seed: Optional[int] = None) -> tuple[bytes, str]: + """ + Generate content of specified size and compute its SHA256 hash. + + Args: + size: Size of content in bytes + seed: Optional seed for reproducible content + + Returns: + Tuple of (content_bytes, sha256_hash) + """ + content = generate_content(size, seed) + return content, compute_sha256(content) + + # ============================================================================= # Project/Package Factories # ============================================================================= diff --git a/backend/tests/integration/test_concurrent_operations.py b/backend/tests/integration/test_concurrent_operations.py new file mode 100644 index 0000000..4237cf4 --- /dev/null +++ b/backend/tests/integration/test_concurrent_operations.py @@ -0,0 +1,737 @@ +""" +Integration tests for concurrent upload and download operations. 
+ +Tests cover: +- Concurrent uploads of different files +- Concurrent uploads of same file (deduplication race) +- Concurrent downloads of same artifact +- Concurrent downloads of different artifacts +- Mixed concurrent uploads and downloads +- Data corruption prevention under concurrency +""" + +import pytest +import io +import os +from concurrent.futures import ThreadPoolExecutor, as_completed +from tests.factories import ( + compute_sha256, + upload_test_file, + generate_content_with_hash, +) + + +def get_api_key(integration_client): + """Create an API key for concurrent test workers.""" + import uuid + response = integration_client.post( + "/api/v1/auth/keys", + json={"name": f"concurrent-test-{uuid.uuid4().hex[:8]}"}, + ) + if response.status_code == 200: + return response.json()["key"] + return None + + +class TestConcurrentUploads: + """Tests for concurrent upload operations.""" + + @pytest.mark.integration + @pytest.mark.concurrent + def test_2_concurrent_uploads_different_files(self, integration_client, test_package): + """Test 2 concurrent uploads of different files.""" + project, package = test_package + api_key = get_api_key(integration_client) + assert api_key, "Failed to create API key" + + files_data = [ + generate_content_with_hash(1024, seed=i) for i in range(2) + ] + + results = [] + errors = [] + + def upload_worker(idx, content, expected_hash): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + files = { + "file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream") + } + response = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": f"concurrent-{idx}"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + result = response.json() + results.append((idx, result, expected_hash)) + else: + errors.append(f"Worker {idx}: Status {response.status_code}: {response.text}") + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [ + executor.submit(upload_worker, i, content, hash) + for i, (content, hash) in enumerate(files_data) + ] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == 2 + + # Verify each upload returned correct artifact_id + for idx, result, expected_hash in results: + assert result["artifact_id"] == expected_hash + + @pytest.mark.integration + @pytest.mark.concurrent + def test_5_concurrent_uploads_different_files(self, integration_client, test_package): + """Test 5 concurrent uploads of different files.""" + project, package = test_package + api_key = get_api_key(integration_client) + assert api_key, "Failed to create API key" + + num_files = 5 + files_data = [ + generate_content_with_hash(2048, seed=100 + i) for i in range(num_files) + ] + + results = [] + errors = [] + + def upload_worker(idx, content, expected_hash): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + files = { + "file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream") + } + response = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": f"concurrent5-{idx}"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + result = 
response.json() + results.append((idx, result, expected_hash)) + else: + errors.append(f"Worker {idx}: Status {response.status_code}") + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_files) as executor: + futures = [ + executor.submit(upload_worker, i, content, hash) + for i, (content, hash) in enumerate(files_data) + ] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == num_files + + # Verify all uploads have unique artifact_ids + artifact_ids = set(r[1]["artifact_id"] for r in results) + assert len(artifact_ids) == num_files + + @pytest.mark.integration + @pytest.mark.concurrent + def test_10_concurrent_uploads_different_files(self, integration_client, test_package): + """Test 10 concurrent uploads of different files.""" + project, package = test_package + api_key = get_api_key(integration_client) + assert api_key, "Failed to create API key" + + num_files = 10 + files_data = [ + generate_content_with_hash(1024, seed=200 + i) for i in range(num_files) + ] + + results = [] + errors = [] + + def upload_worker(idx, content, expected_hash): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + files = { + "file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream") + } + response = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": f"concurrent10-{idx}"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + result = response.json() + results.append((idx, result, expected_hash)) + else: + errors.append(f"Worker {idx}: Status {response.status_code}") + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_files) as executor: + futures = [ + executor.submit(upload_worker, i, content, hash) + for i, (content, hash) in enumerate(files_data) + ] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == num_files + + @pytest.mark.integration + @pytest.mark.concurrent + def test_concurrent_uploads_same_file_deduplication(self, integration_client, test_package): + """Test concurrent uploads of same file handle deduplication correctly.""" + project, package = test_package + api_key = get_api_key(integration_client) + assert api_key, "Failed to create API key" + + content, expected_hash = generate_content_with_hash(4096, seed=999) + num_concurrent = 5 + + results = [] + errors = [] + + def upload_worker(idx): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + files = { + "file": (f"same-{idx}.bin", io.BytesIO(content), "application/octet-stream") + } + response = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": f"dedup-{idx}"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + results.append(response.json()) + else: + errors.append(f"Worker {idx}: Status {response.status_code}") + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_concurrent) as executor: + futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)] + for future in as_completed(futures): + pass + + assert 
len(errors) == 0, f"Errors: {errors}" + assert len(results) == num_concurrent + + # All should have same artifact_id + artifact_ids = set(r["artifact_id"] for r in results) + assert len(artifact_ids) == 1 + assert expected_hash in artifact_ids + + # Verify final ref_count equals number of uploads + response = integration_client.get(f"/api/v1/artifact/{expected_hash}") + assert response.status_code == 200 + assert response.json()["ref_count"] == num_concurrent + + @pytest.mark.integration + @pytest.mark.concurrent + def test_concurrent_uploads_to_different_packages(self, integration_client, test_project, unique_test_id): + """Test concurrent uploads to different packages.""" + project = test_project + api_key = get_api_key(integration_client) + assert api_key, "Failed to create API key" + + num_packages = 3 + package_names = [] + + # Create multiple packages + for i in range(num_packages): + pkg_name = f"pkg-{unique_test_id}-{i}" + response = integration_client.post( + f"/api/v1/project/{project}/packages", + json={"name": pkg_name, "description": f"Package {i}"}, + ) + assert response.status_code == 200 + package_names.append(pkg_name) + + files_data = [ + generate_content_with_hash(1024, seed=300 + i) for i in range(num_packages) + ] + + results = [] + errors = [] + + def upload_worker(idx, package, content, expected_hash): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + files = { + "file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream") + } + response = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": "latest"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + result = response.json() + results.append((package, result, expected_hash)) + else: + errors.append(f"Worker {idx}: Status {response.status_code}") + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_packages) as executor: + futures = [ + executor.submit(upload_worker, i, package_names[i], content, hash) + for i, (content, hash) in enumerate(files_data) + ] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == num_packages + + +class TestConcurrentDownloads: + """Tests for concurrent download operations.""" + + @pytest.mark.integration + @pytest.mark.concurrent + def test_2_concurrent_downloads_same_artifact(self, integration_client, test_package): + """Test 2 concurrent downloads of same artifact.""" + project, package = test_package + content, expected_hash = generate_content_with_hash(2048, seed=400) + + # Upload first + upload_test_file(integration_client, project, package, content, tag="download-test") + + results = [] + errors = [] + + def download_worker(idx): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + response = client.get( + f"/api/v1/project/{project}/{package}/+/download-test", + params={"mode": "proxy"}, + ) + if response.status_code == 200: + results.append((idx, response.content)) + else: + errors.append(f"Worker {idx}: Status {response.status_code}") + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [executor.submit(download_worker, i) for i in range(2)] + for 
future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == 2 + + # All downloads should match original + for idx, downloaded in results: + assert downloaded == content + + @pytest.mark.integration + @pytest.mark.concurrent + def test_5_concurrent_downloads_same_artifact(self, integration_client, test_package): + """Test 5 concurrent downloads of same artifact.""" + project, package = test_package + content, expected_hash = generate_content_with_hash(4096, seed=500) + + upload_test_file(integration_client, project, package, content, tag="download5-test") + + num_downloads = 5 + results = [] + errors = [] + + def download_worker(idx): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + response = client.get( + f"/api/v1/project/{project}/{package}/+/download5-test", + params={"mode": "proxy"}, + ) + if response.status_code == 200: + results.append((idx, response.content)) + else: + errors.append(f"Worker {idx}: Status {response.status_code}") + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_downloads) as executor: + futures = [executor.submit(download_worker, i) for i in range(num_downloads)] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == num_downloads + + for idx, downloaded in results: + assert downloaded == content + + @pytest.mark.integration + @pytest.mark.concurrent + def test_10_concurrent_downloads_same_artifact(self, integration_client, test_package): + """Test 10 concurrent downloads of same artifact.""" + project, package = test_package + content, expected_hash = generate_content_with_hash(8192, seed=600) + + upload_test_file(integration_client, project, package, content, tag="download10-test") + + num_downloads = 10 + results = [] + errors = [] + + def download_worker(idx): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + response = client.get( + f"/api/v1/project/{project}/{package}/+/download10-test", + params={"mode": "proxy"}, + ) + if response.status_code == 200: + results.append((idx, response.content)) + else: + errors.append(f"Worker {idx}: Status {response.status_code}") + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_downloads) as executor: + futures = [executor.submit(download_worker, i) for i in range(num_downloads)] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == num_downloads + + for idx, downloaded in results: + assert downloaded == content + + @pytest.mark.integration + @pytest.mark.concurrent + def test_concurrent_downloads_different_artifacts(self, integration_client, test_package): + """Test concurrent downloads of different artifacts.""" + project, package = test_package + + # Upload multiple files + num_files = 5 + uploads = [] + for i in range(num_files): + content, expected_hash = generate_content_with_hash(1024, seed=700 + i) + upload_test_file( + integration_client, project, package, content, + tag=f"multi-download-{i}" + ) + uploads.append((f"multi-download-{i}", content)) + + results = [] + errors = [] + + def download_worker(tag, expected_content): + try: + from httpx import Client + base_url = 
os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + response = client.get( + f"/api/v1/project/{project}/{package}/+/{tag}", + params={"mode": "proxy"}, + ) + if response.status_code == 200: + results.append((tag, response.content, expected_content)) + else: + errors.append(f"Tag {tag}: Status {response.status_code}") + except Exception as e: + errors.append(f"Tag {tag}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_files) as executor: + futures = [ + executor.submit(download_worker, tag, content) + for tag, content in uploads + ] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == num_files + + for tag, downloaded, expected in results: + assert downloaded == expected, f"Content mismatch for {tag}" + + +class TestMixedConcurrentOperations: + """Tests for mixed concurrent upload and download operations.""" + + @pytest.mark.integration + @pytest.mark.concurrent + def test_upload_while_download_in_progress(self, integration_client, test_package): + """Test uploading while a download is in progress.""" + project, package = test_package + api_key = get_api_key(integration_client) + assert api_key, "Failed to create API key" + + # Upload initial content + content1, hash1 = generate_content_with_hash(10240, seed=800) # 10KB + upload_test_file(integration_client, project, package, content1, tag="initial") + + # New content for upload during download + content2, hash2 = generate_content_with_hash(10240, seed=801) + + results = {"downloads": [], "uploads": []} + errors = [] + + def download_worker(): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + response = client.get( + f"/api/v1/project/{project}/{package}/+/initial", + params={"mode": "proxy"}, + ) + if response.status_code == 200: + results["downloads"].append(response.content) + else: + errors.append(f"Download: Status {response.status_code}") + except Exception as e: + errors.append(f"Download: {str(e)}") + + def upload_worker(): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + files = { + "file": ("new.bin", io.BytesIO(content2), "application/octet-stream") + } + response = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": "during-download"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + results["uploads"].append(response.json()) + else: + errors.append(f"Upload: Status {response.status_code}") + except Exception as e: + errors.append(f"Upload: {str(e)}") + + with ThreadPoolExecutor(max_workers=2) as executor: + futures = [ + executor.submit(download_worker), + executor.submit(upload_worker), + ] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results["downloads"]) == 1 + assert len(results["uploads"]) == 1 + + # Verify download got correct content + assert results["downloads"][0] == content1 + + # Verify upload succeeded + assert results["uploads"][0]["artifact_id"] == hash2 + + @pytest.mark.integration + @pytest.mark.concurrent + def test_multiple_uploads_and_downloads_simultaneously(self, integration_client, test_package): + """Test multiple uploads and downloads running simultaneously.""" + project, package = 
test_package + api_key = get_api_key(integration_client) + assert api_key, "Failed to create API key" + + # Pre-upload some files for downloading + existing_files = [] + for i in range(3): + content, hash = generate_content_with_hash(2048, seed=900 + i) + upload_test_file(integration_client, project, package, content, tag=f"existing-{i}") + existing_files.append((f"existing-{i}", content)) + + # New files for uploading + new_files = [ + generate_content_with_hash(2048, seed=910 + i) for i in range(3) + ] + + results = {"downloads": [], "uploads": []} + errors = [] + + def download_worker(tag, expected): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + response = client.get( + f"/api/v1/project/{project}/{package}/+/{tag}", + params={"mode": "proxy"}, + ) + if response.status_code == 200: + results["downloads"].append((tag, response.content, expected)) + else: + errors.append(f"Download {tag}: Status {response.status_code}") + except Exception as e: + errors.append(f"Download {tag}: {str(e)}") + + def upload_worker(idx, content, expected_hash): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + files = { + "file": (f"new-{idx}.bin", io.BytesIO(content), "application/octet-stream") + } + response = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": f"new-{idx}"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if response.status_code == 200: + results["uploads"].append((idx, response.json(), expected_hash)) + else: + errors.append(f"Upload {idx}: Status {response.status_code}") + except Exception as e: + errors.append(f"Upload {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=6) as executor: + futures = [] + + # Submit downloads + for tag, content in existing_files: + futures.append(executor.submit(download_worker, tag, content)) + + # Submit uploads + for i, (content, hash) in enumerate(new_files): + futures.append(executor.submit(upload_worker, i, content, hash)) + + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results["downloads"]) == 3 + assert len(results["uploads"]) == 3 + + # Verify downloads + for tag, downloaded, expected in results["downloads"]: + assert downloaded == expected, f"Download mismatch for {tag}" + + # Verify uploads + for idx, result, expected_hash in results["uploads"]: + assert result["artifact_id"] == expected_hash + + @pytest.mark.integration + @pytest.mark.concurrent + def test_no_data_corruption_under_concurrency(self, integration_client, test_package): + """Test that no data corruption occurs under concurrent operations.""" + project, package = test_package + api_key = get_api_key(integration_client) + assert api_key, "Failed to create API key" + + # Create content with recognizable patterns + num_files = 5 + files_data = [] + for i in range(num_files): + # Each file has unique repeating pattern for easy corruption detection + pattern = bytes([i] * 256) + content = pattern * 40 # 10KB each + hash = compute_sha256(content) + files_data.append((content, hash)) + + results = [] + errors = [] + + def upload_and_verify(idx, content, expected_hash): + try: + from httpx import Client + base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") + + with Client(base_url=base_url, timeout=60.0) as client: + # 
Upload + files = { + "file": (f"pattern-{idx}.bin", io.BytesIO(content), "application/octet-stream") + } + upload_resp = client.post( + f"/api/v1/project/{project}/{package}/upload", + files=files, + data={"tag": f"pattern-{idx}"}, + headers={"Authorization": f"Bearer {api_key}"}, + ) + if upload_resp.status_code != 200: + errors.append(f"Upload {idx}: Status {upload_resp.status_code}") + return + + upload_result = upload_resp.json() + if upload_result["artifact_id"] != expected_hash: + errors.append(f"Upload {idx}: Hash mismatch") + return + + # Immediately download and verify + download_resp = client.get( + f"/api/v1/project/{project}/{package}/+/pattern-{idx}", + params={"mode": "proxy"}, + ) + if download_resp.status_code != 200: + errors.append(f"Download {idx}: Status {download_resp.status_code}") + return + + if download_resp.content != content: + errors.append(f"Worker {idx}: DATA CORRUPTION DETECTED") + return + + # Verify the downloaded content hash + downloaded_hash = compute_sha256(download_resp.content) + if downloaded_hash != expected_hash: + errors.append(f"Worker {idx}: Hash verification failed") + return + + results.append(idx) + + except Exception as e: + errors.append(f"Worker {idx}: {str(e)}") + + with ThreadPoolExecutor(max_workers=num_files) as executor: + futures = [ + executor.submit(upload_and_verify, i, content, hash) + for i, (content, hash) in enumerate(files_data) + ] + for future in as_completed(futures): + pass + + assert len(errors) == 0, f"Errors: {errors}" + assert len(results) == num_files diff --git a/backend/tests/integration/test_size_boundary.py b/backend/tests/integration/test_size_boundary.py new file mode 100644 index 0000000..49ed3d2 --- /dev/null +++ b/backend/tests/integration/test_size_boundary.py @@ -0,0 +1,583 @@ +""" +Integration tests for upload/download with various file sizes. 
+ +Tests cover: +- Small files (0B - 100KB) +- Medium files (1MB - 50MB) +- Large files (100MB - 1GB) - marked as slow/large +- Exact chunk boundaries +- Data integrity verification across all sizes +""" + +import pytest +import io +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from tests.factories import ( + compute_sha256, + upload_test_file, + generate_content, + generate_content_with_hash, +) +from tests.conftest import ( + SIZE_1B, + SIZE_1KB, + SIZE_10KB, + SIZE_100KB, + SIZE_1MB, + SIZE_5MB, + SIZE_10MB, + SIZE_50MB, + SIZE_100MB, + SIZE_250MB, + SIZE_500MB, + SIZE_1GB, + CHUNK_SIZE, + MULTIPART_THRESHOLD, +) + + +class TestSmallFileSizes: + """Tests for small file uploads/downloads (0B - 100KB).""" + + @pytest.mark.integration + def test_upload_download_1_byte(self, integration_client, test_package, sized_content): + """Test upload/download of 1 byte file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_1B, seed=1) + + result = upload_test_file( + integration_client, project, package, content, + filename="1byte.bin", tag="1byte" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_1B + + # Download and verify + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/1byte", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + assert len(response.content) == SIZE_1B + + @pytest.mark.integration + def test_upload_download_1kb(self, integration_client, test_package, sized_content): + """Test upload/download of 1KB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_1KB, seed=2) + + result = upload_test_file( + integration_client, project, package, content, + filename="1kb.bin", tag="1kb" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_1KB + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/1kb", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + @pytest.mark.integration + def test_upload_download_10kb(self, integration_client, test_package, sized_content): + """Test upload/download of 10KB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_10KB, seed=3) + + result = upload_test_file( + integration_client, project, package, content, + filename="10kb.bin", tag="10kb" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_10KB + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/10kb", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + @pytest.mark.integration + def test_upload_download_100kb(self, integration_client, test_package, sized_content): + """Test upload/download of 100KB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_100KB, seed=4) + + result = upload_test_file( + integration_client, project, package, content, + filename="100kb.bin", tag="100kb" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_100KB + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/100kb", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + +class TestMediumFileSizes: + """Tests for medium file uploads/downloads (1MB - 50MB).""" + + @pytest.mark.integration + def 
test_upload_download_1mb(self, integration_client, test_package, sized_content): + """Test upload/download of 1MB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_1MB, seed=10) + + result = upload_test_file( + integration_client, project, package, content, + filename="1mb.bin", tag="1mb" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_1MB + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/1mb", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert len(response.content) == SIZE_1MB + assert compute_sha256(response.content) == expected_hash + + @pytest.mark.integration + def test_upload_download_5mb(self, integration_client, test_package, sized_content): + """Test upload/download of 5MB file (multipart threshold boundary area).""" + project, package = test_package + content, expected_hash = sized_content(SIZE_5MB, seed=11) + + result = upload_test_file( + integration_client, project, package, content, + filename="5mb.bin", tag="5mb" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_5MB + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/5mb", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert len(response.content) == SIZE_5MB + assert compute_sha256(response.content) == expected_hash + + @pytest.mark.integration + @pytest.mark.slow + def test_upload_download_10mb(self, integration_client, test_package, sized_content): + """Test upload/download of 10MB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_10MB, seed=12) + + result = upload_test_file( + integration_client, project, package, content, + filename="10mb.bin", tag="10mb" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_10MB + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/10mb", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert len(response.content) == SIZE_10MB + assert compute_sha256(response.content) == expected_hash + + @pytest.mark.integration + @pytest.mark.slow + def test_upload_download_50mb(self, integration_client, test_package, sized_content): + """Test upload/download of 50MB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_50MB, seed=13) + + start_time = time.time() + result = upload_test_file( + integration_client, project, package, content, + filename="50mb.bin", tag="50mb" + ) + upload_time = time.time() - start_time + + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_50MB + + start_time = time.time() + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/50mb", + params={"mode": "proxy"}, + ) + download_time = time.time() - start_time + + assert response.status_code == 200 + assert len(response.content) == SIZE_50MB + assert compute_sha256(response.content) == expected_hash + + # Log timing for performance tracking + print(f"\n50MB upload: {upload_time:.2f}s, download: {download_time:.2f}s") + + +class TestLargeFileSizes: + """Tests for large file uploads/downloads (100MB - 1GB). + + These tests are marked as slow and large, skipped by default. + Run with: pytest -m "large" to include these tests. 
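+    Note that the markers only label these tests; they are deselected only when
+    the run passes a marker expression such as -m "not large".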
+ """ + + @pytest.mark.integration + @pytest.mark.slow + @pytest.mark.large + def test_upload_download_100mb(self, integration_client, test_package, sized_content): + """Test upload/download of 100MB file (multipart threshold).""" + project, package = test_package + content, expected_hash = sized_content(SIZE_100MB, seed=100) + + start_time = time.time() + result = upload_test_file( + integration_client, project, package, content, + filename="100mb.bin", tag="100mb" + ) + upload_time = time.time() - start_time + + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_100MB + + start_time = time.time() + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/100mb", + params={"mode": "proxy"}, + ) + download_time = time.time() - start_time + + assert response.status_code == 200 + assert len(response.content) == SIZE_100MB + assert compute_sha256(response.content) == expected_hash + + print(f"\n100MB upload: {upload_time:.2f}s, download: {download_time:.2f}s") + + @pytest.mark.integration + @pytest.mark.slow + @pytest.mark.large + def test_upload_download_250mb(self, integration_client, test_package, sized_content): + """Test upload/download of 250MB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_250MB, seed=250) + + start_time = time.time() + result = upload_test_file( + integration_client, project, package, content, + filename="250mb.bin", tag="250mb" + ) + upload_time = time.time() - start_time + + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_250MB + + start_time = time.time() + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/250mb", + params={"mode": "proxy"}, + ) + download_time = time.time() - start_time + + assert response.status_code == 200 + assert len(response.content) == SIZE_250MB + assert compute_sha256(response.content) == expected_hash + + print(f"\n250MB upload: {upload_time:.2f}s, download: {download_time:.2f}s") + + @pytest.mark.integration + @pytest.mark.slow + @pytest.mark.large + def test_upload_download_500mb(self, integration_client, test_package, sized_content): + """Test upload/download of 500MB file.""" + project, package = test_package + content, expected_hash = sized_content(SIZE_500MB, seed=500) + + start_time = time.time() + result = upload_test_file( + integration_client, project, package, content, + filename="500mb.bin", tag="500mb" + ) + upload_time = time.time() - start_time + + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_500MB + + start_time = time.time() + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/500mb", + params={"mode": "proxy"}, + ) + download_time = time.time() - start_time + + assert response.status_code == 200 + assert len(response.content) == SIZE_500MB + assert compute_sha256(response.content) == expected_hash + + print(f"\n500MB upload: {upload_time:.2f}s, download: {download_time:.2f}s") + + @pytest.mark.integration + @pytest.mark.slow + @pytest.mark.large + def test_upload_download_1gb(self, integration_client, test_package, sized_content): + """Test upload/download of 1GB file. + + This test may take several minutes depending on network/disk speed. 
+ """ + project, package = test_package + content, expected_hash = sized_content(SIZE_1GB, seed=1024) + + start_time = time.time() + result = upload_test_file( + integration_client, project, package, content, + filename="1gb.bin", tag="1gb" + ) + upload_time = time.time() - start_time + + assert result["artifact_id"] == expected_hash + assert result["size"] == SIZE_1GB + + start_time = time.time() + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/1gb", + params={"mode": "proxy"}, + ) + download_time = time.time() - start_time + + assert response.status_code == 200 + assert len(response.content) == SIZE_1GB + assert compute_sha256(response.content) == expected_hash + + print(f"\n1GB upload: {upload_time:.2f}s, download: {download_time:.2f}s") + + +class TestChunkBoundaries: + """Tests for exact chunk size boundaries.""" + + @pytest.mark.integration + def test_upload_download_at_chunk_size(self, integration_client, test_package, sized_content): + """Test upload/download at exact chunk size (64KB).""" + project, package = test_package + content, expected_hash = sized_content(CHUNK_SIZE, seed=64) + + result = upload_test_file( + integration_client, project, package, content, + filename="chunk.bin", tag="chunk-exact" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == CHUNK_SIZE + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/chunk-exact", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + @pytest.mark.integration + def test_upload_download_chunk_size_plus_1(self, integration_client, test_package, sized_content): + """Test upload/download at chunk size + 1 byte.""" + project, package = test_package + size = CHUNK_SIZE + 1 + content, expected_hash = sized_content(size, seed=65) + + result = upload_test_file( + integration_client, project, package, content, + filename="chunk_plus.bin", tag="chunk-plus" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == size + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/chunk-plus", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + @pytest.mark.integration + def test_upload_download_chunk_size_minus_1(self, integration_client, test_package, sized_content): + """Test upload/download at chunk size - 1 byte.""" + project, package = test_package + size = CHUNK_SIZE - 1 + content, expected_hash = sized_content(size, seed=63) + + result = upload_test_file( + integration_client, project, package, content, + filename="chunk_minus.bin", tag="chunk-minus" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == size + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/chunk-minus", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + @pytest.mark.integration + def test_upload_download_multiple_chunks(self, integration_client, test_package, sized_content): + """Test upload/download spanning multiple chunks.""" + project, package = test_package + size = CHUNK_SIZE * 3 + 1000 # 3 full chunks + partial + content, expected_hash = sized_content(size, seed=300) + + result = upload_test_file( + integration_client, project, package, content, + filename="multi_chunk.bin", tag="multi-chunk" + ) + assert result["artifact_id"] == expected_hash + assert result["size"] == size + + response = integration_client.get( + 
f"/api/v1/project/{project}/{package}/+/multi-chunk", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + +class TestDataIntegrity: + """Tests for data integrity with various content types.""" + + @pytest.mark.integration + def test_binary_content_integrity(self, integration_client, test_package): + """Test binary content (all byte values 0-255) integrity.""" + project, package = test_package + # Content with all 256 possible byte values + content = bytes(range(256)) * 100 # 25.6KB + expected_hash = compute_sha256(content) + + result = upload_test_file( + integration_client, project, package, content, + filename="binary.bin", tag="binary" + ) + assert result["artifact_id"] == expected_hash + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/binary", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + @pytest.mark.integration + def test_text_content_integrity(self, integration_client, test_package): + """Test UTF-8 text content integrity.""" + project, package = test_package + content = "Hello, World! 你好世界 🌍 مرحبا العالم".encode("utf-8") + expected_hash = compute_sha256(content) + + result = upload_test_file( + integration_client, project, package, content, + filename="text.txt", tag="text" + ) + assert result["artifact_id"] == expected_hash + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/text", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + assert response.content.decode("utf-8") == "Hello, World! 你好世界 🌍 مرحبا العالم" + + @pytest.mark.integration + def test_null_bytes_content_integrity(self, integration_client, test_package): + """Test content with null bytes.""" + project, package = test_package + content = b"before\x00null\x00bytes\x00after" + expected_hash = compute_sha256(content) + + result = upload_test_file( + integration_client, project, package, content, + filename="nulls.bin", tag="nulls" + ) + assert result["artifact_id"] == expected_hash + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/nulls", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + assert b"\x00" in response.content + + @pytest.mark.integration + def test_unicode_filename_integrity(self, integration_client, test_package): + """Test file with unicode filename.""" + project, package = test_package + content = b"unicode filename test" + expected_hash = compute_sha256(content) + + result = upload_test_file( + integration_client, project, package, content, + filename="文件名.txt", tag="unicode-name" + ) + assert result["artifact_id"] == expected_hash + assert result["original_name"] == "文件名.txt" + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/unicode-name", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + + @pytest.mark.integration + def test_compressed_content_integrity(self, integration_client, test_package): + """Test gzip-compressed content integrity.""" + import gzip + + project, package = test_package + original = b"This is some text that will be compressed " * 100 + content = gzip.compress(original) + expected_hash = compute_sha256(content) + + result = upload_test_file( + integration_client, project, package, content, + filename="data.gz", tag="compressed" + ) + assert result["artifact_id"] == 
expected_hash + + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/compressed", + params={"mode": "proxy"}, + ) + assert response.status_code == 200 + assert response.content == content + # Verify we can decompress + assert gzip.decompress(response.content) == original + + @pytest.mark.integration + def test_hash_verification_matches(self, integration_client, test_package, sized_content): + """Test that computed hash matches artifact_id for various sizes.""" + project, package = test_package + + sizes = [SIZE_1B, SIZE_1KB, SIZE_10KB, SIZE_100KB, SIZE_1MB] + + for i, size in enumerate(sizes): + content, expected_hash = sized_content(size, seed=1000 + i) + + result = upload_test_file( + integration_client, project, package, content, + filename=f"hash_test_{size}.bin", tag=f"hash-{size}" + ) + + # Verify artifact_id matches expected hash + assert result["artifact_id"] == expected_hash + + # Download and verify hash of downloaded content + response = integration_client.get( + f"/api/v1/project/{project}/{package}/+/hash-{size}", + params={"mode": "proxy"}, + ) + downloaded_hash = compute_sha256(response.content) + assert downloaded_hash == expected_hash
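
For reference, a minimal sketch (not part of this diff) of how a client could recover the UTF-8 filename from the RFC 5987 filename* parameter that build_content_disposition now emits; in practice an HTTP library's own header parsing would normally be used instead:

    from urllib.parse import unquote

    # Header value as produced by build_content_disposition("résumé.pdf") above.
    header = "attachment; filename=\"r?sum?.pdf\"; filename*=UTF-8''r%C3%A9sum%C3%A9.pdf"

    # RFC 5987 ext-value has the form charset'language'percent-encoded-bytes;
    # prefer it over the plain ASCII filename fallback when both are present.
    ext_value = header.split("filename*=", 1)[1]
    charset, _language, encoded = ext_value.split("'", 2)
    assert unquote(encoded, encoding=charset) == "résumé.pdf"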