- Update CacheRequest test to use version field - Fix upload_test_file calls that still used tag parameter - Update artifact history test to check versions instead of tags - Update artifact stats tests to check versions instead of tags - Fix garbage collection tests to delete versions instead of tags - Remove TestGlobalTags class (endpoint removed) - Update project/package stats tests to check version_count - Fix upload_test_file fixture in test_download_verification
927 lines
34 KiB
Python
927 lines
34 KiB
Python
"""
|
|
Integration tests for upload and download API endpoints.
|
|
|
|
Tests cover:
|
|
- Upload functionality and deduplication
|
|
- Download by version and artifact ID
|
|
- Concurrent upload handling
|
|
- File size validation
|
|
- Upload failure cleanup
|
|
- S3 storage verification
|
|
"""
|
|
|
|
import os
|
|
import pytest
|
|
import io
|
|
import threading
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from tests.factories import (
|
|
compute_sha256,
|
|
upload_test_file,
|
|
list_s3_objects_by_hash,
|
|
s3_object_exists,
|
|
)
|
|
|
|
|
|
class TestUploadBasics:
    """Tests for basic upload functionality."""

    @pytest.mark.integration
    def test_upload_returns_200(self, integration_client, test_package):
        """Test upload with valid file returns 200."""
        proj, pkg = test_package
        payload = b"valid file upload test"

        upload = {"file": ("test.bin", io.BytesIO(payload), "application/octet-stream")}
        resp = integration_client.post(
            f"/api/v1/project/{proj}/{pkg}/upload",
            files=upload,
        )
        assert resp.status_code == 200

    @pytest.mark.integration
    def test_upload_returns_artifact_id(self, integration_client, test_package):
        """Test upload returns the artifact ID (SHA256 hash)."""
        proj, pkg = test_package
        payload = b"basic upload test"

        outcome = upload_test_file(integration_client, proj, pkg, payload, version="v1")

        # The artifact ID is content-addressed: it must equal the SHA256 of the payload.
        assert outcome["artifact_id"] == compute_sha256(payload)

    @pytest.mark.integration
    def test_upload_response_has_upload_id(self, integration_client, test_package):
        """Test upload response includes upload_id."""
        proj, pkg = test_package

        outcome = upload_test_file(
            integration_client, proj, pkg, b"upload id test", "uploadid.txt"
        )

        assert "upload_id" in outcome
        assert outcome["upload_id"] is not None

    @pytest.mark.integration
    def test_upload_response_has_content_type(self, integration_client, test_package):
        """Test upload response includes content_type."""
        proj, pkg = test_package

        outcome = upload_test_file(
            integration_client, proj, pkg, b"content type test", "content.txt"
        )

        assert "content_type" in outcome

    @pytest.mark.integration
    def test_upload_response_has_original_name(self, integration_client, test_package):
        """Test upload response includes original_name."""
        proj, pkg = test_package

        outcome = upload_test_file(
            integration_client, proj, pkg, b"original name test", "originalname.txt"
        )

        assert "original_name" in outcome
        assert outcome["original_name"] == "originalname.txt"

    @pytest.mark.integration
    def test_upload_response_has_created_at(self, integration_client, test_package):
        """Test upload response includes created_at."""
        proj, pkg = test_package

        outcome = upload_test_file(
            integration_client, proj, pkg, b"created at test", "createdat.txt"
        )

        assert "created_at" in outcome
        assert outcome["created_at"] is not None

    @pytest.mark.integration
    def test_upload_without_version_succeeds(self, integration_client, test_package):
        """Test upload without version succeeds (no version created)."""
        proj, pkg = test_package
        payload = b"upload without version test"
        digest = compute_sha256(payload)

        upload = {"file": ("no_version.bin", io.BytesIO(payload), "application/octet-stream")}
        resp = integration_client.post(
            f"/api/v1/project/{proj}/{pkg}/upload",
            files=upload,
            # Deliberately no version parameter
        )
        assert resp.status_code == 200
        body = resp.json()
        assert body["artifact_id"] == digest
        # Version should be None when not specified
        assert body.get("version") is None

    @pytest.mark.integration
    def test_upload_creates_artifact_in_database(self, integration_client, test_package):
        """Test upload creates artifact record in database."""
        proj, pkg = test_package
        payload = b"database artifact test"
        digest = compute_sha256(payload)

        upload_test_file(integration_client, proj, pkg, payload)

        # Verify artifact exists via API
        resp = integration_client.get(f"/api/v1/artifact/{digest}")
        assert resp.status_code == 200
        record = resp.json()
        assert record["id"] == digest
        assert record["size"] == len(payload)

    @pytest.mark.integration
    @pytest.mark.requires_direct_s3
    def test_upload_creates_object_in_s3(self, integration_client, test_package):
        """Test upload creates object in S3 storage."""
        proj, pkg = test_package
        payload = b"s3 object creation test"
        digest = compute_sha256(payload)

        upload_test_file(integration_client, proj, pkg, payload)

        # Verify S3 object exists
        assert s3_object_exists(digest), "S3 object should exist after upload"

    @pytest.mark.integration
    def test_upload_with_version_creates_version_record(self, integration_client, test_package):
        """Test upload with version creates version record."""
        proj, pkg = test_package
        payload = b"version creation test"
        digest = compute_sha256(payload)
        version_name = "1.0.0"

        outcome = upload_test_file(
            integration_client, proj, pkg, payload, version=version_name
        )

        # Verify version was created
        assert outcome.get("version") == version_name
        assert outcome["artifact_id"] == digest

        # Verify version exists in versions list
        listing = integration_client.get(
            f"/api/v1/project/{proj}/{pkg}/versions"
        )
        assert listing.status_code == 200
        names = [entry["version"] for entry in listing.json().get("items", [])]
        assert version_name in names
