Compare commits
4 Commits
feature/st... → 2bb619975e

| SHA1 |
|---|
| 2bb619975e |
| e7ae94b1e1 |
| 4deadc708f |
| 9106e79aac |
13  CHANGELOG.md
@@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Added comprehensive upload/download tests for size boundaries (1B to 1GB)
- Added concurrent upload/download tests (2, 5, 10 parallel operations)
- Added data integrity tests (binary, text, unicode, compressed content)
- Added chunk boundary tests for edge cases
- Added `@pytest.mark.large` and `@pytest.mark.concurrent` test markers
- Added `generate_content()` and `generate_content_with_hash()` test helpers
- Added `sized_content` fixture for generating test content of specific sizes
- Added upload API tests: upload without tag, artifact creation verification, S3 object creation
- Added download API tests: `tag:` prefix resolution, 404 for nonexistent project/package/artifact
- Added download header tests: Content-Type, Content-Length, Content-Disposition, ETag, X-Checksum-SHA256
- Added error handling tests: timeout behavior, checksum validation, resource cleanup, graceful error responses
- Added version API tests: version creation, auto-detection, listing, download by version prefix, deletion
- Added `package_versions` table for immutable version tracking separate from mutable tags (#56)
- Versions are set at upload time via explicit `version` parameter or auto-detected from filename/metadata
- Version detection priority: explicit parameter > package metadata > filename pattern (sketched below)

@@ -44,6 +56,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Improved pod naming: Orchard pods now named `orchard-{env}-server-*` for clarity (#51)

### Fixed

- Fixed Content-Disposition header encoding for non-ASCII filenames using RFC 5987
- Fixed deploy jobs running even when tests or security scans fail (changed rules from `when: always` to `when: on_success`) (#63)
- Fixed python_tests job not using internal PyPI proxy (#63)
- Fixed `cleanup_feature` job failing when branch is deleted (`GIT_STRATEGY: none`) (#51)
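The detection order in that last bullet, as a hedged pseudocode sketch (the helper name and metadata shape are illustrative, not part of this diff):

    import re

    def resolve_version(explicit, metadata, filename):
        # priority: explicit parameter > package metadata > filename pattern
        if explicit:
            return explicit
        if metadata and metadata.get("version"):
            return metadata["version"]
        match = re.search(r"(\d+\.\d+\.\d+)", filename or "")  # e.g. pkg-1.2.3.tar.gz
        return match.group(1) if match else None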
@@ -143,6 +143,31 @@ def sanitize_filename(filename: str) -> str:
    return re.sub(r'[\r\n"]', "", filename)


def build_content_disposition(filename: str) -> str:
    """Build a Content-Disposition header value with proper encoding.

    For ASCII filenames, uses simple: attachment; filename="name"
    For non-ASCII filenames, uses RFC 5987 encoding with UTF-8.
    """
    from urllib.parse import quote

    sanitized = sanitize_filename(filename)

    # Check if filename is pure ASCII
    try:
        sanitized.encode('ascii')
        # Pure ASCII - simple format
        return f'attachment; filename="{sanitized}"'
    except UnicodeEncodeError:
        # Non-ASCII - use RFC 5987 encoding
        # Provide both filename (ASCII fallback) and filename* (UTF-8 encoded)
        ascii_fallback = sanitized.encode('ascii', errors='replace').decode('ascii')
        # RFC 5987: filename*=charset'language'encoded_value
        # We use UTF-8 encoding and percent-encode non-ASCII chars
        encoded = quote(sanitized, safe='')
        return f'attachment; filename="{ascii_fallback}"; filename*=UTF-8\'\'{encoded}'


def get_user_id_from_request(
    request: Request,
    db: Session,
@@ -3076,7 +3101,7 @@ def download_artifact(
    )

    headers = {
-       "Content-Disposition": f'attachment; filename="{filename}"',
+       "Content-Disposition": build_content_disposition(filename),
        "Accept-Ranges": "bytes",
        "Content-Length": str(content_length),
        **checksum_headers,
@@ -3094,7 +3119,7 @@ def download_artifact(

    # Full download with optional verification
    base_headers = {
-       "Content-Disposition": f'attachment; filename="{filename}"',
+       "Content-Disposition": build_content_disposition(filename),
        "Accept-Ranges": "bytes",
        **checksum_headers,
    }
@@ -3276,7 +3301,7 @@ def head_artifact(

    # Build headers with checksum information
    headers = {
-       "Content-Disposition": f'attachment; filename="{filename}"',
+       "Content-Disposition": build_content_disposition(filename),
        "Accept-Ranges": "bytes",
        "Content-Length": str(artifact.size),
        "X-Artifact-Id": artifact.id,
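For review convenience, the values the new build_content_disposition helper should produce (the second case assumes the RFC 5987 branch above; the 'replace' error handler turns non-ASCII characters into '?'):

    build_content_disposition("report.pdf")
    # -> 'attachment; filename="report.pdf"'

    build_content_disposition("résumé.pdf")
    # -> 'attachment; filename="r?sum?.pdf"; filename*=UTF-8''r%C3%A9sum%C3%A9.pdf'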
@@ -12,6 +12,8 @@ markers =
    unit: Unit tests (no external dependencies)
    integration: Integration tests (require database/storage)
    slow: Slow tests (skip with -m "not slow")
    large: Large file tests (100MB+, skip with -m "not large")
    concurrent: Concurrent operation tests

# Coverage configuration
[coverage:run]
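The two new markers compose with the existing ones through standard pytest `-m` selection, e.g.:

    pytest -m "not large and not slow"   # CI-friendly default run
    pytest -m concurrent                 # only the concurrency suite
    pytest -m large                      # include the 100MB+ tests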
@@ -32,6 +32,8 @@ from tests.factories import (
    compute_md5,
    compute_sha1,
    upload_test_file,
    generate_content,
    generate_content_with_hash,
    TEST_CONTENT_HELLO,
    TEST_HASH_HELLO,
    TEST_MD5_HELLO,
@@ -271,3 +273,41 @@ def test_content():
    content = f"test-content-{uuid.uuid4().hex}".encode()
    sha256 = compute_sha256(content)
    return (content, sha256)


@pytest.fixture
def sized_content():
    """
    Factory fixture for generating content of specific sizes.

    Usage:
        def test_example(sized_content):
            content, hash = sized_content(1024)  # 1KB
            content, hash = sized_content(1024 * 1024)  # 1MB
    """
    def _generate(size: int, seed: int | None = None):
        return generate_content_with_hash(size, seed)
    return _generate


# =============================================================================
# Size Constants for Tests
# =============================================================================

# Common file sizes for boundary testing
SIZE_1B = 1
SIZE_1KB = 1024
SIZE_10KB = 10 * 1024
SIZE_100KB = 100 * 1024
SIZE_1MB = 1024 * 1024
SIZE_5MB = 5 * 1024 * 1024
SIZE_10MB = 10 * 1024 * 1024
SIZE_50MB = 50 * 1024 * 1024
SIZE_100MB = 100 * 1024 * 1024
SIZE_250MB = 250 * 1024 * 1024
SIZE_500MB = 500 * 1024 * 1024
SIZE_1GB = 1024 * 1024 * 1024

# Chunk size boundaries (based on typical S3 multipart chunk sizes)
CHUNK_SIZE = 64 * 1024  # 64KB typical chunk
MULTIPART_THRESHOLD = 100 * 1024 * 1024  # 100MB multipart threshold
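A minimal usage sketch of the fixture and constants above (a hypothetical test, relying on the docstring's contract that the same seed yields the same bytes and hash):

    def test_roundtrip_1mb(sized_content):
        content, sha256 = sized_content(SIZE_1MB, seed=42)  # deterministic 1MB payload
        assert len(content) == SIZE_1MB
        content2, sha256_2 = sized_content(SIZE_1MB, seed=42)
        assert (content2, sha256_2) == (content, sha256)  # reproducible per seed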
@@ -130,6 +130,41 @@ def upload_test_file(
    return response.json()


def generate_content(size: int, seed: Optional[int] = None) -> bytes:
    """
    Generate deterministic or random content of a specified size.

    Args:
        size: Size of content in bytes
        seed: Optional seed for reproducible content (None for random)

    Returns:
        Bytes of the specified size
    """
    if size == 0:
        return b""
    if seed is not None:
        import random
        rng = random.Random(seed)
        return bytes(rng.randint(0, 255) for _ in range(size))
    return os.urandom(size)


def generate_content_with_hash(size: int, seed: Optional[int] = None) -> tuple[bytes, str]:
    """
    Generate content of specified size and compute its SHA256 hash.

    Args:
        size: Size of content in bytes
        seed: Optional seed for reproducible content

    Returns:
        Tuple of (content_bytes, sha256_hash)
    """
    content = generate_content(size, seed)
    return content, compute_sha256(content)


# =============================================================================
# Project/Package Factories
# =============================================================================
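One review note on the seeded branch of generate_content: generating one byte per randint call will be slow at the 100MB-1GB sizes exercised below. If the test runners are on Python 3.9+, random.Random.randbytes is a bulk equivalent that stays deterministic per seed (it produces a different byte stream than the randint loop, which these tests do not depend on):

    if seed is not None:
        import random
        return random.Random(seed).randbytes(size)  # deterministic per seed, generated in bulk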
737  backend/tests/integration/test_concurrent_operations.py  Normal file
@@ -0,0 +1,737 @@
"""
Integration tests for concurrent upload and download operations.

Tests cover:
- Concurrent uploads of different files
- Concurrent uploads of same file (deduplication race)
- Concurrent downloads of same artifact
- Concurrent downloads of different artifacts
- Mixed concurrent uploads and downloads
- Data corruption prevention under concurrency
"""

import pytest
import io
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tests.factories import (
    compute_sha256,
    upload_test_file,
    generate_content_with_hash,
)


def get_api_key(integration_client):
    """Create an API key for concurrent test workers."""
    import uuid
    response = integration_client.post(
        "/api/v1/auth/keys",
        json={"name": f"concurrent-test-{uuid.uuid4().hex[:8]}"},
    )
    if response.status_code == 200:
        return response.json()["key"]
    return None


class TestConcurrentUploads:
    """Tests for concurrent upload operations."""

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_2_concurrent_uploads_different_files(self, integration_client, test_package):
        """Test 2 concurrent uploads of different files."""
        project, package = test_package
        api_key = get_api_key(integration_client)
        assert api_key, "Failed to create API key"

        files_data = [
            generate_content_with_hash(1024, seed=i) for i in range(2)
        ]

        results = []
        errors = []

        def upload_worker(idx, content, expected_hash):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    files = {
                        "file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream")
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": f"concurrent-{idx}"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if response.status_code == 200:
                        result = response.json()
                        results.append((idx, result, expected_hash))
                    else:
                        errors.append(f"Worker {idx}: Status {response.status_code}: {response.text}")
            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=2) as executor:
            futures = [
                executor.submit(upload_worker, i, content, hash)
                for i, (content, hash) in enumerate(files_data)
            ]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == 2

        # Verify each upload returned correct artifact_id
        for idx, result, expected_hash in results:
            assert result["artifact_id"] == expected_hash

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_5_concurrent_uploads_different_files(self, integration_client, test_package):
        """Test 5 concurrent uploads of different files."""
        project, package = test_package
        api_key = get_api_key(integration_client)
        assert api_key, "Failed to create API key"

        num_files = 5
        files_data = [
            generate_content_with_hash(2048, seed=100 + i) for i in range(num_files)
        ]

        results = []
        errors = []

        def upload_worker(idx, content, expected_hash):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    files = {
                        "file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream")
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": f"concurrent5-{idx}"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if response.status_code == 200:
                        result = response.json()
                        results.append((idx, result, expected_hash))
                    else:
                        errors.append(f"Worker {idx}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=num_files) as executor:
            futures = [
                executor.submit(upload_worker, i, content, hash)
                for i, (content, hash) in enumerate(files_data)
            ]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == num_files

        # Verify all uploads have unique artifact_ids
        artifact_ids = set(r[1]["artifact_id"] for r in results)
        assert len(artifact_ids) == num_files

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_10_concurrent_uploads_different_files(self, integration_client, test_package):
        """Test 10 concurrent uploads of different files."""
        project, package = test_package
        api_key = get_api_key(integration_client)
        assert api_key, "Failed to create API key"

        num_files = 10
        files_data = [
            generate_content_with_hash(1024, seed=200 + i) for i in range(num_files)
        ]

        results = []
        errors = []

        def upload_worker(idx, content, expected_hash):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    files = {
                        "file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream")
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": f"concurrent10-{idx}"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if response.status_code == 200:
                        result = response.json()
                        results.append((idx, result, expected_hash))
                    else:
                        errors.append(f"Worker {idx}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=num_files) as executor:
            futures = [
                executor.submit(upload_worker, i, content, hash)
                for i, (content, hash) in enumerate(files_data)
            ]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == num_files

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_concurrent_uploads_same_file_deduplication(self, integration_client, test_package):
        """Test concurrent uploads of same file handle deduplication correctly."""
        project, package = test_package
        api_key = get_api_key(integration_client)
        assert api_key, "Failed to create API key"

        content, expected_hash = generate_content_with_hash(4096, seed=999)
        num_concurrent = 5

        results = []
        errors = []

        def upload_worker(idx):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    files = {
                        "file": (f"same-{idx}.bin", io.BytesIO(content), "application/octet-stream")
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": f"dedup-{idx}"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if response.status_code == 200:
                        results.append(response.json())
                    else:
                        errors.append(f"Worker {idx}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
            futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == num_concurrent

        # All should have same artifact_id
        artifact_ids = set(r["artifact_id"] for r in results)
        assert len(artifact_ids) == 1
        assert expected_hash in artifact_ids

        # Verify final ref_count equals number of uploads
        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert response.status_code == 200
        assert response.json()["ref_count"] == num_concurrent

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_concurrent_uploads_to_different_packages(self, integration_client, test_project, unique_test_id):
        """Test concurrent uploads to different packages."""
        project = test_project
        api_key = get_api_key(integration_client)
        assert api_key, "Failed to create API key"

        num_packages = 3
        package_names = []

        # Create multiple packages
        for i in range(num_packages):
            pkg_name = f"pkg-{unique_test_id}-{i}"
            response = integration_client.post(
                f"/api/v1/project/{project}/packages",
                json={"name": pkg_name, "description": f"Package {i}"},
            )
            assert response.status_code == 200
            package_names.append(pkg_name)

        files_data = [
            generate_content_with_hash(1024, seed=300 + i) for i in range(num_packages)
        ]

        results = []
        errors = []

        def upload_worker(idx, package, content, expected_hash):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    files = {
                        "file": (f"file-{idx}.bin", io.BytesIO(content), "application/octet-stream")
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": "latest"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if response.status_code == 200:
                        result = response.json()
                        results.append((package, result, expected_hash))
                    else:
                        errors.append(f"Worker {idx}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=num_packages) as executor:
            futures = [
                executor.submit(upload_worker, i, package_names[i], content, hash)
                for i, (content, hash) in enumerate(files_data)
            ]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == num_packages


class TestConcurrentDownloads:
    """Tests for concurrent download operations."""

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_2_concurrent_downloads_same_artifact(self, integration_client, test_package):
        """Test 2 concurrent downloads of same artifact."""
        project, package = test_package
        content, expected_hash = generate_content_with_hash(2048, seed=400)

        # Upload first
        upload_test_file(integration_client, project, package, content, tag="download-test")

        results = []
        errors = []

        def download_worker(idx):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    response = client.get(
                        f"/api/v1/project/{project}/{package}/+/download-test",
                        params={"mode": "proxy"},
                    )
                    if response.status_code == 200:
                        results.append((idx, response.content))
                    else:
                        errors.append(f"Worker {idx}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=2) as executor:
            futures = [executor.submit(download_worker, i) for i in range(2)]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == 2

        # All downloads should match original
        for idx, downloaded in results:
            assert downloaded == content

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_5_concurrent_downloads_same_artifact(self, integration_client, test_package):
        """Test 5 concurrent downloads of same artifact."""
        project, package = test_package
        content, expected_hash = generate_content_with_hash(4096, seed=500)

        upload_test_file(integration_client, project, package, content, tag="download5-test")

        num_downloads = 5
        results = []
        errors = []

        def download_worker(idx):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    response = client.get(
                        f"/api/v1/project/{project}/{package}/+/download5-test",
                        params={"mode": "proxy"},
                    )
                    if response.status_code == 200:
                        results.append((idx, response.content))
                    else:
                        errors.append(f"Worker {idx}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=num_downloads) as executor:
            futures = [executor.submit(download_worker, i) for i in range(num_downloads)]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == num_downloads

        for idx, downloaded in results:
            assert downloaded == content

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_10_concurrent_downloads_same_artifact(self, integration_client, test_package):
        """Test 10 concurrent downloads of same artifact."""
        project, package = test_package
        content, expected_hash = generate_content_with_hash(8192, seed=600)

        upload_test_file(integration_client, project, package, content, tag="download10-test")

        num_downloads = 10
        results = []
        errors = []

        def download_worker(idx):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    response = client.get(
                        f"/api/v1/project/{project}/{package}/+/download10-test",
                        params={"mode": "proxy"},
                    )
                    if response.status_code == 200:
                        results.append((idx, response.content))
                    else:
                        errors.append(f"Worker {idx}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=num_downloads) as executor:
            futures = [executor.submit(download_worker, i) for i in range(num_downloads)]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == num_downloads

        for idx, downloaded in results:
            assert downloaded == content

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_concurrent_downloads_different_artifacts(self, integration_client, test_package):
        """Test concurrent downloads of different artifacts."""
        project, package = test_package

        # Upload multiple files
        num_files = 5
        uploads = []
        for i in range(num_files):
            content, expected_hash = generate_content_with_hash(1024, seed=700 + i)
            upload_test_file(
                integration_client, project, package, content,
                tag=f"multi-download-{i}"
            )
            uploads.append((f"multi-download-{i}", content))

        results = []
        errors = []

        def download_worker(tag, expected_content):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    response = client.get(
                        f"/api/v1/project/{project}/{package}/+/{tag}",
                        params={"mode": "proxy"},
                    )
                    if response.status_code == 200:
                        results.append((tag, response.content, expected_content))
                    else:
                        errors.append(f"Tag {tag}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Tag {tag}: {str(e)}")

        with ThreadPoolExecutor(max_workers=num_files) as executor:
            futures = [
                executor.submit(download_worker, tag, content)
                for tag, content in uploads
            ]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == num_files

        for tag, downloaded, expected in results:
            assert downloaded == expected, f"Content mismatch for {tag}"


class TestMixedConcurrentOperations:
    """Tests for mixed concurrent upload and download operations."""

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_upload_while_download_in_progress(self, integration_client, test_package):
        """Test uploading while a download is in progress."""
        project, package = test_package
        api_key = get_api_key(integration_client)
        assert api_key, "Failed to create API key"

        # Upload initial content
        content1, hash1 = generate_content_with_hash(10240, seed=800)  # 10KB
        upload_test_file(integration_client, project, package, content1, tag="initial")

        # New content for upload during download
        content2, hash2 = generate_content_with_hash(10240, seed=801)

        results = {"downloads": [], "uploads": []}
        errors = []

        def download_worker():
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    response = client.get(
                        f"/api/v1/project/{project}/{package}/+/initial",
                        params={"mode": "proxy"},
                    )
                    if response.status_code == 200:
                        results["downloads"].append(response.content)
                    else:
                        errors.append(f"Download: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Download: {str(e)}")

        def upload_worker():
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    files = {
                        "file": ("new.bin", io.BytesIO(content2), "application/octet-stream")
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": "during-download"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if response.status_code == 200:
                        results["uploads"].append(response.json())
                    else:
                        errors.append(f"Upload: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Upload: {str(e)}")

        with ThreadPoolExecutor(max_workers=2) as executor:
            futures = [
                executor.submit(download_worker),
                executor.submit(upload_worker),
            ]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results["downloads"]) == 1
        assert len(results["uploads"]) == 1

        # Verify download got correct content
        assert results["downloads"][0] == content1

        # Verify upload succeeded
        assert results["uploads"][0]["artifact_id"] == hash2

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_multiple_uploads_and_downloads_simultaneously(self, integration_client, test_package):
        """Test multiple uploads and downloads running simultaneously."""
        project, package = test_package
        api_key = get_api_key(integration_client)
        assert api_key, "Failed to create API key"

        # Pre-upload some files for downloading
        existing_files = []
        for i in range(3):
            content, hash = generate_content_with_hash(2048, seed=900 + i)
            upload_test_file(integration_client, project, package, content, tag=f"existing-{i}")
            existing_files.append((f"existing-{i}", content))

        # New files for uploading
        new_files = [
            generate_content_with_hash(2048, seed=910 + i) for i in range(3)
        ]

        results = {"downloads": [], "uploads": []}
        errors = []

        def download_worker(tag, expected):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    response = client.get(
                        f"/api/v1/project/{project}/{package}/+/{tag}",
                        params={"mode": "proxy"},
                    )
                    if response.status_code == 200:
                        results["downloads"].append((tag, response.content, expected))
                    else:
                        errors.append(f"Download {tag}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Download {tag}: {str(e)}")

        def upload_worker(idx, content, expected_hash):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    files = {
                        "file": (f"new-{idx}.bin", io.BytesIO(content), "application/octet-stream")
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": f"new-{idx}"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if response.status_code == 200:
                        results["uploads"].append((idx, response.json(), expected_hash))
                    else:
                        errors.append(f"Upload {idx}: Status {response.status_code}")
            except Exception as e:
                errors.append(f"Upload {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=6) as executor:
            futures = []

            # Submit downloads
            for tag, content in existing_files:
                futures.append(executor.submit(download_worker, tag, content))

            # Submit uploads
            for i, (content, hash) in enumerate(new_files):
                futures.append(executor.submit(upload_worker, i, content, hash))

            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results["downloads"]) == 3
        assert len(results["uploads"]) == 3

        # Verify downloads
        for tag, downloaded, expected in results["downloads"]:
            assert downloaded == expected, f"Download mismatch for {tag}"

        # Verify uploads
        for idx, result, expected_hash in results["uploads"]:
            assert result["artifact_id"] == expected_hash

    @pytest.mark.integration
    @pytest.mark.concurrent
    def test_no_data_corruption_under_concurrency(self, integration_client, test_package):
        """Test that no data corruption occurs under concurrent operations."""
        project, package = test_package
        api_key = get_api_key(integration_client)
        assert api_key, "Failed to create API key"

        # Create content with recognizable patterns
        num_files = 5
        files_data = []
        for i in range(num_files):
            # Each file has unique repeating pattern for easy corruption detection
            pattern = bytes([i] * 256)
            content = pattern * 40  # 10KB each
            hash = compute_sha256(content)
            files_data.append((content, hash))

        results = []
        errors = []

        def upload_and_verify(idx, content, expected_hash):
            try:
                from httpx import Client
                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

                with Client(base_url=base_url, timeout=60.0) as client:
                    # Upload
                    files = {
                        "file": (f"pattern-{idx}.bin", io.BytesIO(content), "application/octet-stream")
                    }
                    upload_resp = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"tag": f"pattern-{idx}"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if upload_resp.status_code != 200:
                        errors.append(f"Upload {idx}: Status {upload_resp.status_code}")
                        return

                    upload_result = upload_resp.json()
                    if upload_result["artifact_id"] != expected_hash:
                        errors.append(f"Upload {idx}: Hash mismatch")
                        return

                    # Immediately download and verify
                    download_resp = client.get(
                        f"/api/v1/project/{project}/{package}/+/pattern-{idx}",
                        params={"mode": "proxy"},
                    )
                    if download_resp.status_code != 200:
                        errors.append(f"Download {idx}: Status {download_resp.status_code}")
                        return

                    if download_resp.content != content:
                        errors.append(f"Worker {idx}: DATA CORRUPTION DETECTED")
                        return

                    # Verify the downloaded content hash
                    downloaded_hash = compute_sha256(download_resp.content)
                    if downloaded_hash != expected_hash:
                        errors.append(f"Worker {idx}: Hash verification failed")
                        return

                    results.append(idx)

            except Exception as e:
                errors.append(f"Worker {idx}: {str(e)}")

        with ThreadPoolExecutor(max_workers=num_files) as executor:
            futures = [
                executor.submit(upload_and_verify, i, content, hash)
                for i, (content, hash) in enumerate(files_data)
            ]
            for future in as_completed(futures):
                pass

        assert len(errors) == 0, f"Errors: {errors}"
        assert len(results) == num_files
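The worker/executor boilerplate above repeats in every test; a hypothetical helper along these lines could collapse it (a sketch only, not part of the diff — the name is illustrative):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_workers(fn, args_list, max_workers=None):
        # Run fn over each args tuple in parallel; workers append results/errors
        # to shared lists themselves (list.append is thread-safe under the GIL).
        with ThreadPoolExecutor(max_workers=max_workers or len(args_list)) as executor:
            futures = [executor.submit(fn, *args) for args in args_list]
            for future in as_completed(futures):
                future.result()  # re-raise anything a worker failed to catch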
322  backend/tests/integration/test_error_handling.py  Normal file
@@ -0,0 +1,322 @@
"""
Integration tests for error handling in upload and download operations.

Tests cover:
- Timeout handling
- Invalid request handling
- Resource cleanup on failures
- Graceful error responses
"""

import pytest
import io
import time
from tests.factories import (
    compute_sha256,
    upload_test_file,
    generate_content_with_hash,
)


class TestUploadErrorHandling:
    """Tests for upload error handling."""

    @pytest.mark.integration
    def test_upload_to_nonexistent_project_returns_404(
        self, integration_client, unique_test_id
    ):
        """Test upload to nonexistent project returns 404."""
        content = b"test content for nonexistent project"

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/nonexistent-project-{unique_test_id}/nonexistent-pkg/upload",
            files=files,
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_upload_to_nonexistent_package_returns_404(
        self, integration_client, test_project, unique_test_id
    ):
        """Test upload to nonexistent package returns 404."""
        content = b"test content for nonexistent package"

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
            files=files,
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_upload_empty_file_rejected(self, integration_client, test_package):
        """Test empty file upload is rejected."""
        project, package = test_package

        files = {"file": ("empty.bin", io.BytesIO(b""), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
        )
        assert response.status_code in [400, 422]

    @pytest.mark.integration
    def test_upload_missing_file_returns_422(self, integration_client, test_package):
        """Test upload without file field returns 422."""
        project, package = test_package

        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            data={"tag": "no-file-provided"},
        )
        assert response.status_code == 422

    @pytest.mark.integration
    def test_upload_invalid_checksum_format_returns_400(
        self, integration_client, test_package
    ):
        """Test upload with invalid checksum format returns 400."""
        project, package = test_package
        content = b"checksum format test"

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": "invalid-hash-format"},
        )
        assert response.status_code == 400

    @pytest.mark.integration
    def test_upload_checksum_mismatch_returns_422(
        self, integration_client, test_package
    ):
        """Test upload with mismatched checksum returns 422."""
        project, package = test_package
        content = b"checksum mismatch test"
        wrong_hash = "0" * 64  # Valid format but wrong hash

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": wrong_hash},
        )
        assert response.status_code == 422

    @pytest.mark.integration
    def test_upload_with_correct_checksum_succeeds(
        self, integration_client, test_package
    ):
        """Test upload with correct checksum succeeds."""
        project, package = test_package
        content = b"correct checksum test"
        correct_hash = compute_sha256(content)

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": correct_hash},
        )
        assert response.status_code == 200
        assert response.json()["artifact_id"] == correct_hash


class TestDownloadErrorHandling:
    """Tests for download error handling."""

    @pytest.mark.integration
    def test_download_nonexistent_tag_returns_404(
        self, integration_client, test_package
    ):
        """Test download of nonexistent tag returns 404."""
        project, package = test_package

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/nonexistent-tag-xyz"
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_download_nonexistent_artifact_returns_404(
        self, integration_client, test_package
    ):
        """Test download of nonexistent artifact ID returns 404."""
        project, package = test_package
        fake_hash = "a" * 64

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/artifact:{fake_hash}"
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_download_invalid_artifact_id_format(
        self, integration_client, test_package
    ):
        """Test download with invalid artifact ID format."""
        project, package = test_package

        # Too short
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/artifact:abc123"
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_download_from_nonexistent_project_returns_404(
        self, integration_client, unique_test_id
    ):
        """Test download from nonexistent project returns 404."""
        response = integration_client.get(
            f"/api/v1/project/nonexistent-{unique_test_id}/pkg/+/tag"
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_download_from_nonexistent_package_returns_404(
        self, integration_client, test_project, unique_test_id
    ):
        """Test download from nonexistent package returns 404."""
        response = integration_client.get(
            f"/api/v1/project/{test_project}/nonexistent-{unique_test_id}/+/tag"
        )
        assert response.status_code == 404


class TestTimeoutBehavior:
    """Tests for timeout behavior (integration level)."""

    @pytest.mark.integration
    @pytest.mark.slow
    def test_large_upload_completes_within_reasonable_time(
        self, integration_client, test_package, sized_content
    ):
        """Test that a 10MB upload completes within reasonable time."""
        project, package = test_package
        content, expected_hash = sized_content(10 * 1024 * 1024, seed=999)  # 10MB

        start_time = time.time()
        result = upload_test_file(
            integration_client, project, package, content, tag="timeout-test"
        )
        elapsed = time.time() - start_time

        assert result["artifact_id"] == expected_hash
        # Should complete within 60 seconds for 10MB on local docker
        assert elapsed < 60, f"Upload took too long: {elapsed:.2f}s"

    @pytest.mark.integration
    @pytest.mark.slow
    def test_large_download_completes_within_reasonable_time(
        self, integration_client, test_package, sized_content
    ):
        """Test that a 10MB download completes within reasonable time."""
        project, package = test_package
        content, expected_hash = sized_content(10 * 1024 * 1024, seed=998)  # 10MB

        # First upload
        upload_test_file(
            integration_client, project, package, content, tag="download-timeout-test"
        )

        # Then download and time it
        start_time = time.time()
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/download-timeout-test",
            params={"mode": "proxy"},
        )
        elapsed = time.time() - start_time

        assert response.status_code == 200
        assert len(response.content) == len(content)
        # Should complete within 60 seconds for 10MB on local docker
        assert elapsed < 60, f"Download took too long: {elapsed:.2f}s"


class TestResourceCleanup:
    """Tests for proper resource cleanup on failures.

    Note: More comprehensive cleanup tests are in test_upload_download_api.py
    (TestUploadFailureCleanup class) including S3 object cleanup verification.
    """

    @pytest.mark.integration
    def test_checksum_mismatch_no_orphaned_artifact(
        self, integration_client, test_package, unique_test_id
    ):
        """Test checksum mismatch doesn't leave orphaned artifact."""
        project, package = test_package
        # Use unique content to ensure artifact doesn't exist from prior tests
        content = f"checksum mismatch orphan test {unique_test_id}".encode()
        wrong_hash = "0" * 64
        actual_hash = compute_sha256(content)

        # Verify artifact doesn't exist before test
        pre_check = integration_client.get(f"/api/v1/artifact/{actual_hash}")
        assert pre_check.status_code == 404, "Artifact should not exist before test"

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": wrong_hash},
        )
        assert response.status_code == 422

        # Verify no artifact was created with either hash
        response1 = integration_client.get(f"/api/v1/artifact/{wrong_hash}")
        response2 = integration_client.get(f"/api/v1/artifact/{actual_hash}")
        assert response1.status_code == 404
        assert response2.status_code == 404


class TestGracefulErrorResponses:
    """Tests for graceful and informative error responses."""

    @pytest.mark.integration
    def test_404_response_has_detail_message(
        self, integration_client, test_package
    ):
        """Test 404 responses include a detail message."""
        project, package = test_package

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/nonexistent-tag"
        )
        assert response.status_code == 404
        data = response.json()
        assert "detail" in data
        assert len(data["detail"]) > 0

    @pytest.mark.integration
    def test_422_response_has_detail_message(self, integration_client, test_package):
        """Test 422 responses include a detail message."""
        project, package = test_package

        # Upload with mismatched checksum
        content = b"detail message test"
        wrong_hash = "0" * 64

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": wrong_hash},
        )
        assert response.status_code == 422
        data = response.json()
        assert "detail" in data

    @pytest.mark.integration
    def test_error_response_is_json(self, integration_client, unique_test_id):
        """Test error responses are valid JSON."""
        response = integration_client.get(
            f"/api/v1/project/nonexistent-{unique_test_id}/pkg/+/tag"
        )
        assert response.status_code == 404
        # Should not raise exception - valid JSON
        data = response.json()
        assert isinstance(data, dict)
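For reference, the checksum handshake these tests exercise, seen from a client's perspective (a sketch against the endpoint and header shown above; the project/package names are placeholders):

    import hashlib, io, httpx

    content = b"example payload"
    digest = hashlib.sha256(content).hexdigest()
    with httpx.Client(base_url="http://localhost:8080") as client:  # ORCHARD_TEST_URL default
        resp = client.post(
            "/api/v1/project/demo/pkg/upload",  # placeholder project/package
            files={"file": ("example.bin", io.BytesIO(content), "application/octet-stream")},
            headers={"X-Checksum-SHA256": digest},
        )
    # Per the tests: 200 with a matching digest, 422 on mismatch, 400 on malformed digest.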
583  backend/tests/integration/test_size_boundary.py  Normal file
@@ -0,0 +1,583 @@
|
||||
"""
|
||||
Integration tests for upload/download with various file sizes.
|
||||
|
||||
Tests cover:
|
||||
- Small files (0B - 100KB)
|
||||
- Medium files (1MB - 50MB)
|
||||
- Large files (100MB - 1GB) - marked as slow/large
|
||||
- Exact chunk boundaries
|
||||
- Data integrity verification across all sizes
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import io
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from tests.factories import (
|
||||
compute_sha256,
|
||||
upload_test_file,
|
||||
generate_content,
|
||||
generate_content_with_hash,
|
||||
)
|
||||
from tests.conftest import (
|
||||
SIZE_1B,
|
||||
SIZE_1KB,
|
||||
SIZE_10KB,
|
||||
SIZE_100KB,
|
||||
SIZE_1MB,
|
||||
SIZE_5MB,
|
||||
SIZE_10MB,
|
||||
SIZE_50MB,
|
||||
SIZE_100MB,
|
||||
SIZE_250MB,
|
||||
SIZE_500MB,
|
||||
SIZE_1GB,
|
||||
CHUNK_SIZE,
|
||||
MULTIPART_THRESHOLD,
|
||||
)
|
||||
|
||||
|
||||
class TestSmallFileSizes:
|
||||
"""Tests for small file uploads/downloads (0B - 100KB)."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_download_1_byte(self, integration_client, test_package, sized_content):
|
||||
"""Test upload/download of 1 byte file."""
|
||||
project, package = test_package
|
||||
content, expected_hash = sized_content(SIZE_1B, seed=1)
|
||||
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content,
|
||||
filename="1byte.bin", tag="1byte"
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["size"] == SIZE_1B
|
||||
|
||||
# Download and verify
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/1byte",
|
||||
params={"mode": "proxy"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.content == content
|
||||
assert len(response.content) == SIZE_1B
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_download_1kb(self, integration_client, test_package, sized_content):
|
||||
"""Test upload/download of 1KB file."""
|
||||
project, package = test_package
|
||||
content, expected_hash = sized_content(SIZE_1KB, seed=2)
|
||||
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content,
|
||||
filename="1kb.bin", tag="1kb"
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["size"] == SIZE_1KB
|
||||
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/1kb",
|
||||
params={"mode": "proxy"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.content == content
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_download_10kb(self, integration_client, test_package, sized_content):
|
||||
"""Test upload/download of 10KB file."""
|
||||
project, package = test_package
|
||||
content, expected_hash = sized_content(SIZE_10KB, seed=3)
|
||||
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content,
|
||||
filename="10kb.bin", tag="10kb"
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["size"] == SIZE_10KB
|
||||
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/10kb",
|
||||
params={"mode": "proxy"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.content == content
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_download_100kb(self, integration_client, test_package, sized_content):
|
||||
"""Test upload/download of 100KB file."""
|
||||
project, package = test_package
|
||||
content, expected_hash = sized_content(SIZE_100KB, seed=4)
|
||||
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content,
|
||||
filename="100kb.bin", tag="100kb"
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["size"] == SIZE_100KB
|
||||
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/100kb",
|
||||
params={"mode": "proxy"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.content == content
|
||||
|
||||
|
||||
class TestMediumFileSizes:
|
||||
"""Tests for medium file uploads/downloads (1MB - 50MB)."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_download_1mb(self, integration_client, test_package, sized_content):
|
||||
"""Test upload/download of 1MB file."""
|
||||
project, package = test_package
|
||||
content, expected_hash = sized_content(SIZE_1MB, seed=10)
|
||||
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content,
|
||||
filename="1mb.bin", tag="1mb"
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["size"] == SIZE_1MB
|
||||
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/1mb",
|
||||
params={"mode": "proxy"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert len(response.content) == SIZE_1MB
|
||||
assert compute_sha256(response.content) == expected_hash
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_download_5mb(self, integration_client, test_package, sized_content):
|
||||
"""Test upload/download of 5MB file (multipart threshold boundary area)."""
|
||||
project, package = test_package
|
||||
content, expected_hash = sized_content(SIZE_5MB, seed=11)
|
||||
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content,
|
||||
filename="5mb.bin", tag="5mb"
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["size"] == SIZE_5MB
|
||||
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/5mb",
|
||||
params={"mode": "proxy"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert len(response.content) == SIZE_5MB
|
||||
assert compute_sha256(response.content) == expected_hash
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.slow
|
||||
def test_upload_download_10mb(self, integration_client, test_package, sized_content):
|
||||
"""Test upload/download of 10MB file."""
|
||||
project, package = test_package
|
||||
content, expected_hash = sized_content(SIZE_10MB, seed=12)
|
||||
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content,
|
||||
filename="10mb.bin", tag="10mb"
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["size"] == SIZE_10MB
|
||||
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/10mb",
|
||||
params={"mode": "proxy"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert len(response.content) == SIZE_10MB
|
||||
assert compute_sha256(response.content) == expected_hash
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.slow
|
||||
def test_upload_download_50mb(self, integration_client, test_package, sized_content):
|
||||
"""Test upload/download of 50MB file."""
|
||||
project, package = test_package
|
||||
content, expected_hash = sized_content(SIZE_50MB, seed=13)
|
||||
|
||||
start_time = time.time()
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content,
|
||||
filename="50mb.bin", tag="50mb"
|
||||
)
|
||||
upload_time = time.time() - start_time
|
||||
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["size"] == SIZE_50MB
|
||||
|
||||
start_time = time.time()
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/50mb",
|
||||
params={"mode": "proxy"},
|
||||
)
|
||||
download_time = time.time() - start_time
|
||||
|
||||
assert response.status_code == 200
|
||||
assert len(response.content) == SIZE_50MB
|
||||
assert compute_sha256(response.content) == expected_hash
|
||||
|
||||
# Log timing for performance tracking
|
||||
print(f"\n50MB upload: {upload_time:.2f}s, download: {download_time:.2f}s")
|
||||
|
||||
|
||||
class TestLargeFileSizes:
    """Tests for large file uploads/downloads (100MB - 1GB).

    These tests are marked as slow and large, skipped by default.
    Run with: pytest -m "large" to include these tests.
    """

    @pytest.mark.integration
    @pytest.mark.slow
    @pytest.mark.large
    def test_upload_download_100mb(self, integration_client, test_package, sized_content):
        """Test upload/download of 100MB file (multipart threshold)."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_100MB, seed=100)

        start_time = time.time()
        result = upload_test_file(
            integration_client, project, package, content,
            filename="100mb.bin", tag="100mb"
        )
        upload_time = time.time() - start_time

        assert result["artifact_id"] == expected_hash
        assert result["size"] == SIZE_100MB

        start_time = time.time()
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/100mb",
            params={"mode": "proxy"},
        )
        download_time = time.time() - start_time

        assert response.status_code == 200
        assert len(response.content) == SIZE_100MB
        assert compute_sha256(response.content) == expected_hash

        print(f"\n100MB upload: {upload_time:.2f}s, download: {download_time:.2f}s")

    @pytest.mark.integration
    @pytest.mark.slow
    @pytest.mark.large
    def test_upload_download_250mb(self, integration_client, test_package, sized_content):
        """Test upload/download of 250MB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_250MB, seed=250)

        start_time = time.time()
        result = upload_test_file(
            integration_client, project, package, content,
            filename="250mb.bin", tag="250mb"
        )
        upload_time = time.time() - start_time

        assert result["artifact_id"] == expected_hash
        assert result["size"] == SIZE_250MB

        start_time = time.time()
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/250mb",
            params={"mode": "proxy"},
        )
        download_time = time.time() - start_time

        assert response.status_code == 200
        assert len(response.content) == SIZE_250MB
        assert compute_sha256(response.content) == expected_hash

        print(f"\n250MB upload: {upload_time:.2f}s, download: {download_time:.2f}s")

    @pytest.mark.integration
    @pytest.mark.slow
    @pytest.mark.large
    def test_upload_download_500mb(self, integration_client, test_package, sized_content):
        """Test upload/download of 500MB file."""
        project, package = test_package
        content, expected_hash = sized_content(SIZE_500MB, seed=500)

        start_time = time.time()
        result = upload_test_file(
            integration_client, project, package, content,
            filename="500mb.bin", tag="500mb"
        )
        upload_time = time.time() - start_time

        assert result["artifact_id"] == expected_hash
        assert result["size"] == SIZE_500MB

        start_time = time.time()
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/500mb",
            params={"mode": "proxy"},
        )
        download_time = time.time() - start_time

        assert response.status_code == 200
        assert len(response.content) == SIZE_500MB
        assert compute_sha256(response.content) == expected_hash

        print(f"\n500MB upload: {upload_time:.2f}s, download: {download_time:.2f}s")

    @pytest.mark.integration
    @pytest.mark.slow
    @pytest.mark.large
    def test_upload_download_1gb(self, integration_client, test_package, sized_content):
        """Test upload/download of 1GB file.

        This test may take several minutes depending on network/disk speed.
        """
        project, package = test_package
        content, expected_hash = sized_content(SIZE_1GB, seed=1024)

        start_time = time.time()
        result = upload_test_file(
            integration_client, project, package, content,
            filename="1gb.bin", tag="1gb"
        )
        upload_time = time.time() - start_time

        assert result["artifact_id"] == expected_hash
        assert result["size"] == SIZE_1GB

        start_time = time.time()
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/1gb",
            params={"mode": "proxy"},
        )
        download_time = time.time() - start_time

        assert response.status_code == 200
        assert len(response.content) == SIZE_1GB
        assert compute_sha256(response.content) == expected_hash

        print(f"\n1GB upload: {upload_time:.2f}s, download: {download_time:.2f}s")

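# The boundary tests below probe off-by-one behavior around the streaming
# chunk size. A sketch of the kind of read loop whose edges they exercise
# (assumed shape only; the server's actual streaming code may differ):
#
#     def iter_chunks(stream, chunk_size=CHUNK_SIZE):
#         while True:
#             chunk = stream.read(chunk_size)
#             if not chunk:
#                 break
#             yield chunk
#
# At CHUNK_SIZE - 1 the loop yields one short chunk, at CHUNK_SIZE exactly
# one full chunk, and at CHUNK_SIZE + 1 a full chunk plus a 1-byte tail.
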
class TestChunkBoundaries:
    """Tests for exact chunk size boundaries."""

    @pytest.mark.integration
    def test_upload_download_at_chunk_size(self, integration_client, test_package, sized_content):
        """Test upload/download at exact chunk size (64KB)."""
        project, package = test_package
        content, expected_hash = sized_content(CHUNK_SIZE, seed=64)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="chunk.bin", tag="chunk-exact"
        )
        assert result["artifact_id"] == expected_hash
        assert result["size"] == CHUNK_SIZE

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/chunk-exact",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content

    @pytest.mark.integration
    def test_upload_download_chunk_size_plus_1(self, integration_client, test_package, sized_content):
        """Test upload/download at chunk size + 1 byte."""
        project, package = test_package
        size = CHUNK_SIZE + 1
        content, expected_hash = sized_content(size, seed=65)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="chunk_plus.bin", tag="chunk-plus"
        )
        assert result["artifact_id"] == expected_hash
        assert result["size"] == size

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/chunk-plus",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content

    @pytest.mark.integration
    def test_upload_download_chunk_size_minus_1(self, integration_client, test_package, sized_content):
        """Test upload/download at chunk size - 1 byte."""
        project, package = test_package
        size = CHUNK_SIZE - 1
        content, expected_hash = sized_content(size, seed=63)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="chunk_minus.bin", tag="chunk-minus"
        )
        assert result["artifact_id"] == expected_hash
        assert result["size"] == size

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/chunk-minus",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content

    @pytest.mark.integration
    def test_upload_download_multiple_chunks(self, integration_client, test_package, sized_content):
        """Test upload/download spanning multiple chunks."""
        project, package = test_package
        size = CHUNK_SIZE * 3 + 1000  # 3 full chunks + partial
        content, expected_hash = sized_content(size, seed=300)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="multi_chunk.bin", tag="multi-chunk"
        )
        assert result["artifact_id"] == expected_hash
        assert result["size"] == size

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/multi-chunk",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content

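# `compute_sha256` is assumed to be the usual digest helper imported from
# tests.factories, roughly equivalent to:
#
#     import hashlib
#
#     def compute_sha256(data: bytes) -> str:
#         return hashlib.sha256(data).hexdigest()
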
class TestDataIntegrity:
    """Tests for data integrity with various content types."""

    @pytest.mark.integration
    def test_binary_content_integrity(self, integration_client, test_package):
        """Test binary content (all byte values 0-255) integrity."""
        project, package = test_package
        # Content with all 256 possible byte values
        content = bytes(range(256)) * 100  # 25.6KB
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="binary.bin", tag="binary"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/binary",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content

    @pytest.mark.integration
    def test_text_content_integrity(self, integration_client, test_package):
        """Test UTF-8 text content integrity."""
        project, package = test_package
        content = "Hello, World! 你好世界 🌍 مرحبا العالم".encode("utf-8")
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="text.txt", tag="text"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/text",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content
        assert response.content.decode("utf-8") == "Hello, World! 你好世界 🌍 مرحبا العالم"

    @pytest.mark.integration
    def test_null_bytes_content_integrity(self, integration_client, test_package):
        """Test content with null bytes."""
        project, package = test_package
        content = b"before\x00null\x00bytes\x00after"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="nulls.bin", tag="nulls"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/nulls",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content
        assert b"\x00" in response.content

    @pytest.mark.integration
    def test_unicode_filename_integrity(self, integration_client, test_package):
        """Test file with unicode filename."""
        project, package = test_package
        content = b"unicode filename test"
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="文件名.txt", tag="unicode-name"
        )
        assert result["artifact_id"] == expected_hash
        assert result["original_name"] == "文件名.txt"

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/unicode-name",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content

    @pytest.mark.integration
    def test_compressed_content_integrity(self, integration_client, test_package):
        """Test gzip-compressed content integrity."""
        import gzip

        project, package = test_package
        original = b"This is some text that will be compressed " * 100
        content = gzip.compress(original)
        expected_hash = compute_sha256(content)

        result = upload_test_file(
            integration_client, project, package, content,
            filename="data.gz", tag="compressed"
        )
        assert result["artifact_id"] == expected_hash

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/compressed",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content
        # Verify we can decompress
        assert gzip.decompress(response.content) == original

    @pytest.mark.integration
    def test_hash_verification_matches(self, integration_client, test_package, sized_content):
        """Test that computed hash matches artifact_id for various sizes."""
        project, package = test_package

        sizes = [SIZE_1B, SIZE_1KB, SIZE_10KB, SIZE_100KB, SIZE_1MB]

        for i, size in enumerate(sizes):
            content, expected_hash = sized_content(size, seed=1000 + i)

            result = upload_test_file(
                integration_client, project, package, content,
                filename=f"hash_test_{size}.bin", tag=f"hash-{size}"
            )

            # Verify artifact_id matches expected hash
            assert result["artifact_id"] == expected_hash

            # Download and verify hash of downloaded content
            response = integration_client.get(
                f"/api/v1/project/{project}/{package}/+/hash-{size}",
                params={"mode": "proxy"},
            )
            downloaded_hash = compute_sha256(response.content)
            assert downloaded_hash == expected_hash
@@ -25,6 +25,19 @@ from tests.factories import (
class TestUploadBasics:
    """Tests for basic upload functionality."""

    @pytest.mark.integration
    def test_upload_returns_200(self, integration_client, test_package):
        """Test upload with valid file returns 200."""
        project, package = test_package
        content = b"valid file upload test"

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
        )
        assert response.status_code == 200

    @pytest.mark.integration
    def test_upload_returns_artifact_id(self, integration_client, test_package):
        """Test upload returns the artifact ID (SHA256 hash)."""
@@ -101,6 +114,82 @@ class TestUploadBasics:
        assert "created_at" in result
        assert result["created_at"] is not None

    @pytest.mark.integration
    def test_upload_without_tag_succeeds(self, integration_client, test_package):
        """Test upload without tag succeeds (no tag created)."""
        project, package = test_package
        content = b"upload without tag test"
        expected_hash = compute_sha256(content)

        files = {"file": ("no_tag.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            # No tag parameter
        )
        assert response.status_code == 200
        result = response.json()
        assert result["artifact_id"] == expected_hash

        # Verify no tag was created - list tags and check
        tags_response = integration_client.get(
            f"/api/v1/project/{project}/{package}/tags"
        )
        assert tags_response.status_code == 200
        tags = tags_response.json()
        # Filter for tags pointing to this artifact
        artifact_tags = [t for t in tags.get("items", tags) if t.get("artifact_id") == expected_hash]
        assert len(artifact_tags) == 0, "Tag should not be created when not specified"

    @pytest.mark.integration
    def test_upload_creates_artifact_in_database(self, integration_client, test_package):
        """Test upload creates artifact record in database."""
        project, package = test_package
        content = b"database artifact test"
        expected_hash = compute_sha256(content)

        upload_test_file(integration_client, project, package, content)

        # Verify artifact exists via API
        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert response.status_code == 200
        artifact = response.json()
        assert artifact["id"] == expected_hash
        assert artifact["size"] == len(content)

    @pytest.mark.integration
    def test_upload_creates_object_in_s3(self, integration_client, test_package):
        """Test upload creates object in S3 storage."""
        project, package = test_package
        content = b"s3 object creation test"
        expected_hash = compute_sha256(content)

        upload_test_file(integration_client, project, package, content)

        # Verify S3 object exists
        assert s3_object_exists(expected_hash), "S3 object should exist after upload"

    @pytest.mark.integration
    def test_upload_with_tag_creates_tag_record(self, integration_client, test_package):
        """Test upload with tag creates tag record."""
        project, package = test_package
        content = b"tag creation test"
        expected_hash = compute_sha256(content)
        tag_name = "my-tag-v1"

        upload_test_file(
            integration_client, project, package, content, tag=tag_name
        )

        # Verify tag exists
        tags_response = integration_client.get(
            f"/api/v1/project/{project}/{package}/tags"
        )
        assert tags_response.status_code == 200
        tags = tags_response.json()
        tag_names = [t["name"] for t in tags.get("items", tags)]
        assert tag_name in tag_names


class TestDuplicateUploads:
    """Tests for duplicate upload deduplication behavior."""
@@ -248,6 +337,23 @@ class TestDownload:
        assert response.status_code == 200
        assert response.content == original_content

    @pytest.mark.integration
    def test_download_by_tag_prefix(self, integration_client, test_package):
        """Test downloading artifact using tag: prefix."""
        project, package = test_package
        original_content = b"download by tag prefix test"

        upload_test_file(
            integration_client, project, package, original_content, tag="prefix-tag"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/tag:prefix-tag",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == original_content

    @pytest.mark.integration
    def test_download_nonexistent_tag(self, integration_client, test_package):
        """Test downloading nonexistent tag returns 404."""
@@ -258,6 +364,33 @@ class TestDownload:
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_download_nonexistent_artifact(self, integration_client, test_package):
        """Test downloading nonexistent artifact ID returns 404."""
        project, package = test_package
        fake_hash = "0" * 64

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/artifact:{fake_hash}"
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_download_from_nonexistent_project(self, integration_client, unique_test_id):
        """Test downloading from nonexistent project returns 404."""
        response = integration_client.get(
            f"/api/v1/project/nonexistent-project-{unique_test_id}/somepackage/+/sometag"
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_download_from_nonexistent_package(self, integration_client, test_project, unique_test_id):
        """Test downloading from nonexistent package returns 404."""
        response = integration_client.get(
            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/+/sometag"
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_content_matches_original(self, integration_client, test_package):
        """Test downloaded content matches original exactly."""
@@ -275,6 +408,111 @@ class TestDownload:
        assert response.content == original_content


class TestDownloadHeaders:
    """Tests for download response headers."""

    @pytest.mark.integration
    def test_download_content_type_header(self, integration_client, test_package):
        """Test download returns correct Content-Type header."""
        project, package = test_package
        content = b"content type header test"

        upload_test_file(
            integration_client, project, package, content,
            filename="test.txt", tag="content-type-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/content-type-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        # Content-Type should be set (either text/plain or application/octet-stream)
        assert "content-type" in response.headers

    @pytest.mark.integration
    def test_download_content_length_header(self, integration_client, test_package):
        """Test download returns correct Content-Length header."""
        project, package = test_package
        content = b"content length header test - exactly 46 bytes!"
        expected_length = len(content)

        upload_test_file(
            integration_client, project, package, content, tag="content-length-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/content-length-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert "content-length" in response.headers
        assert int(response.headers["content-length"]) == expected_length

    @pytest.mark.integration
    def test_download_content_disposition_header(self, integration_client, test_package):
        """Test download returns correct Content-Disposition header."""
        project, package = test_package
        content = b"content disposition test"
        filename = "my-test-file.bin"

        upload_test_file(
            integration_client, project, package, content,
            filename=filename, tag="disposition-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/disposition-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert "content-disposition" in response.headers
        disposition = response.headers["content-disposition"]
        assert "attachment" in disposition
        assert filename in disposition
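    # Hedged aside: per the changelog's RFC 5987 fix, a non-ASCII filename is
    # expected to fall back to an ASCII `filename` plus a UTF-8
    # percent-encoded `filename*`. An analogous check might look like:
    #
    #     disposition = response.headers["content-disposition"]
    #     assert "filename*=UTF-8''" in disposition
    #
    # (Illustrative only; not asserting the server's exact header layout.)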

    @pytest.mark.integration
    def test_download_checksum_headers(self, integration_client, test_package):
        """Test download returns checksum headers."""
        project, package = test_package
        content = b"checksum header test content"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, tag="checksum-headers"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/checksum-headers",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        # Check for checksum headers
        assert "x-checksum-sha256" in response.headers
        assert response.headers["x-checksum-sha256"] == expected_hash

    @pytest.mark.integration
    def test_download_etag_header(self, integration_client, test_package):
        """Test download returns ETag header (artifact ID)."""
        project, package = test_package
        content = b"etag header test"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, tag="etag-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/etag-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert "etag" in response.headers
        # ETag should contain the artifact ID (hash)
        etag = response.headers["etag"].strip('"')
        assert etag == expected_hash


class TestConcurrentUploads:
    """Tests for concurrent upload handling."""

347
backend/tests/integration/test_version_api.py
Normal file
@@ -0,0 +1,347 @@
"""
Integration tests for package version API endpoints.

Tests cover:
- Version creation via upload
- Version auto-detection from filename
- Version listing and retrieval
- Download by version prefix
- Version deletion
"""

import io

import pytest

from tests.factories import (
    compute_sha256,
    upload_test_file,
)

class TestVersionCreation:
    """Tests for creating versions via upload."""

    @pytest.mark.integration
    def test_upload_with_explicit_version(self, integration_client, test_package):
        """Test upload with explicit version parameter creates version record."""
        project, package = test_package
        content = b"version creation test"
        expected_hash = compute_sha256(content)

        files = {"file": ("app.tar.gz", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            data={"version": "1.0.0"},
        )
        assert response.status_code == 200
        result = response.json()
        assert result["artifact_id"] == expected_hash
        assert result.get("version") == "1.0.0"
        assert result.get("version_source") == "explicit"

    @pytest.mark.integration
    def test_upload_with_version_and_tag(self, integration_client, test_package):
        """Test upload with both version and tag creates both records."""
        project, package = test_package
        content = b"version and tag test"

        files = {"file": ("app.tar.gz", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            data={"version": "2.0.0", "tag": "latest"},
        )
        assert response.status_code == 200
        result = response.json()
        assert result.get("version") == "2.0.0"

        # Verify tag was also created
        tags_response = integration_client.get(
            f"/api/v1/project/{project}/{package}/tags"
        )
        assert tags_response.status_code == 200
        tags = tags_response.json()
        tag_names = [t["name"] for t in tags.get("items", tags)]
        assert "latest" in tag_names

    @pytest.mark.integration
    def test_duplicate_version_same_content_succeeds(self, integration_client, test_package):
        """Test uploading same version with same content succeeds (deduplication)."""
        project, package = test_package
        content = b"version dedup test"

        # First upload with version
        files1 = {"file": ("app1.tar.gz", io.BytesIO(content), "application/octet-stream")}
        response1 = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files1,
            data={"version": "3.0.0"},
        )
        assert response1.status_code == 200

        # Second upload with same version and same content succeeds
        files2 = {"file": ("app2.tar.gz", io.BytesIO(content), "application/octet-stream")}
        response2 = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files2,
            data={"version": "3.0.0"},
        )
        # This succeeds because it's the same artifact (deduplication)
        assert response2.status_code == 200

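# The auto-detection tests below assume the server can pull a semantic
# version out of names like "myapp-1.2.3.tar.gz". One plausible filename
# pattern (illustrative only; the actual detection logic may differ):
#
#     import re
#
#     VERSION_RE = re.compile(r"-(\d+(?:\.\d+)+)\.(?:tar\.gz|tgz|zip)$")
#
#     def detect_version_from_filename(filename: str) -> str | None:
#         match = VERSION_RE.search(filename)
#         return match.group(1) if match else None
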
class TestVersionAutoDetection:
    """Tests for automatic version detection from filename."""

    @pytest.mark.integration
    def test_version_detected_from_filename_tarball(self, integration_client, test_package):
        """Test version is auto-detected from tarball filename or metadata."""
        project, package = test_package
        content = b"auto detect version tarball"

        files = {"file": ("myapp-1.2.3.tar.gz", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
        )
        assert response.status_code == 200
        result = response.json()
        assert result.get("version") == "1.2.3"
        # Version source can be 'filename' or 'metadata' depending on detection order
        assert result.get("version_source") in ["filename", "metadata"]

    @pytest.mark.integration
    def test_version_detected_from_filename_zip(self, integration_client, test_package):
        """Test version is auto-detected from zip filename."""
        project, package = test_package
        content = b"auto detect version zip"

        files = {"file": ("package-2.0.0.zip", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
        )
        assert response.status_code == 200
        result = response.json()
        assert result.get("version") == "2.0.0"
        assert result.get("version_source") == "filename"

    @pytest.mark.integration
    def test_explicit_version_overrides_filename(self, integration_client, test_package):
        """Test explicit version parameter overrides filename detection."""
        project, package = test_package
        content = b"explicit override test"

        files = {"file": ("myapp-1.0.0.tar.gz", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            data={"version": "9.9.9"},
        )
        assert response.status_code == 200
        result = response.json()
        assert result.get("version") == "9.9.9"
        assert result.get("version_source") == "explicit"

    @pytest.mark.integration
    def test_no_version_detected_from_plain_filename(self, integration_client, test_package):
        """Test no version is created for filenames without version pattern."""
        project, package = test_package
        content = b"no version in filename"

        files = {"file": ("plain-file.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
        )
        assert response.status_code == 200
        result = response.json()
        # Version should be None or not present
        assert result.get("version") is None

class TestVersionListing:
    """Tests for listing and retrieving versions."""

    @pytest.mark.integration
    def test_list_versions(self, integration_client, test_package):
        """Test listing all versions for a package."""
        project, package = test_package

        # Create multiple versions
        for ver in ["1.0.0", "1.1.0", "2.0.0"]:
            content = f"version {ver} content".encode()
            files = {"file": (f"app-{ver}.tar.gz", io.BytesIO(content), "application/octet-stream")}
            response = integration_client.post(
                f"/api/v1/project/{project}/{package}/upload",
                files=files,
                data={"version": ver},
            )
            assert response.status_code == 200

        # List versions
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/versions"
        )
        assert response.status_code == 200
        data = response.json()
        versions = [v["version"] for v in data.get("items", data)]
        assert "1.0.0" in versions
        assert "1.1.0" in versions
        assert "2.0.0" in versions

    @pytest.mark.integration
    def test_get_specific_version(self, integration_client, test_package):
        """Test getting details for a specific version."""
        project, package = test_package
        content = b"specific version test"
        expected_hash = compute_sha256(content)

        # Create version
        files = {"file": ("app-4.0.0.tar.gz", io.BytesIO(content), "application/octet-stream")}
        integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            data={"version": "4.0.0"},
        )

        # Get version details
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/versions/4.0.0"
        )
        assert response.status_code == 200
        data = response.json()
        assert data["version"] == "4.0.0"
        assert data["artifact_id"] == expected_hash

    @pytest.mark.integration
    def test_get_nonexistent_version_returns_404(self, integration_client, test_package):
        """Test getting nonexistent version returns 404."""
        project, package = test_package

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/versions/99.99.99"
        )
        assert response.status_code == 404

class TestDownloadByVersion:
    """Tests for downloading artifacts by version."""

    @pytest.mark.integration
    def test_download_by_version_prefix(self, integration_client, test_package):
        """Test downloading artifact using version: prefix."""
        project, package = test_package
        content = b"download by version test"
        expected_hash = compute_sha256(content)

        # Upload with version
        files = {"file": ("app.tar.gz", io.BytesIO(content), "application/octet-stream")}
        integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            data={"version": "5.0.0"},
        )

        # Download by version prefix
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/version:5.0.0",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == content

    @pytest.mark.integration
    def test_download_nonexistent_version_returns_404(self, integration_client, test_package):
        """Test downloading nonexistent version returns 404."""
        project, package = test_package

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/version:99.0.0"
        )
        assert response.status_code == 404

    @pytest.mark.integration
    def test_version_resolution_priority(self, integration_client, test_package):
        """Test that version: prefix explicitly resolves to version, not tag."""
        project, package = test_package
        version_content = b"this is the version content"
        tag_content = b"this is the tag content"

        # Create a version 6.0.0
        files1 = {"file": ("app-v.tar.gz", io.BytesIO(version_content), "application/octet-stream")}
        integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files1,
            data={"version": "6.0.0"},
        )

        # Create a tag named "6.0.0" pointing to different content
        files2 = {"file": ("app-t.tar.gz", io.BytesIO(tag_content), "application/octet-stream")}
        integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files2,
            data={"tag": "6.0.0"},
        )

        # Download with version: prefix should get version content
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/version:6.0.0",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert response.content == version_content

        # Download with tag: prefix should get tag content
        response2 = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/tag:6.0.0",
            params={"mode": "proxy"},
        )
        assert response2.status_code == 200
        assert response2.content == tag_content

class TestVersionDeletion:
    """Tests for deleting versions."""

    @pytest.mark.integration
    def test_delete_version(self, integration_client, test_package):
        """Test deleting a version."""
        project, package = test_package
        content = b"delete version test"

        # Create version
        files = {"file": ("app.tar.gz", io.BytesIO(content), "application/octet-stream")}
        integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            data={"version": "7.0.0"},
        )

        # Verify version exists
        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/versions/7.0.0"
        )
        assert response.status_code == 200

        # Delete version - returns 204 No Content on success
        delete_response = integration_client.delete(
            f"/api/v1/project/{project}/{package}/versions/7.0.0"
        )
        assert delete_response.status_code == 204

        # Verify version no longer exists
        response2 = integration_client.get(
            f"/api/v1/project/{project}/{package}/versions/7.0.0"
        )
        assert response2.status_code == 404

    @pytest.mark.integration
    def test_delete_nonexistent_version_returns_404(self, integration_client, test_package):
        """Test deleting nonexistent version returns 404."""
        project, package = test_package

        response = integration_client.delete(
            f"/api/v1/project/{project}/{package}/versions/99.0.0"
        )
        assert response.status_code == 404