Add upload/download tests for size boundaries and concurrency

- Add size boundary tests: 1B, 1KB, 10KB, 100KB, 1MB, 5MB, 10MB, 50MB
- Add large file tests (100MB-1GB) marked with @pytest.mark.large
- Add chunk boundary tests at 64KB boundaries
- Add concurrent upload/download tests (2, 5, 10 parallel)
- Add data integrity tests (binary, text, unicode, compressed)
- Add generate_content() and sized_content fixture for test helpers
- Add @pytest.mark.large and @pytest.mark.concurrent markers
- Fix Content-Disposition header encoding for non-ASCII filenames (RFC 5987)
This commit is contained in:
Mondo Diaz
2026-01-16 17:22:16 +00:00
parent a98ac154d5
commit 9106e79aac
7 changed files with 1437 additions and 3 deletions

View File

@@ -0,0 +1,737 @@
"""
Integration tests for concurrent upload and download operations.
Tests cover:
- Concurrent uploads of different files
- Concurrent uploads of same file (deduplication race)
- Concurrent downloads of same artifact
- Concurrent downloads of different artifacts
- Mixed concurrent uploads and downloads
- Data corruption prevention under concurrency
"""
import pytest
import io
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tests.factories import (
compute_sha256,
upload_test_file,
generate_content_with_hash,
)
def get_api_key(integration_client):
"""Create an API key for concurrent test workers."""
import uuid
response = integration_client.post(
"/api/v1/auth/keys",
json={"name": f"concurrent-test-{uuid.uuid4().hex[:8]}"},
)
if response.status_code == 200:
return response.json()["key"]
return None
class TestConcurrentUploads:
"""Tests for concurrent upload operations."""
@pytest.mark.integration
@pytest.mark.concurrent
def test_2_concurrent_uploads_different_files(self, integration_client, test_package):
"""Test 2 concurrent uploads of different files."""
project, package = test_package
api_key = get_api_key(integration_client)
assert api_key, "Failed to create API key"
files_data = [
generate_content_with_hash(1024, seed=i) for i in range(2)
]
results = []
errors = []
def upload_worker(idx, content, expected_hash):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
files = {
"file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream")
}
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": f"concurrent-{idx}"},
headers={"Authorization": f"Bearer {api_key}"},
)
if response.status_code == 200:
result = response.json()
results.append((idx, result, expected_hash))
else:
errors.append(f"Worker {idx}: Status {response.status_code}: {response.text}")
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [
executor.submit(upload_worker, i, content, hash)
for i, (content, hash) in enumerate(files_data)
]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == 2
# Verify each upload returned correct artifact_id
for idx, result, expected_hash in results:
assert result["artifact_id"] == expected_hash
@pytest.mark.integration
@pytest.mark.concurrent
def test_5_concurrent_uploads_different_files(self, integration_client, test_package):
"""Test 5 concurrent uploads of different files."""
project, package = test_package
api_key = get_api_key(integration_client)
assert api_key, "Failed to create API key"
num_files = 5
files_data = [
generate_content_with_hash(2048, seed=100 + i) for i in range(num_files)
]
results = []
errors = []
def upload_worker(idx, content, expected_hash):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
files = {
"file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream")
}
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": f"concurrent5-{idx}"},
headers={"Authorization": f"Bearer {api_key}"},
)
if response.status_code == 200:
result = response.json()
results.append((idx, result, expected_hash))
else:
errors.append(f"Worker {idx}: Status {response.status_code}")
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=num_files) as executor:
futures = [
executor.submit(upload_worker, i, content, hash)
for i, (content, hash) in enumerate(files_data)
]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == num_files
# Verify all uploads have unique artifact_ids
artifact_ids = set(r[1]["artifact_id"] for r in results)
assert len(artifact_ids) == num_files
@pytest.mark.integration
@pytest.mark.concurrent
def test_10_concurrent_uploads_different_files(self, integration_client, test_package):
"""Test 10 concurrent uploads of different files."""
project, package = test_package
api_key = get_api_key(integration_client)
assert api_key, "Failed to create API key"
num_files = 10
files_data = [
generate_content_with_hash(1024, seed=200 + i) for i in range(num_files)
]
results = []
errors = []
def upload_worker(idx, content, expected_hash):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
files = {
"file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream")
}
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": f"concurrent10-{idx}"},
headers={"Authorization": f"Bearer {api_key}"},
)
if response.status_code == 200:
result = response.json()
results.append((idx, result, expected_hash))
else:
errors.append(f"Worker {idx}: Status {response.status_code}")
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=num_files) as executor:
futures = [
executor.submit(upload_worker, i, content, hash)
for i, (content, hash) in enumerate(files_data)
]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == num_files
@pytest.mark.integration
@pytest.mark.concurrent
def test_concurrent_uploads_same_file_deduplication(self, integration_client, test_package):
"""Test concurrent uploads of same file handle deduplication correctly."""
project, package = test_package
api_key = get_api_key(integration_client)
assert api_key, "Failed to create API key"
content, expected_hash = generate_content_with_hash(4096, seed=999)
num_concurrent = 5
results = []
errors = []
def upload_worker(idx):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
files = {
"file": (f"same-{idx}.bin", io.BytesIO(content), "application/octet-stream")
}
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": f"dedup-{idx}"},
headers={"Authorization": f"Bearer {api_key}"},
)
if response.status_code == 200:
results.append(response.json())
else:
errors.append(f"Worker {idx}: Status {response.status_code}")
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == num_concurrent
# All should have same artifact_id
artifact_ids = set(r["artifact_id"] for r in results)
assert len(artifact_ids) == 1
assert expected_hash in artifact_ids
# Verify final ref_count equals number of uploads
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
assert response.json()["ref_count"] == num_concurrent
@pytest.mark.integration
@pytest.mark.concurrent
def test_concurrent_uploads_to_different_packages(self, integration_client, test_project, unique_test_id):
"""Test concurrent uploads to different packages."""
project = test_project
api_key = get_api_key(integration_client)
assert api_key, "Failed to create API key"
num_packages = 3
package_names = []
# Create multiple packages
for i in range(num_packages):
pkg_name = f"pkg-{unique_test_id}-{i}"
response = integration_client.post(
f"/api/v1/project/{project}/packages",
json={"name": pkg_name, "description": f"Package {i}"},
)
assert response.status_code == 200
package_names.append(pkg_name)
files_data = [
generate_content_with_hash(1024, seed=300 + i) for i in range(num_packages)
]
results = []
errors = []
def upload_worker(idx, package, content, expected_hash):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
files = {
"file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream")
}
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": "latest"},
headers={"Authorization": f"Bearer {api_key}"},
)
if response.status_code == 200:
result = response.json()
results.append((package, result, expected_hash))
else:
errors.append(f"Worker {idx}: Status {response.status_code}")
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=num_packages) as executor:
futures = [
executor.submit(upload_worker, i, package_names[i], content, hash)
for i, (content, hash) in enumerate(files_data)
]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == num_packages
class TestConcurrentDownloads:
"""Tests for concurrent download operations."""
@pytest.mark.integration
@pytest.mark.concurrent
def test_2_concurrent_downloads_same_artifact(self, integration_client, test_package):
"""Test 2 concurrent downloads of same artifact."""
project, package = test_package
content, expected_hash = generate_content_with_hash(2048, seed=400)
# Upload first
upload_test_file(integration_client, project, package, content, tag="download-test")
results = []
errors = []
def download_worker(idx):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
response = client.get(
f"/api/v1/project/{project}/{package}/+/download-test",
params={"mode": "proxy"},
)
if response.status_code == 200:
results.append((idx, response.content))
else:
errors.append(f"Worker {idx}: Status {response.status_code}")
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [executor.submit(download_worker, i) for i in range(2)]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == 2
# All downloads should match original
for idx, downloaded in results:
assert downloaded == content
@pytest.mark.integration
@pytest.mark.concurrent
def test_5_concurrent_downloads_same_artifact(self, integration_client, test_package):
"""Test 5 concurrent downloads of same artifact."""
project, package = test_package
content, expected_hash = generate_content_with_hash(4096, seed=500)
upload_test_file(integration_client, project, package, content, tag="download5-test")
num_downloads = 5
results = []
errors = []
def download_worker(idx):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
response = client.get(
f"/api/v1/project/{project}/{package}/+/download5-test",
params={"mode": "proxy"},
)
if response.status_code == 200:
results.append((idx, response.content))
else:
errors.append(f"Worker {idx}: Status {response.status_code}")
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=num_downloads) as executor:
futures = [executor.submit(download_worker, i) for i in range(num_downloads)]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == num_downloads
for idx, downloaded in results:
assert downloaded == content
@pytest.mark.integration
@pytest.mark.concurrent
def test_10_concurrent_downloads_same_artifact(self, integration_client, test_package):
"""Test 10 concurrent downloads of same artifact."""
project, package = test_package
content, expected_hash = generate_content_with_hash(8192, seed=600)
upload_test_file(integration_client, project, package, content, tag="download10-test")
num_downloads = 10
results = []
errors = []
def download_worker(idx):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
response = client.get(
f"/api/v1/project/{project}/{package}/+/download10-test",
params={"mode": "proxy"},
)
if response.status_code == 200:
results.append((idx, response.content))
else:
errors.append(f"Worker {idx}: Status {response.status_code}")
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=num_downloads) as executor:
futures = [executor.submit(download_worker, i) for i in range(num_downloads)]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == num_downloads
for idx, downloaded in results:
assert downloaded == content
@pytest.mark.integration
@pytest.mark.concurrent
def test_concurrent_downloads_different_artifacts(self, integration_client, test_package):
"""Test concurrent downloads of different artifacts."""
project, package = test_package
# Upload multiple files
num_files = 5
uploads = []
for i in range(num_files):
content, expected_hash = generate_content_with_hash(1024, seed=700 + i)
upload_test_file(
integration_client, project, package, content,
tag=f"multi-download-{i}"
)
uploads.append((f"multi-download-{i}", content))
results = []
errors = []
def download_worker(tag, expected_content):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
response = client.get(
f"/api/v1/project/{project}/{package}/+/{tag}",
params={"mode": "proxy"},
)
if response.status_code == 200:
results.append((tag, response.content, expected_content))
else:
errors.append(f"Tag {tag}: Status {response.status_code}")
except Exception as e:
errors.append(f"Tag {tag}: {str(e)}")
with ThreadPoolExecutor(max_workers=num_files) as executor:
futures = [
executor.submit(download_worker, tag, content)
for tag, content in uploads
]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == num_files
for tag, downloaded, expected in results:
assert downloaded == expected, f"Content mismatch for {tag}"
class TestMixedConcurrentOperations:
"""Tests for mixed concurrent upload and download operations."""
@pytest.mark.integration
@pytest.mark.concurrent
def test_upload_while_download_in_progress(self, integration_client, test_package):
"""Test uploading while a download is in progress."""
project, package = test_package
api_key = get_api_key(integration_client)
assert api_key, "Failed to create API key"
# Upload initial content
content1, hash1 = generate_content_with_hash(10240, seed=800) # 10KB
upload_test_file(integration_client, project, package, content1, tag="initial")
# New content for upload during download
content2, hash2 = generate_content_with_hash(10240, seed=801)
results = {"downloads": [], "uploads": []}
errors = []
def download_worker():
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
response = client.get(
f"/api/v1/project/{project}/{package}/+/initial",
params={"mode": "proxy"},
)
if response.status_code == 200:
results["downloads"].append(response.content)
else:
errors.append(f"Download: Status {response.status_code}")
except Exception as e:
errors.append(f"Download: {str(e)}")
def upload_worker():
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
files = {
"file": ("new.bin", io.BytesIO(content2), "application/octet-stream")
}
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": "during-download"},
headers={"Authorization": f"Bearer {api_key}"},
)
if response.status_code == 200:
results["uploads"].append(response.json())
else:
errors.append(f"Upload: Status {response.status_code}")
except Exception as e:
errors.append(f"Upload: {str(e)}")
with ThreadPoolExecutor(max_workers=2) as executor:
futures = [
executor.submit(download_worker),
executor.submit(upload_worker),
]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results["downloads"]) == 1
assert len(results["uploads"]) == 1
# Verify download got correct content
assert results["downloads"][0] == content1
# Verify upload succeeded
assert results["uploads"][0]["artifact_id"] == hash2
@pytest.mark.integration
@pytest.mark.concurrent
def test_multiple_uploads_and_downloads_simultaneously(self, integration_client, test_package):
"""Test multiple uploads and downloads running simultaneously."""
project, package = test_package
api_key = get_api_key(integration_client)
assert api_key, "Failed to create API key"
# Pre-upload some files for downloading
existing_files = []
for i in range(3):
content, hash = generate_content_with_hash(2048, seed=900 + i)
upload_test_file(integration_client, project, package, content, tag=f"existing-{i}")
existing_files.append((f"existing-{i}", content))
# New files for uploading
new_files = [
generate_content_with_hash(2048, seed=910 + i) for i in range(3)
]
results = {"downloads": [], "uploads": []}
errors = []
def download_worker(tag, expected):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
response = client.get(
f"/api/v1/project/{project}/{package}/+/{tag}",
params={"mode": "proxy"},
)
if response.status_code == 200:
results["downloads"].append((tag, response.content, expected))
else:
errors.append(f"Download {tag}: Status {response.status_code}")
except Exception as e:
errors.append(f"Download {tag}: {str(e)}")
def upload_worker(idx, content, expected_hash):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
files = {
"file": (f"new-{idx}.bin", io.BytesIO(content), "application/octet-stream")
}
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": f"new-{idx}"},
headers={"Authorization": f"Bearer {api_key}"},
)
if response.status_code == 200:
results["uploads"].append((idx, response.json(), expected_hash))
else:
errors.append(f"Upload {idx}: Status {response.status_code}")
except Exception as e:
errors.append(f"Upload {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=6) as executor:
futures = []
# Submit downloads
for tag, content in existing_files:
futures.append(executor.submit(download_worker, tag, content))
# Submit uploads
for i, (content, hash) in enumerate(new_files):
futures.append(executor.submit(upload_worker, i, content, hash))
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results["downloads"]) == 3
assert len(results["uploads"]) == 3
# Verify downloads
for tag, downloaded, expected in results["downloads"]:
assert downloaded == expected, f"Download mismatch for {tag}"
# Verify uploads
for idx, result, expected_hash in results["uploads"]:
assert result["artifact_id"] == expected_hash
@pytest.mark.integration
@pytest.mark.concurrent
def test_no_data_corruption_under_concurrency(self, integration_client, test_package):
"""Test that no data corruption occurs under concurrent operations."""
project, package = test_package
api_key = get_api_key(integration_client)
assert api_key, "Failed to create API key"
# Create content with recognizable patterns
num_files = 5
files_data = []
for i in range(num_files):
# Each file has unique repeating pattern for easy corruption detection
pattern = bytes([i] * 256)
content = pattern * 40 # 10KB each
hash = compute_sha256(content)
files_data.append((content, hash))
results = []
errors = []
def upload_and_verify(idx, content, expected_hash):
try:
from httpx import Client
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=60.0) as client:
# Upload
files = {
"file": (f"pattern-{idx}.bin", io.BytesIO(content), "application/octet-stream")
}
upload_resp = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": f"pattern-{idx}"},
headers={"Authorization": f"Bearer {api_key}"},
)
if upload_resp.status_code != 200:
errors.append(f"Upload {idx}: Status {upload_resp.status_code}")
return
upload_result = upload_resp.json()
if upload_result["artifact_id"] != expected_hash:
errors.append(f"Upload {idx}: Hash mismatch")
return
# Immediately download and verify
download_resp = client.get(
f"/api/v1/project/{project}/{package}/+/pattern-{idx}",
params={"mode": "proxy"},
)
if download_resp.status_code != 200:
errors.append(f"Download {idx}: Status {download_resp.status_code}")
return
if download_resp.content != content:
errors.append(f"Worker {idx}: DATA CORRUPTION DETECTED")
return
# Verify the downloaded content hash
downloaded_hash = compute_sha256(download_resp.content)
if downloaded_hash != expected_hash:
errors.append(f"Worker {idx}: Hash verification failed")
return
results.append(idx)
except Exception as e:
errors.append(f"Worker {idx}: {str(e)}")
with ThreadPoolExecutor(max_workers=num_files) as executor:
futures = [
executor.submit(upload_and_verify, i, content, hash)
for i, (content, hash) in enumerate(files_data)
]
for future in as_completed(futures):
pass
assert len(errors) == 0, f"Errors: {errors}"
assert len(results) == num_files