|
|
|
|
|
|
class TestDuplicateUploads:
    """Tests for duplicate upload deduplication behavior."""

    @pytest.mark.integration
    def test_same_file_twice_returns_same_artifact_id(
        self, integration_client, test_package
    ):
        """Test uploading same file twice returns same artifact_id."""
        proj, pkg = test_package
        payload = b"content uploaded twice for same artifact test"
        digest = compute_sha256(payload)

        # Upload the identical payload under two different versions.
        first = upload_test_file(integration_client, proj, pkg, payload, version="first")
        second = upload_test_file(integration_client, proj, pkg, payload, version="second")

        assert first["artifact_id"] == digest
        assert second["artifact_id"] == digest
        assert first["artifact_id"] == second["artifact_id"]

    @pytest.mark.integration
    def test_same_file_twice_increments_ref_count(
        self, integration_client, test_package
    ):
        """Test uploading same file twice increments ref_count to 2."""
        proj, pkg = test_package
        payload = b"content for ref count increment test"

        # First upload establishes the artifact with a single reference.
        first = upload_test_file(integration_client, proj, pkg, payload, version="v1")
        assert first["ref_count"] == 1

        # Second upload deduplicates and bumps the reference count.
        second = upload_test_file(integration_client, proj, pkg, payload, version="v2")
        assert second["ref_count"] == 2

    @pytest.mark.integration
    def test_same_file_different_packages_shares_artifact(
        self, integration_client, test_project, unique_test_id
    ):
        """Test uploading same file to different packages shares artifact."""
        proj = test_project
        payload = f"content shared across packages {unique_test_id}".encode()
        digest = compute_sha256(payload)

        # Create two packages
        pkg1 = f"package-a-{unique_test_id}"
        pkg2 = f"package-b-{unique_test_id}"

        integration_client.post(
            f"/api/v1/project/{proj}/packages",
            json={"name": pkg1, "description": "Package A"},
        )
        integration_client.post(
            f"/api/v1/project/{proj}/packages",
            json={"name": pkg2, "description": "Package B"},
        )

        # Upload to first package: brand-new artifact.
        first = upload_test_file(integration_client, proj, pkg1, payload, version="v1")
        assert first["artifact_id"] == digest
        assert first["deduplicated"] is False

        # Upload to second package: same content, deduplicated.
        second = upload_test_file(integration_client, proj, pkg2, payload, version="v1")
        assert second["artifact_id"] == digest
        assert second["deduplicated"] is True

    @pytest.mark.integration
    def test_same_file_different_filenames_shares_artifact(
        self, integration_client, test_package
    ):
        """Test uploading same file with different filenames shares artifact."""
        proj, pkg = test_package
        payload = b"content with different filenames"
        digest = compute_sha256(payload)

        # Filenames are metadata only; identical bytes share one artifact.
        first = upload_test_file(
            integration_client, proj, pkg, payload, filename="file1.bin", version="v1"
        )
        assert first["artifact_id"] == digest

        second = upload_test_file(
            integration_client, proj, pkg, payload, filename="file2.bin", version="v2"
        )
        assert second["artifact_id"] == digest
        assert second["deduplicated"] is True
|
|
|
|
|
|
class TestDownload:
    """Tests for download functionality."""

    @pytest.mark.integration
    def test_download_by_version(self, integration_client, test_package):
        """Test downloading artifact by version."""
        proj, pkg = test_package
        payload = b"download by version test"

        upload_test_file(integration_client, proj, pkg, payload, version="1.0.0")

        resp = integration_client.get(
            f"/api/v1/project/{proj}/{pkg}/+/1.0.0",
            params={"mode": "proxy"},
        )
        assert resp.status_code == 200
        assert resp.content == payload

    @pytest.mark.integration
    def test_download_by_artifact_id(self, integration_client, test_package):
        """Test downloading artifact by artifact ID."""
        proj, pkg = test_package
        payload = b"download by id test"
        digest = compute_sha256(payload)

        upload_test_file(integration_client, proj, pkg, payload)

        # The "artifact:" ref prefix addresses content directly by hash.
        resp = integration_client.get(
            f"/api/v1/project/{proj}/{pkg}/+/artifact:{digest}",
            params={"mode": "proxy"},
        )
        assert resp.status_code == 200
        assert resp.content == payload

    @pytest.mark.integration
    def test_download_by_version_prefix(self, integration_client, test_package):
        """Test downloading artifact using version: prefix."""
        proj, pkg = test_package
        payload = b"download by version prefix test"

        upload_test_file(integration_client, proj, pkg, payload, version="2.0.0")

        resp = integration_client.get(
            f"/api/v1/project/{proj}/{pkg}/+/version:2.0.0",
            params={"mode": "proxy"},
        )
        assert resp.status_code == 200
        assert resp.content == payload

    @pytest.mark.integration
    def test_download_nonexistent_version(self, integration_client, test_package):
        """Test downloading nonexistent version returns 404."""
        proj, pkg = test_package

        resp = integration_client.get(
            f"/api/v1/project/{proj}/{pkg}/+/nonexistent-version"
        )
        assert resp.status_code == 404

    @pytest.mark.integration
    def test_download_nonexistent_artifact(self, integration_client, test_package):
        """Test downloading nonexistent artifact ID returns 404."""
        proj, pkg = test_package
        missing_hash = "0" * 64

        resp = integration_client.get(
            f"/api/v1/project/{proj}/{pkg}/+/artifact:{missing_hash}"
        )
        assert resp.status_code == 404

    @pytest.mark.integration
    def test_download_from_nonexistent_project(self, integration_client, unique_test_id):
        """Test downloading from nonexistent project returns 404."""
        resp = integration_client.get(
            f"/api/v1/project/nonexistent-project-{unique_test_id}/somepackage/+/sometag"
        )
        assert resp.status_code == 404

    @pytest.mark.integration
    def test_download_from_nonexistent_package(self, integration_client, test_project, unique_test_id):
        """Test downloading from nonexistent package returns 404."""
        resp = integration_client.get(
            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/+/sometag"
        )
        assert resp.status_code == 404

    @pytest.mark.integration
    def test_content_matches_original(self, integration_client, test_package):
        """Test downloaded content matches original exactly."""
        proj, pkg = test_package
        payload = b"exact content verification test data 12345"

        upload_test_file(integration_client, proj, pkg, payload, version="verify")

        resp = integration_client.get(
            f"/api/v1/project/{proj}/{pkg}/+/verify", params={"mode": "proxy"}
        )
        assert resp.status_code == 200
        assert resp.content == payload