View File

@@ -0,0 +1,583 @@
"""
Integration tests for upload/download with various file sizes.
Tests cover:
- Small files (0B - 100KB)
- Medium files (1MB - 50MB)
- Large files (100MB - 1GB) - marked as slow/large
- Exact chunk boundaries
- Data integrity verification across all sizes
"""
import pytest
import io
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tests.factories import (
compute_sha256,
upload_test_file,
generate_content,
generate_content_with_hash,
)
from tests.conftest import (
SIZE_1B,
SIZE_1KB,
SIZE_10KB,
SIZE_100KB,
SIZE_1MB,
SIZE_5MB,
SIZE_10MB,
SIZE_50MB,
SIZE_100MB,
SIZE_250MB,
SIZE_500MB,
SIZE_1GB,
CHUNK_SIZE,
MULTIPART_THRESHOLD,
)
class TestSmallFileSizes:
"""Tests for small file uploads/downloads (0B - 100KB)."""
@pytest.mark.integration
def test_upload_download_1_byte(self, integration_client, test_package, sized_content):
"""Test upload/download of 1 byte file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_1B, seed=1)
result = upload_test_file(
integration_client, project, package, content,
filename="1byte.bin", tag="1byte"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_1B
# Download and verify
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/1byte",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
assert len(response.content) == SIZE_1B
@pytest.mark.integration
def test_upload_download_1kb(self, integration_client, test_package, sized_content):
"""Test upload/download of 1KB file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_1KB, seed=2)
result = upload_test_file(
integration_client, project, package, content,
filename="1kb.bin", tag="1kb"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_1KB
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/1kb",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
@pytest.mark.integration
def test_upload_download_10kb(self, integration_client, test_package, sized_content):
"""Test upload/download of 10KB file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_10KB, seed=3)
result = upload_test_file(
integration_client, project, package, content,
filename="10kb.bin", tag="10kb"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_10KB
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/10kb",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
@pytest.mark.integration
def test_upload_download_100kb(self, integration_client, test_package, sized_content):
"""Test upload/download of 100KB file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_100KB, seed=4)
result = upload_test_file(
integration_client, project, package, content,
filename="100kb.bin", tag="100kb"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_100KB
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/100kb",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
class TestMediumFileSizes:
"""Tests for medium file uploads/downloads (1MB - 50MB)."""
@pytest.mark.integration
def test_upload_download_1mb(self, integration_client, test_package, sized_content):
"""Test upload/download of 1MB file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_1MB, seed=10)
result = upload_test_file(
integration_client, project, package, content,
filename="1mb.bin", tag="1mb"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_1MB
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/1mb",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert len(response.content) == SIZE_1MB
assert compute_sha256(response.content) == expected_hash
@pytest.mark.integration
def test_upload_download_5mb(self, integration_client, test_package, sized_content):
"""Test upload/download of 5MB file (multipart threshold boundary area)."""
project, package = test_package
content, expected_hash = sized_content(SIZE_5MB, seed=11)
result = upload_test_file(
integration_client, project, package, content,
filename="5mb.bin", tag="5mb"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_5MB
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/5mb",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert len(response.content) == SIZE_5MB
assert compute_sha256(response.content) == expected_hash
@pytest.mark.integration
@pytest.mark.slow
def test_upload_download_10mb(self, integration_client, test_package, sized_content):
"""Test upload/download of 10MB file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_10MB, seed=12)
result = upload_test_file(
integration_client, project, package, content,
filename="10mb.bin", tag="10mb"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_10MB
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/10mb",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert len(response.content) == SIZE_10MB
assert compute_sha256(response.content) == expected_hash
@pytest.mark.integration
@pytest.mark.slow
def test_upload_download_50mb(self, integration_client, test_package, sized_content):
"""Test upload/download of 50MB file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_50MB, seed=13)
start_time = time.time()
result = upload_test_file(
integration_client, project, package, content,
filename="50mb.bin", tag="50mb"
)
upload_time = time.time() - start_time
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_50MB
start_time = time.time()
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/50mb",
params={"mode": "proxy"},
)
download_time = time.time() - start_time
assert response.status_code == 200
assert len(response.content) == SIZE_50MB
assert compute_sha256(response.content) == expected_hash
# Log timing for performance tracking
print(f"\n50MB upload: {upload_time:.2f}s, download: {download_time:.2f}s")
class TestLargeFileSizes:
"""Tests for large file uploads/downloads (100MB - 1GB).
These tests are marked as slow and large, skipped by default.
Run with: pytest -m "large" to include these tests.
"""
@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.large
def test_upload_download_100mb(self, integration_client, test_package, sized_content):
"""Test upload/download of 100MB file (multipart threshold)."""
project, package = test_package
content, expected_hash = sized_content(SIZE_100MB, seed=100)
start_time = time.time()
result = upload_test_file(
integration_client, project, package, content,
filename="100mb.bin", tag="100mb"
)
upload_time = time.time() - start_time
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_100MB
start_time = time.time()
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/100mb",
params={"mode": "proxy"},
)
download_time = time.time() - start_time
assert response.status_code == 200
assert len(response.content) == SIZE_100MB
assert compute_sha256(response.content) == expected_hash
print(f"\n100MB upload: {upload_time:.2f}s, download: {download_time:.2f}s")
@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.large
def test_upload_download_250mb(self, integration_client, test_package, sized_content):
"""Test upload/download of 250MB file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_250MB, seed=250)
start_time = time.time()
result = upload_test_file(
integration_client, project, package, content,
filename="250mb.bin", tag="250mb"
)
upload_time = time.time() - start_time
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_250MB
start_time = time.time()
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/250mb",
params={"mode": "proxy"},
)
download_time = time.time() - start_time
assert response.status_code == 200
assert len(response.content) == SIZE_250MB
assert compute_sha256(response.content) == expected_hash
print(f"\n250MB upload: {upload_time:.2f}s, download: {download_time:.2f}s")
@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.large
def test_upload_download_500mb(self, integration_client, test_package, sized_content):
"""Test upload/download of 500MB file."""
project, package = test_package
content, expected_hash = sized_content(SIZE_500MB, seed=500)
start_time = time.time()
result = upload_test_file(
integration_client, project, package, content,
filename="500mb.bin", tag="500mb"
)
upload_time = time.time() - start_time
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_500MB
start_time = time.time()
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/500mb",
params={"mode": "proxy"},
)
download_time = time.time() - start_time
assert response.status_code == 200
assert len(response.content) == SIZE_500MB
assert compute_sha256(response.content) == expected_hash
print(f"\n500MB upload: {upload_time:.2f}s, download: {download_time:.2f}s")
@pytest.mark.integration
@pytest.mark.slow
@pytest.mark.large
def test_upload_download_1gb(self, integration_client, test_package, sized_content):
"""Test upload/download of 1GB file.
This test may take several minutes depending on network/disk speed.
"""
project, package = test_package
content, expected_hash = sized_content(SIZE_1GB, seed=1024)
start_time = time.time()
result = upload_test_file(
integration_client, project, package, content,
filename="1gb.bin", tag="1gb"
)
upload_time = time.time() - start_time
assert result["artifact_id"] == expected_hash
assert result["size"] == SIZE_1GB
start_time = time.time()
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/1gb",
params={"mode": "proxy"},
)
download_time = time.time() - start_time
assert response.status_code == 200
assert len(response.content) == SIZE_1GB
assert compute_sha256(response.content) == expected_hash
print(f"\n1GB upload: {upload_time:.2f}s, download: {download_time:.2f}s")
class TestChunkBoundaries:
"""Tests for exact chunk size boundaries."""
@pytest.mark.integration
def test_upload_download_at_chunk_size(self, integration_client, test_package, sized_content):
"""Test upload/download at exact chunk size (64KB)."""
project, package = test_package
content, expected_hash = sized_content(CHUNK_SIZE, seed=64)
result = upload_test_file(
integration_client, project, package, content,
filename="chunk.bin", tag="chunk-exact"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == CHUNK_SIZE
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/chunk-exact",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
@pytest.mark.integration
def test_upload_download_chunk_size_plus_1(self, integration_client, test_package, sized_content):
"""Test upload/download at chunk size + 1 byte."""
project, package = test_package
size = CHUNK_SIZE + 1
content, expected_hash = sized_content(size, seed=65)
result = upload_test_file(
integration_client, project, package, content,
filename="chunk_plus.bin", tag="chunk-plus"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == size
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/chunk-plus",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
@pytest.mark.integration
def test_upload_download_chunk_size_minus_1(self, integration_client, test_package, sized_content):
"""Test upload/download at chunk size - 1 byte."""
project, package = test_package
size = CHUNK_SIZE - 1
content, expected_hash = sized_content(size, seed=63)
result = upload_test_file(
integration_client, project, package, content,
filename="chunk_minus.bin", tag="chunk-minus"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == size
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/chunk-minus",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
@pytest.mark.integration
def test_upload_download_multiple_chunks(self, integration_client, test_package, sized_content):
"""Test upload/download spanning multiple chunks."""
project, package = test_package
size = CHUNK_SIZE * 3 + 1000 # 3 full chunks + partial
content, expected_hash = sized_content(size, seed=300)
result = upload_test_file(
integration_client, project, package, content,
filename="multi_chunk.bin", tag="multi-chunk"
)
assert result["artifact_id"] == expected_hash
assert result["size"] == size
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/multi-chunk",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
class TestDataIntegrity:
"""Tests for data integrity with various content types."""
@pytest.mark.integration
def test_binary_content_integrity(self, integration_client, test_package):
"""Test binary content (all byte values 0-255) integrity."""
project, package = test_package
# Content with all 256 possible byte values
content = bytes(range(256)) * 100 # 25.6KB
expected_hash = compute_sha256(content)
result = upload_test_file(
integration_client, project, package, content,
filename="binary.bin", tag="binary"
)
assert result["artifact_id"] == expected_hash
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/binary",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
@pytest.mark.integration
def test_text_content_integrity(self, integration_client, test_package):
"""Test UTF-8 text content integrity."""
project, package = test_package
content = "Hello, World! 你好世界 🌍 مرحبا العالم".encode("utf-8")
expected_hash = compute_sha256(content)
result = upload_test_file(
integration_client, project, package, content,
filename="text.txt", tag="text"
)
assert result["artifact_id"] == expected_hash
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/text",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
assert response.content.decode("utf-8") == "Hello, World! 你好世界 🌍 مرحبا العالم"
@pytest.mark.integration
def test_null_bytes_content_integrity(self, integration_client, test_package):
"""Test content with null bytes."""
project, package = test_package
content = b"before\x00null\x00bytes\x00after"
expected_hash = compute_sha256(content)
result = upload_test_file(
integration_client, project, package, content,
filename="nulls.bin", tag="nulls"
)
assert result["artifact_id"] == expected_hash
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/nulls",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
assert b"\x00" in response.content
@pytest.mark.integration
def test_unicode_filename_integrity(self, integration_client, test_package):
"""Test file with unicode filename."""
project, package = test_package
content = b"unicode filename test"
expected_hash = compute_sha256(content)
result = upload_test_file(
integration_client, project, package, content,
filename="文件名.txt", tag="unicode-name"
)
assert result["artifact_id"] == expected_hash
assert result["original_name"] == "文件名.txt"
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/unicode-name",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
@pytest.mark.integration
def test_compressed_content_integrity(self, integration_client, test_package):
"""Test gzip-compressed content integrity."""
import gzip
project, package = test_package
original = b"This is some text that will be compressed " * 100
content = gzip.compress(original)
expected_hash = compute_sha256(content)
result = upload_test_file(
integration_client, project, package, content,
filename="data.gz", tag="compressed"
)
assert result["artifact_id"] == expected_hash
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/compressed",
params={"mode": "proxy"},
)
assert response.status_code == 200
assert response.content == content
# Verify we can decompress
assert gzip.decompress(response.content) == original
@pytest.mark.integration
def test_hash_verification_matches(self, integration_client, test_package, sized_content):
"""Test that computed hash matches artifact_id for various sizes."""
project, package = test_package
sizes = [SIZE_1B, SIZE_1KB, SIZE_10KB, SIZE_100KB, SIZE_1MB]
for i, size in enumerate(sizes):
content, expected_hash = sized_content(size, seed=1000 + i)
result = upload_test_file(
integration_client, project, package, content,
filename=f"hash_test_{size}.bin", tag=f"hash-{size}"
)
# Verify artifact_id matches expected hash
assert result["artifact_id"] == expected_hash
# Download and verify hash of downloaded content
response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/hash-{size}",
params={"mode": "proxy"},
)
downloaded_hash = compute_sha256(response.content)
assert downloaded_hash == expected_hash