|
|
|
|
|
|
class TestDownloadHeaders:
    """Tests for download response headers."""

    @pytest.mark.integration
    def test_download_content_type_header(self, integration_client, test_package):
        """Test download returns correct Content-Type header."""
        project, package = test_package
        content = b"content type header test"

        upload_test_file(
            integration_client, project, package, content,
            filename="test.txt", version="content-type-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/content-type-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        # Content-Type should be set (either text/plain or application/octet-stream)
        assert "content-type" in response.headers

    @pytest.mark.integration
    def test_download_content_length_header(self, integration_client, test_package):
        """Test download returns correct Content-Length header."""
        project, package = test_package
        # FIX: the literal previously claimed "exactly 41 bytes!" but the
        # string is actually 46 bytes long; the self-describing count is
        # corrected so the test data doesn't mislead readers. The assertion
        # itself derives the expected length from len(content).
        content = b"content length header test - exactly 46 bytes!"
        expected_length = len(content)

        upload_test_file(
            integration_client, project, package, content, version="content-length-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/content-length-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert "content-length" in response.headers
        assert int(response.headers["content-length"]) == expected_length

    @pytest.mark.integration
    def test_download_content_disposition_header(self, integration_client, test_package):
        """Test download returns correct Content-Disposition header."""
        project, package = test_package
        content = b"content disposition test"
        filename = "my-test-file.bin"

        upload_test_file(
            integration_client, project, package, content,
            filename=filename, version="disposition-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/disposition-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert "content-disposition" in response.headers
        disposition = response.headers["content-disposition"]
        # Download should be served as an attachment carrying the original name.
        assert "attachment" in disposition
        assert filename in disposition

    @pytest.mark.integration
    def test_download_checksum_headers(self, integration_client, test_package):
        """Test download returns checksum headers."""
        project, package = test_package
        content = b"checksum header test content"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, version="checksum-headers"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/checksum-headers",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        # Check for checksum headers
        assert "x-checksum-sha256" in response.headers
        assert response.headers["x-checksum-sha256"] == expected_hash

    @pytest.mark.integration
    def test_download_etag_header(self, integration_client, test_package):
        """Test download returns ETag header (artifact ID)."""
        project, package = test_package
        content = b"etag header test"
        expected_hash = compute_sha256(content)

        upload_test_file(
            integration_client, project, package, content, version="etag-test"
        )

        response = integration_client.get(
            f"/api/v1/project/{project}/{package}/+/etag-test",
            params={"mode": "proxy"},
        )
        assert response.status_code == 200
        assert "etag" in response.headers
        # ETag should contain the artifact ID (hash); strip surrounding quotes.
        etag = response.headers["etag"].strip('"')
        assert etag == expected_hash
|
|
|
|
|
|
class TestConcurrentUploads:
    """Tests for concurrent upload handling."""

    @pytest.mark.integration
    def test_concurrent_uploads_same_file(self, integration_client, test_package):
        """Test concurrent uploads of same file handle deduplication correctly.

        Five worker threads upload identical bytes under distinct versions.
        All uploads must succeed, resolve to the same content-addressed
        artifact, and leave ref_count equal to the number of uploads.
        """
        project, package = test_package
        content = b"content for concurrent upload test"
        expected_hash = compute_sha256(content)
        num_concurrent = 5

        # Create an API key for worker threads (each worker uses its own
        # httpx client rather than sharing the fixture client across threads).
        api_key_response = integration_client.post(
            "/api/v1/auth/keys",
            json={"name": "concurrent-test-key"},
        )
        assert api_key_response.status_code == 200, f"Failed to create API key: {api_key_response.text}"
        api_key = api_key_response.json()["key"]

        # Plain lists are safe accumulators here: list.append is atomic
        # under the GIL and each worker appends at most one entry.
        results = []
        errors = []

        def upload_worker(tag_suffix):
            try:
                from httpx import Client

                base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
                with Client(base_url=base_url, timeout=30.0) as client:
                    files = {
                        "file": (
                            f"concurrent-{tag_suffix}.bin",
                            io.BytesIO(content),
                            "application/octet-stream",
                        )
                    }
                    response = client.post(
                        f"/api/v1/project/{project}/{package}/upload",
                        files=files,
                        data={"version": f"concurrent-{tag_suffix}"},
                        headers={"Authorization": f"Bearer {api_key}"},
                    )
                    if response.status_code == 200:
                        results.append(response.json())
                    else:
                        errors.append(f"Status {response.status_code}: {response.text}")
            except Exception as e:
                errors.append(str(e))

        with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
            futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)]
            for future in as_completed(futures):
                # FIX: previously the loop body was `pass`, silently
                # discarding futures. Calling result() re-raises any
                # executor-level failure the worker's try/except missed.
                future.result()

        assert len(errors) == 0, f"Errors during concurrent uploads: {errors}"
        assert len(results) == num_concurrent

        # All uploads must deduplicate onto a single artifact.
        artifact_ids = set(r["artifact_id"] for r in results)
        assert len(artifact_ids) == 1
        assert expected_hash in artifact_ids

        # Verify final ref_count reflects every concurrent upload.
        response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
        assert response.status_code == 200
        assert response.json()["ref_count"] == num_concurrent
|
|
|
|
|
|
class TestFileSizeValidation:
    """Tests for file size limits and empty file rejection."""

    @pytest.mark.integration
    def test_empty_file_rejected(self, integration_client, test_package):
        """Test empty files are rejected with appropriate error."""
        proj, pkg = test_package

        upload = {"file": ("empty.txt", io.BytesIO(b""), "application/octet-stream")}
        resp = integration_client.post(
            f"/api/v1/project/{proj}/{pkg}/upload",
            files=upload,
        )

        # Either a validation error (422) or a bad-request (400) is acceptable.
        assert resp.status_code in [422, 400]

    @pytest.mark.integration
    def test_small_valid_file_accepted(self, integration_client, test_package):
        """Test small (1 byte) files are accepted."""
        proj, pkg = test_package

        outcome = upload_test_file(integration_client, proj, pkg, b"X", version="tiny")

        assert outcome["artifact_id"] is not None
        assert outcome["size"] == 1

    @pytest.mark.integration
    def test_file_size_reported_correctly(
        self, integration_client, test_package, unique_test_id
    ):
        """Test file size is correctly reported in response."""
        proj, pkg = test_package
        payload = f"Test content for size check {unique_test_id}".encode()

        outcome = upload_test_file(
            integration_client, proj, pkg, payload, version="size-test"
        )

        assert outcome["size"] == len(payload)

        # Also verify via artifact endpoint
        artifact_resp = integration_client.get(
            f"/api/v1/artifact/{outcome['artifact_id']}"
        )
        assert artifact_resp.json()["size"] == len(payload)
|
|
|
|
|
|
class TestUploadFailureCleanup:
    """Tests for cleanup when uploads fail."""

    @pytest.mark.integration
    @pytest.mark.requires_direct_s3
    def test_upload_failure_invalid_project_no_orphaned_s3(
        self, integration_client, unique_test_id
    ):
        """Test upload to non-existent project doesn't leave orphaned S3 objects."""
        payload = f"content for orphan s3 test {unique_test_id}".encode()
        digest = compute_sha256(payload)

        upload = {"file": ("test.bin", io.BytesIO(payload), "application/octet-stream")}
        resp = integration_client.post(
            f"/api/v1/project/nonexistent-project-{unique_test_id}/nonexistent-pkg/upload",
            files=upload,
            data={"version": "test"},
        )

        assert resp.status_code == 404

        # A failed upload must not write anything to object storage.
        assert not s3_object_exists(digest), (
            "Orphaned S3 object found after failed upload"
        )

    @pytest.mark.integration
    @pytest.mark.requires_direct_s3
    def test_upload_failure_invalid_package_no_orphaned_s3(
        self, integration_client, test_project, unique_test_id
    ):
        """Test upload to non-existent package doesn't leave orphaned S3 objects."""
        payload = f"content for orphan s3 test pkg {unique_test_id}".encode()
        digest = compute_sha256(payload)

        upload = {"file": ("test.bin", io.BytesIO(payload), "application/octet-stream")}
        resp = integration_client.post(
            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
            files=upload,
            data={"version": "test"},
        )

        assert resp.status_code == 404

        assert not s3_object_exists(digest), (
            "Orphaned S3 object found after failed upload"
        )

    @pytest.mark.integration
    def test_upload_failure_no_orphaned_database_records(
        self, integration_client, test_project, unique_test_id
    ):
        """Test failed upload doesn't leave orphaned database records."""
        payload = f"content for db orphan test {unique_test_id}".encode()
        digest = compute_sha256(payload)

        upload = {"file": ("test.bin", io.BytesIO(payload), "application/octet-stream")}
        resp = integration_client.post(
            f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
            files=upload,
            data={"version": "test"},
        )

        assert resp.status_code == 404

        # The artifact must not have been recorded for a rejected upload.
        lookup = integration_client.get(f"/api/v1/artifact/{digest}")
        assert lookup.status_code == 404, (
            "Orphaned artifact record found after failed upload"
        )
|
|
|
|
|
|
class TestS3StorageVerification:
    """Tests to verify S3 storage behavior."""

    @pytest.mark.integration
    @pytest.mark.requires_direct_s3
    def test_s3_single_object_after_duplicates(
        self, integration_client, test_package, unique_test_id
    ):
        """Test S3 bucket contains only one object after duplicate uploads."""
        proj, pkg = test_package
        payload = f"content for s3 object count test {unique_test_id}".encode()
        digest = compute_sha256(payload)

        # Upload the identical payload three times under distinct versions.
        for version in ("s3test1", "s3test2", "s3test3"):
            upload_test_file(integration_client, proj, pkg, payload, version=version)

        # Content-addressable storage must hold exactly one copy.
        stored = list_s3_objects_by_hash(digest)
        assert len(stored) == 1, (
            f"Expected 1 S3 object, found {len(stored)}: {stored}"
        )

        # The key layout shards by the first two hash-byte pairs.
        expected_key = (
            f"fruits/{digest[:2]}/{digest[2:4]}/{digest}"
        )
        assert stored[0] == expected_key

    @pytest.mark.integration
    def test_artifact_table_single_row_after_duplicates(
        self, integration_client, test_package
    ):
        """Test artifact table contains only one row after duplicate uploads."""
        proj, pkg = test_package
        payload = b"content for single row test"
        digest = compute_sha256(payload)

        # Upload the same bytes three times under distinct versions.
        for version in ("v1", "v2", "v3"):
            upload_test_file(integration_client, proj, pkg, payload, version=version)

        # A single artifact row should exist, referenced three times.
        resp = integration_client.get(f"/api/v1/artifact/{digest}")
        assert resp.status_code == 200
        record = resp.json()
        assert record["id"] == digest
        assert record["ref_count"] == 3
|
|
|
|
|
|
class TestSecurityPathTraversal:
    """Tests for path traversal attack prevention.

    Note: Orchard uses content-addressable storage where files are stored by
    SHA256 hash, not filename. Filenames are metadata only and never used in
    file path construction, so path traversal in filenames is not a security
    vulnerability. These tests verify the system handles unusual inputs safely.
    """

    @pytest.mark.integration
    @pytest.mark.requires_direct_s3
    def test_path_traversal_in_filename_stored_safely(
        self, integration_client, test_package
    ):
        """Test filenames with path traversal are stored safely (as metadata only)."""
        proj, pkg = test_package
        payload = b"path traversal test content"
        digest = compute_sha256(payload)

        upload = {
            "file": (
                "../../../etc/passwd",
                io.BytesIO(payload),
                "application/octet-stream",
            )
        }
        resp = integration_client.post(
            f"/api/v1/project/{proj}/{pkg}/upload",
            files=upload,
            data={"version": "traversal-test"},
        )
        assert resp.status_code == 200
        assert resp.json()["artifact_id"] == digest

        # The stored key is derived from the hash; the hostile filename
        # must never leak into the object key.
        stored = list_s3_objects_by_hash(digest)
        assert len(stored) == 1
        assert ".." not in stored[0]

    @pytest.mark.integration
    def test_path_traversal_in_package_name(self, integration_client, test_project):
        """Test package names with path traversal sequences are rejected."""
        resp = integration_client.get(
            f"/api/v1/project/{test_project}/packages/../../../etc/passwd"
        )
        assert resp.status_code in [400, 404, 422]

    @pytest.mark.integration
    def test_path_traversal_in_version_name(self, integration_client, test_package):
        """Test version names with path traversal are handled safely."""
        proj, pkg = test_package

        upload = {"file": ("test.bin", io.BytesIO(b"version traversal test"), "application/octet-stream")}
        resp = integration_client.post(
            f"/api/v1/project/{proj}/{pkg}/upload",
            files=upload,
            data={"version": "../../../etc/passwd"},
        )
        # Either accepted as an opaque version string or rejected outright.
        assert resp.status_code in [200, 400, 422]

    @pytest.mark.integration
    def test_download_path_traversal_in_ref(self, integration_client, test_package):
        """Test download ref with path traversal is rejected."""
        proj, pkg = test_package

        resp = integration_client.get(
            f"/api/v1/project/{proj}/{pkg}/+/../../../etc/passwd"
        )
        assert resp.status_code in [400, 404, 422]
|
|
|
|
|
|
class TestSecurityMalformedRequests:
    """Tests for malformed request handling."""

    @pytest.mark.integration
    def test_upload_missing_file_field(self, integration_client, test_package):
        """Test upload without file field returns appropriate error."""
        project, package = test_package

        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            data={"version": "no-file"},
        )
        assert response.status_code == 422

    @pytest.mark.integration
    def test_upload_null_bytes_in_filename(self, integration_client, test_package):
        """Test filename with null bytes is handled safely."""
        project, package = test_package
        content = b"null byte test"

        files = {
            "file": ("test\x00.bin", io.BytesIO(content), "application/octet-stream")
        }
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
        )
        # Accepted (filename is metadata) or rejected — but never a 500.
        assert response.status_code in [200, 400, 422]

    @pytest.mark.integration
    def test_upload_very_long_filename(self, integration_client, test_package):
        """Test very long filename is handled (truncated or rejected)."""
        project, package = test_package
        content = b"long filename test"
        long_filename = "a" * 1000 + ".bin"

        files = {
            "file": (long_filename, io.BytesIO(content), "application/octet-stream")
        }
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
        )
        assert response.status_code in [200, 400, 413, 422]

    @pytest.mark.integration
    def test_upload_special_characters_in_filename(
        self, integration_client, test_package
    ):
        """Test filenames with special characters are handled safely."""
        project, package = test_package
        content = b"special char test"

        special_filenames = [
            "test<script>.bin",
            'test"quote.bin',
            "test'apostrophe.bin",
            "test;semicolon.bin",
            "test|pipe.bin",
        ]

        for filename in special_filenames:
            files = {
                "file": (filename, io.BytesIO(content), "application/octet-stream")
            }
            response = integration_client.post(
                f"/api/v1/project/{project}/{package}/upload",
                files=files,
            )
            # FIX: the failure message previously contained the literal
            # text "(unknown)" instead of interpolating the offending
            # filename, making diagnostics useless.
            assert response.status_code in [200, 400, 422], (
                f"Unexpected status {response.status_code} for filename: {filename}"
            )

    @pytest.mark.integration
    def test_invalid_checksum_header_format(self, integration_client, test_package):
        """Test invalid X-Checksum-SHA256 header format is rejected."""
        project, package = test_package
        content = b"checksum test"

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": "not-a-valid-hash"},
        )
        assert response.status_code == 400
        assert "Invalid" in response.json().get("detail", "")

    @pytest.mark.integration
    def test_checksum_mismatch_rejected(self, integration_client, test_package):
        """Test upload with wrong checksum is rejected."""
        project, package = test_package
        content = b"checksum mismatch test"
        wrong_hash = "0" * 64

        files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
        response = integration_client.post(
            f"/api/v1/project/{project}/{package}/upload",
            files=files,
            headers={"X-Checksum-SHA256": wrong_hash},
        )
        assert response.status_code == 422
        assert "verification failed" in response.json().get("detail", "").lower()
|