Add ref_count management for deletions with atomic operations and error handling

Commit 7e68baed08 (parent 66622caf5d)
Author: Mondo Diaz
Date: 2026-01-06 13:44:23 -06:00
24 changed files with 6888 additions and 329 deletions


@@ -0,0 +1 @@
# Test package

backend/tests/conftest.py (new file, 414 lines)

@@ -0,0 +1,414 @@
"""
Test configuration and fixtures for Orchard backend tests.
This module provides:
- Database fixtures with test isolation
- Mock S3 storage using moto
- Test data factories for common scenarios
"""
import os
import pytest
import hashlib
from typing import Optional
import io
# Set test environment defaults before importing app modules
# Use setdefault to NOT override existing env vars (from docker-compose)
os.environ.setdefault("ORCHARD_DATABASE_HOST", "localhost")
os.environ.setdefault("ORCHARD_DATABASE_PORT", "5432")
os.environ.setdefault("ORCHARD_DATABASE_USER", "test")
os.environ.setdefault("ORCHARD_DATABASE_PASSWORD", "test")
os.environ.setdefault("ORCHARD_DATABASE_DBNAME", "orchard_test")
os.environ.setdefault("ORCHARD_S3_ENDPOINT", "http://localhost:9000")
os.environ.setdefault("ORCHARD_S3_BUCKET", "test-bucket")
os.environ.setdefault("ORCHARD_S3_ACCESS_KEY_ID", "test")
os.environ.setdefault("ORCHARD_S3_SECRET_ACCESS_KEY", "test")
# =============================================================================
# Test Data Factories
# =============================================================================
def create_test_file(content: Optional[bytes] = None, size: int = 1024) -> io.BytesIO:
"""
Create a test file with known content.
Args:
content: Specific content to use, or None to generate random-ish content
size: Size of generated content if content is None
Returns:
BytesIO object with the content
"""
if content is None:
content = os.urandom(size)
return io.BytesIO(content)
def compute_sha256(content: bytes) -> str:
"""Compute SHA256 hash of content as lowercase hex string."""
return hashlib.sha256(content).hexdigest()
def compute_md5(content: bytes) -> str:
"""Compute MD5 hash of content as lowercase hex string."""
return hashlib.md5(content).hexdigest()
def compute_sha1(content: bytes) -> str:
"""Compute SHA1 hash of content as lowercase hex string."""
return hashlib.sha1(content).hexdigest()
# Known test data with pre-computed hashes
TEST_CONTENT_HELLO = b"Hello, World!"
TEST_HASH_HELLO = "dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f"
TEST_MD5_HELLO = "65a8e27d8879283831b664bd8b7f0ad4"
TEST_SHA1_HELLO = "0a0a9f2a6772942557ab5355d76af442f8f65e01"
TEST_CONTENT_EMPTY = b""
# Note: Empty content should be rejected by the storage layer
TEST_CONTENT_BINARY = bytes(range(256))
TEST_HASH_BINARY = compute_sha256(TEST_CONTENT_BINARY)
# =============================================================================
# Mock Storage Fixtures
# =============================================================================
class MockS3Client:
"""Mock S3 client for unit testing without actual S3/MinIO."""
def __init__(self):
self.objects = {} # key -> content
self.bucket = "test-bucket"
def put_object(self, Bucket: str, Key: str, Body: bytes) -> dict:
self.objects[Key] = Body
return {"ETag": f'"{compute_md5(Body)}"'}
def get_object(self, Bucket: str, Key: str, **kwargs) -> dict:
if Key not in self.objects:
from botocore.exceptions import ClientError
error_response = {"Error": {"Code": "NoSuchKey", "Message": "Not Found"}}
raise ClientError(error_response, "GetObject")
content = self.objects[Key]
return {
"Body": io.BytesIO(content),
"ContentLength": len(content),
}
def head_object(self, Bucket: str, Key: str) -> dict:
if Key not in self.objects:
from botocore.exceptions import ClientError
error_response = {"Error": {"Code": "404", "Message": "Not Found"}}
raise ClientError(error_response, "HeadObject")
content = self.objects[Key]
return {
"ContentLength": len(content),
"ETag": f'"{compute_md5(content)}"',
}
def delete_object(self, Bucket: str, Key: str) -> dict:
if Key in self.objects:
del self.objects[Key]
return {}
def head_bucket(self, Bucket: str) -> dict:
return {}
def create_multipart_upload(self, Bucket: str, Key: str) -> dict:
return {"UploadId": "test-upload-id"}
def upload_part(
self, Bucket: str, Key: str, UploadId: str, PartNumber: int, Body: bytes
) -> dict:
return {"ETag": f'"{compute_md5(Body)}"'}
def complete_multipart_upload(
self, Bucket: str, Key: str, UploadId: str, MultipartUpload: dict
) -> dict:
return {"ETag": '"test-etag"'}
def abort_multipart_upload(self, Bucket: str, Key: str, UploadId: str) -> dict:
return {}
def generate_presigned_url(
self, ClientMethod: str, Params: dict, ExpiresIn: int
) -> str:
return f"https://test-bucket.s3.amazonaws.com/{Params['Key']}?presigned=true"
@pytest.fixture
def mock_s3_client() -> MockS3Client:
"""Provide a mock S3 client for unit tests."""
return MockS3Client()
@pytest.fixture
def mock_storage(mock_s3_client):
"""
Provide a mock storage instance for unit tests.
Uses the MockS3Client to avoid actual S3/MinIO calls.
"""
from app.storage import S3Storage
storage = S3Storage.__new__(S3Storage)
storage.client = mock_s3_client
storage.bucket = "test-bucket"
storage._active_uploads = {}
return storage
# =============================================================================
# Database Fixtures (for integration tests)
# =============================================================================
@pytest.fixture(scope="session")
def test_db_url():
"""Get the test database URL."""
return (
f"postgresql://{os.environ['ORCHARD_DATABASE_USER']}:"
f"{os.environ['ORCHARD_DATABASE_PASSWORD']}@"
f"{os.environ['ORCHARD_DATABASE_HOST']}:"
f"{os.environ['ORCHARD_DATABASE_PORT']}/"
f"{os.environ['ORCHARD_DATABASE_DBNAME']}"
)
# =============================================================================
# HTTP Client Fixtures (for API tests)
# =============================================================================
@pytest.fixture
def test_app():
"""
Create a test FastAPI application.
Note: This requires the database to be available for integration tests.
For unit tests, use mock_storage fixture instead.
"""
from fastapi.testclient import TestClient
from app.main import app
return TestClient(app)
# =============================================================================
# Integration Test Fixtures
# =============================================================================
@pytest.fixture
def integration_client():
"""
Create a test client for integration tests.
Uses the real database and MinIO from docker-compose.local.yml.
"""
from httpx import Client
# Connect to the running orchard-server container
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
with Client(base_url=base_url, timeout=30.0) as client:
yield client
@pytest.fixture
def unique_test_id():
"""Generate a unique ID for test isolation."""
import uuid
return f"test-{uuid.uuid4().hex[:8]}"
@pytest.fixture
def test_project(integration_client, unique_test_id):
"""
Create a test project and clean it up after the test.
Yields the project name.
"""
project_name = f"test-project-{unique_test_id}"
# Create project
response = integration_client.post(
"/api/v1/projects",
json={"name": project_name, "description": "Test project", "is_public": True},
)
assert response.status_code == 200, f"Failed to create project: {response.text}"
yield project_name
# Cleanup: delete project
try:
integration_client.delete(f"/api/v1/projects/{project_name}")
except Exception:
pass # Ignore cleanup errors
@pytest.fixture
def test_package(integration_client, test_project, unique_test_id):
"""
Create a test package within a test project.
Yields (project_name, package_name) tuple.
"""
package_name = f"test-package-{unique_test_id}"
# Create package
response = integration_client.post(
f"/api/v1/project/{test_project}/packages",
json={"name": package_name, "description": "Test package"},
)
assert response.status_code == 200, f"Failed to create package: {response.text}"
yield (test_project, package_name)
# Cleanup handled by test_project fixture (cascade delete)
@pytest.fixture
def test_content():
"""
Generate unique test content for each test.
Returns (content_bytes, expected_sha256) tuple.
"""
import uuid
content = f"test-content-{uuid.uuid4().hex}".encode()
sha256 = compute_sha256(content)
return (content, sha256)
def upload_test_file(
client,
project: str,
package: str,
content: bytes,
filename: str = "test.bin",
tag: Optional[str] = None,
) -> dict:
"""
Helper function to upload a test file.
Returns the upload response as a dict.
"""
files = {"file": (filename, io.BytesIO(content), "application/octet-stream")}
data = {}
if tag:
data["tag"] = tag
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data=data if data else None,
)
assert response.status_code == 200, f"Upload failed: {response.text}"
return response.json()
# =============================================================================
# S3 Direct Access Helpers (for integration tests)
# =============================================================================
def get_s3_client():
"""
Create a boto3 S3 client for direct S3 access in integration tests.
Uses environment variables for configuration (same as the app).
Note: When running in container, S3 endpoint should be 'minio:9000' not 'localhost:9000'.
"""
import boto3
from botocore.config import Config
config = Config(s3={"addressing_style": "path"})
# Use the same endpoint as the app (minio:9000 in container, localhost:9000 locally)
endpoint = os.environ.get("ORCHARD_S3_ENDPOINT", "http://minio:9000")
return boto3.client(
"s3",
endpoint_url=endpoint,
region_name=os.environ.get("ORCHARD_S3_REGION", "us-east-1"),
aws_access_key_id=os.environ.get("ORCHARD_S3_ACCESS_KEY_ID", "minioadmin"),
aws_secret_access_key=os.environ.get(
"ORCHARD_S3_SECRET_ACCESS_KEY", "minioadmin"
),
config=config,
)
def get_s3_bucket():
"""Get the S3 bucket name from environment."""
return os.environ.get("ORCHARD_S3_BUCKET", "orchard-artifacts")
def list_s3_objects_by_hash(sha256_hash: str) -> list:
"""
List S3 objects that match a specific SHA256 hash.
Uses the fruits/{hash[:2]}/{hash[2:4]}/{hash} key pattern.
Returns list of matching object keys.
"""
client = get_s3_client()
bucket = get_s3_bucket()
prefix = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
response = client.list_objects_v2(Bucket=bucket, Prefix=prefix)
if "Contents" not in response:
return []
return [obj["Key"] for obj in response["Contents"]]
def count_s3_objects_by_prefix(prefix: str) -> int:
"""
Count S3 objects with a given prefix.
Useful for checking if duplicate uploads created multiple objects.
"""
client = get_s3_client()
bucket = get_s3_bucket()
response = client.list_objects_v2(Bucket=bucket, Prefix=prefix)
if "Contents" not in response:
return 0
return len(response["Contents"])
def s3_object_exists(sha256_hash: str) -> bool:
"""
Check if an S3 object exists for a given SHA256 hash.
"""
objects = list_s3_objects_by_hash(sha256_hash)
return len(objects) > 0
def delete_s3_object_by_hash(sha256_hash: str) -> bool:
"""
Delete an S3 object by its SHA256 hash (for test cleanup).
"""
client = get_s3_client()
bucket = get_s3_bucket()
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
try:
client.delete_object(Bucket=bucket, Key=s3_key)
return True
except Exception:
return False
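
The helpers above all assume the same content-addressed layout. A minimal sketch of the key derivation these tests rely on (the fruit_key name is illustrative, not part of the app):

import hashlib

def fruit_key(content: bytes) -> str:
    """Derive the content-addressed S3 key: fruits/{hash[:2]}/{hash[2:4]}/{hash}."""
    h = hashlib.sha256(content).hexdigest()
    return f"fruits/{h[:2]}/{h[2:4]}/{h}"

# b"Hello, World!" hashes to dffd6021..., so its key starts with fruits/df/fd/
assert fruit_key(b"Hello, World!").startswith("fruits/df/fd/dffd6021")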


@@ -0,0 +1,207 @@
"""
Unit tests for duplicate detection and deduplication logic.
Tests cover:
- _exists() method correctly identifies existing S3 keys
- S3 key generation follows expected pattern
- Storage layer skips upload when artifact already exists
- Storage layer performs upload when artifact does not exist
"""
import pytest
import io
from tests.conftest import (
compute_sha256,
TEST_CONTENT_HELLO,
TEST_HASH_HELLO,
)
class TestExistsMethod:
"""Tests for the _exists() method that checks S3 object existence."""
@pytest.mark.unit
def test_exists_returns_true_for_existing_key(self, mock_storage, mock_s3_client):
"""Test _exists() returns True when object exists."""
# Pre-populate the mock storage
test_key = "fruits/df/fd/test-hash"
mock_s3_client.objects[test_key] = b"content"
result = mock_storage._exists(test_key)
assert result is True
@pytest.mark.unit
def test_exists_returns_false_for_nonexistent_key(self, mock_storage):
"""Test _exists() returns False when object doesn't exist."""
result = mock_storage._exists("fruits/no/ne/nonexistent-key")
assert result is False
@pytest.mark.unit
def test_exists_handles_404_error(self, mock_storage):
"""Test _exists() handles 404 errors gracefully."""
# The mock client raises ClientError for nonexistent keys
result = mock_storage._exists("fruits/xx/yy/does-not-exist")
assert result is False
class TestS3KeyGeneration:
"""Tests for S3 key pattern generation."""
@pytest.mark.unit
def test_s3_key_pattern(self):
"""Test S3 key follows pattern: fruits/{hash[:2]}/{hash[2:4]}/{hash}"""
test_hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
expected_key = f"fruits/{test_hash[:2]}/{test_hash[2:4]}/{test_hash}"
# Expected: fruits/ab/cd/abcdef1234567890...
assert expected_key == f"fruits/ab/cd/{test_hash}"
@pytest.mark.unit
def test_s3_key_generation_in_storage(self, mock_storage):
"""Test storage layer generates correct S3 key."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
assert result.s3_key == expected_key
@pytest.mark.unit
def test_s3_key_uses_sha256_hash(self, mock_storage):
"""Test S3 key is derived from SHA256 hash."""
content = b"unique test content for key test"
file_obj = io.BytesIO(content)
expected_hash = compute_sha256(content)
result = mock_storage._store_simple(file_obj)
# Key should contain the hash
assert expected_hash in result.s3_key
class TestDeduplicationBehavior:
"""Tests for deduplication (skip upload when exists)."""
@pytest.mark.unit
def test_skips_upload_when_exists(self, mock_storage, mock_s3_client):
"""Test storage skips S3 upload when artifact already exists."""
content = TEST_CONTENT_HELLO
s3_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
# Pre-populate storage (simulate existing artifact)
mock_s3_client.objects[s3_key] = content
# Track put_object calls
original_put = mock_s3_client.put_object
put_called = []
def tracked_put(*args, **kwargs):
put_called.append(True)
return original_put(*args, **kwargs)
mock_s3_client.put_object = tracked_put
# Store the same content
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# put_object should NOT have been called (deduplication)
assert len(put_called) == 0
assert result.sha256 == TEST_HASH_HELLO
@pytest.mark.unit
def test_uploads_when_not_exists(self, mock_storage, mock_s3_client):
"""Test storage uploads to S3 when artifact doesn't exist."""
content = b"brand new unique content"
content_hash = compute_sha256(content)
s3_key = f"fruits/{content_hash[:2]}/{content_hash[2:4]}/{content_hash}"
# Ensure object doesn't exist
assert s3_key not in mock_s3_client.objects
# Store the content
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# Object should now exist in mock storage
assert s3_key in mock_s3_client.objects
assert mock_s3_client.objects[s3_key] == content
@pytest.mark.unit
def test_returns_same_hash_for_duplicate(self, mock_storage, mock_s3_client):
"""Test storing same content twice returns same hash."""
content = b"content to be stored twice"
# First store
file1 = io.BytesIO(content)
result1 = mock_storage._store_simple(file1)
# Second store (duplicate)
file2 = io.BytesIO(content)
result2 = mock_storage._store_simple(file2)
assert result1.sha256 == result2.sha256
assert result1.s3_key == result2.s3_key
@pytest.mark.unit
def test_different_content_different_keys(self, mock_storage):
"""Test different content produces different S3 keys."""
content1 = b"first content"
content2 = b"second content"
file1 = io.BytesIO(content1)
result1 = mock_storage._store_simple(file1)
file2 = io.BytesIO(content2)
result2 = mock_storage._store_simple(file2)
assert result1.sha256 != result2.sha256
assert result1.s3_key != result2.s3_key
class TestDeduplicationEdgeCases:
"""Edge case tests for deduplication."""
@pytest.mark.unit
def test_same_content_different_filenames(self, mock_storage):
"""Test same content with different metadata is deduplicated."""
content = b"identical content"
# Store with "filename1"
file1 = io.BytesIO(content)
result1 = mock_storage._store_simple(file1)
# Store with "filename2" (same content)
file2 = io.BytesIO(content)
result2 = mock_storage._store_simple(file2)
# Both should have same hash (content-addressable)
assert result1.sha256 == result2.sha256
@pytest.mark.unit
def test_whitespace_only_difference(self, mock_storage):
"""Test content differing only by whitespace produces different hashes."""
content1 = b"test content"
content2 = b"test content" # Extra space
content3 = b"test content " # Trailing space
file1 = io.BytesIO(content1)
file2 = io.BytesIO(content2)
file3 = io.BytesIO(content3)
result1 = mock_storage._store_simple(file1)
result2 = mock_storage._store_simple(file2)
result3 = mock_storage._store_simple(file3)
# All should be different (content-addressable)
assert len({result1.sha256, result2.sha256, result3.sha256}) == 3
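
These tests pin the behavior of _store_simple without reaching into its internals. A minimal sketch of the hash-then-check-then-put flow they assume (the function and result names here are illustrative, not the app's actual code):

import hashlib
import io
from dataclasses import dataclass
from botocore.exceptions import ClientError

@dataclass
class StoreResult:  # stand-in for the storage layer's result object
    sha256: str
    s3_key: str

def exists(client, bucket: str, key: str) -> bool:
    try:
        client.head_object(Bucket=bucket, Key=key)  # cheap HEAD, no body transfer
        return True
    except ClientError:
        return False

def store_simple(client, bucket: str, file_obj: io.BytesIO) -> StoreResult:
    content = file_obj.read()
    sha256 = hashlib.sha256(content).hexdigest()
    key = f"fruits/{sha256[:2]}/{sha256[2:4]}/{sha256}"
    if not exists(client, bucket, key):  # deduplicate: skip the PUT on a hit
        client.put_object(Bucket=bucket, Key=key, Body=content)
    return StoreResult(sha256=sha256, s3_key=key)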


@@ -0,0 +1,168 @@
"""
Integration tests for garbage collection functionality.
Tests cover:
- Listing orphaned artifacts (ref_count=0)
- Garbage collection in dry-run mode
- Garbage collection actual deletion
- Verifying artifacts with refs are not deleted
"""
import pytest
from tests.conftest import (
compute_sha256,
upload_test_file,
)
class TestOrphanedArtifactsEndpoint:
"""Tests for GET /api/v1/admin/orphaned-artifacts endpoint."""
@pytest.mark.integration
def test_list_orphaned_artifacts_returns_list(self, integration_client):
"""Test orphaned artifacts endpoint returns a list."""
response = integration_client.get("/api/v1/admin/orphaned-artifacts")
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.integration
def test_orphaned_artifact_has_required_fields(self, integration_client):
"""Test orphaned artifact response has required fields."""
response = integration_client.get("/api/v1/admin/orphaned-artifacts?limit=1")
assert response.status_code == 200
data = response.json()
if len(data) > 0:
artifact = data[0]
assert "id" in artifact
assert "size" in artifact
assert "created_at" in artifact
assert "created_by" in artifact
assert "original_name" in artifact
@pytest.mark.integration
def test_orphaned_artifacts_respects_limit(self, integration_client):
"""Test orphaned artifacts endpoint respects limit parameter."""
response = integration_client.get("/api/v1/admin/orphaned-artifacts?limit=5")
assert response.status_code == 200
assert len(response.json()) <= 5
@pytest.mark.integration
def test_artifact_becomes_orphaned_when_tag_deleted(
self, integration_client, test_package, unique_test_id
):
"""Test artifact appears in orphaned list after tag is deleted."""
project, package = test_package
content = f"orphan test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload with tag
upload_test_file(integration_client, project, package, content, tag="temp-tag")
# Verify not in orphaned list (has ref_count=1)
response = integration_client.get("/api/v1/admin/orphaned-artifacts?limit=1000")
orphaned_ids = [a["id"] for a in response.json()]
assert expected_hash not in orphaned_ids
# Delete the tag
integration_client.delete(f"/api/v1/project/{project}/{package}/tags/temp-tag")
# Verify now in orphaned list (ref_count=0)
response = integration_client.get("/api/v1/admin/orphaned-artifacts?limit=1000")
orphaned_ids = [a["id"] for a in response.json()]
assert expected_hash in orphaned_ids
class TestGarbageCollectionEndpoint:
"""Tests for POST /api/v1/admin/garbage-collect endpoint."""
@pytest.mark.integration
def test_garbage_collect_dry_run_returns_response(self, integration_client):
"""Test garbage collection dry run returns valid response."""
response = integration_client.post("/api/v1/admin/garbage-collect?dry_run=true")
assert response.status_code == 200
data = response.json()
assert "artifacts_deleted" in data
assert "bytes_freed" in data
assert "artifact_ids" in data
assert "dry_run" in data
assert data["dry_run"] is True
@pytest.mark.integration
def test_garbage_collect_dry_run_doesnt_delete(
self, integration_client, test_package, unique_test_id
):
"""Test garbage collection dry run doesn't actually delete artifacts."""
project, package = test_package
content = f"dry run test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload and delete tag to create orphan
upload_test_file(integration_client, project, package, content, tag="dry-run")
integration_client.delete(f"/api/v1/project/{project}/{package}/tags/dry-run")
# Verify artifact exists
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
# Run garbage collection in dry-run mode
gc_response = integration_client.post(
"/api/v1/admin/garbage-collect?dry_run=true&limit=1000"
)
assert gc_response.status_code == 200
assert expected_hash in gc_response.json()["artifact_ids"]
# Verify artifact STILL exists (dry run didn't delete)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
@pytest.mark.integration
def test_garbage_collect_preserves_referenced_artifacts(
self, integration_client, test_package, unique_test_id
):
"""Test garbage collection doesn't delete artifacts with ref_count > 0."""
project, package = test_package
content = f"preserve test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload with tag (ref_count=1)
upload_test_file(integration_client, project, package, content, tag="keep-this")
# Verify artifact exists with ref_count=1
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
assert response.json()["ref_count"] == 1
# Run garbage collection (dry_run to not affect other tests)
gc_response = integration_client.post(
"/api/v1/admin/garbage-collect?dry_run=true&limit=1000"
)
assert gc_response.status_code == 200
# Verify artifact was NOT in delete list (has ref_count > 0)
assert expected_hash not in gc_response.json()["artifact_ids"]
# Verify artifact still exists
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
assert response.json()["ref_count"] == 1
@pytest.mark.integration
def test_garbage_collect_respects_limit(self, integration_client):
"""Test garbage collection respects limit parameter."""
response = integration_client.post(
"/api/v1/admin/garbage-collect?dry_run=true&limit=5"
)
assert response.status_code == 200
assert response.json()["artifacts_deleted"] <= 5
@pytest.mark.integration
def test_garbage_collect_returns_bytes_freed(self, integration_client):
"""Test garbage collection returns accurate bytes_freed."""
response = integration_client.post("/api/v1/admin/garbage-collect?dry_run=true")
assert response.status_code == 200
data = response.json()
assert data["bytes_freed"] >= 0
assert isinstance(data["bytes_freed"], int)


@@ -0,0 +1,215 @@
"""
Unit tests for SHA256 hash calculation and deduplication logic.
Tests cover:
- Hash computation produces consistent results
- Hash is always 64 character lowercase hexadecimal
- Different content produces different hashes
- Binary content handling
- Large file handling (streaming)
"""
import pytest
import hashlib
import io
from tests.conftest import (
compute_sha256,
TEST_CONTENT_HELLO,
TEST_HASH_HELLO,
TEST_CONTENT_BINARY,
TEST_HASH_BINARY,
)
class TestHashComputation:
"""Unit tests for hash calculation functionality."""
@pytest.mark.unit
def test_sha256_consistent_results(self):
"""Test SHA256 hash produces consistent results for identical content."""
content = b"test content for hashing"
# Compute hash multiple times
hash1 = compute_sha256(content)
hash2 = compute_sha256(content)
hash3 = compute_sha256(content)
assert hash1 == hash2 == hash3
@pytest.mark.unit
def test_sha256_different_content_different_hash(self):
"""Test SHA256 produces different hashes for different content."""
content1 = b"content version 1"
content2 = b"content version 2"
hash1 = compute_sha256(content1)
hash2 = compute_sha256(content2)
assert hash1 != hash2
@pytest.mark.unit
def test_sha256_format_64_char_hex(self):
"""Test SHA256 hash is always 64 character lowercase hexadecimal."""
test_cases = [
b"", # Empty
b"a", # Single char
b"Hello, World!", # Normal string
bytes(range(256)), # All byte values
b"x" * 10000, # Larger content
]
for content in test_cases:
hash_value = compute_sha256(content)
# Check length
assert len(hash_value) == 64, (
f"Hash length should be 64, got {len(hash_value)}"
)
# Check lowercase
assert hash_value == hash_value.lower(), "Hash should be lowercase"
# Check hexadecimal
assert all(c in "0123456789abcdef" for c in hash_value), (
"Hash should be hex"
)
@pytest.mark.unit
def test_sha256_known_value(self):
"""Test SHA256 produces expected hash for known input."""
assert compute_sha256(TEST_CONTENT_HELLO) == TEST_HASH_HELLO
@pytest.mark.unit
def test_sha256_binary_content(self):
"""Test SHA256 handles binary content correctly."""
assert compute_sha256(TEST_CONTENT_BINARY) == TEST_HASH_BINARY
# Test with null bytes
content_with_nulls = b"\x00\x00test\x00\x00"
hash_value = compute_sha256(content_with_nulls)
assert len(hash_value) == 64
@pytest.mark.unit
def test_sha256_streaming_computation(self):
"""Test SHA256 can be computed in chunks (streaming)."""
# Large content
chunk_size = 8192
total_size = chunk_size * 10 # 80KB
content = b"x" * total_size
# Direct computation
direct_hash = compute_sha256(content)
# Streaming computation
hasher = hashlib.sha256()
for i in range(0, total_size, chunk_size):
hasher.update(content[i : i + chunk_size])
streaming_hash = hasher.hexdigest()
assert direct_hash == streaming_hash
@pytest.mark.unit
def test_sha256_order_matters(self):
"""Test that content order affects hash (not just content set)."""
content1 = b"AB"
content2 = b"BA"
assert compute_sha256(content1) != compute_sha256(content2)
class TestStorageHashComputation:
"""Tests for hash computation in the storage layer."""
@pytest.mark.unit
def test_storage_computes_sha256(self, mock_storage):
"""Test storage layer correctly computes SHA256 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
assert result.sha256 == TEST_HASH_HELLO
@pytest.mark.unit
def test_storage_computes_md5(self, mock_storage):
"""Test storage layer also computes MD5 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_md5 = hashlib.md5(content).hexdigest()
assert result.md5 == expected_md5
@pytest.mark.unit
def test_storage_computes_sha1(self, mock_storage):
"""Test storage layer also computes SHA1 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_sha1 = hashlib.sha1(content).hexdigest()
assert result.sha1 == expected_sha1
@pytest.mark.unit
def test_storage_returns_correct_size(self, mock_storage):
"""Test storage layer returns correct file size."""
content = b"test content with known size"
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
assert result.size == len(content)
@pytest.mark.unit
def test_storage_generates_correct_s3_key(self, mock_storage):
"""Test storage layer generates correct S3 key pattern."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# Key should be: fruits/{hash[:2]}/{hash[2:4]}/{hash}
expected_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
assert result.s3_key == expected_key
class TestHashEdgeCases:
"""Edge case tests for hash computation."""
@pytest.mark.unit
def test_hash_empty_content_rejected(self, mock_storage):
"""Test that empty content is rejected."""
from app.storage import HashComputationError
file_obj = io.BytesIO(b"")
with pytest.raises(HashComputationError):
mock_storage._store_simple(file_obj)
@pytest.mark.unit
def test_hash_large_file_streaming(self, mock_storage):
"""Test hash computation for large files uses streaming."""
# Create a 10MB file
size = 10 * 1024 * 1024
content = b"x" * size
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_hash = compute_sha256(content)
assert result.sha256 == expected_hash
@pytest.mark.unit
def test_hash_special_bytes(self):
"""Test hash handles all byte values correctly."""
# All possible byte values
content = bytes(range(256))
hash_value = compute_sha256(content)
assert len(hash_value) == 64
assert hash_value == TEST_HASH_BINARY
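
The streaming test above mirrors how an upload path can hash without buffering the whole file. A sketch of single-pass, multi-digest hashing for a file-like object (a generic helper, not the app's actual code):

import hashlib
from typing import BinaryIO

def hash_stream(file_obj: BinaryIO, chunk_size: int = 8192) -> tuple[str, str, str]:
    """Return (sha256, sha1, md5) hex digests, reading the stream exactly once."""
    sha256, sha1, md5 = hashlib.sha256(), hashlib.sha1(), hashlib.md5()
    while chunk := file_obj.read(chunk_size):
        sha256.update(chunk)
        sha1.update(chunk)
        md5.update(chunk)
    return sha256.hexdigest(), sha1.hexdigest(), md5.hexdigest()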


@@ -0,0 +1,604 @@
"""
Integration tests for duplicate uploads and storage verification.
These tests require the full stack to be running (docker-compose.local.yml).
Tests cover:
- Duplicate upload scenarios across packages and projects
- Storage verification (single S3 object, single artifact row)
- Upload table tracking
- Content integrity verification
- Concurrent upload handling
- Failure cleanup
"""
import pytest
import io
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from tests.conftest import (
compute_sha256,
upload_test_file,
list_s3_objects_by_hash,
s3_object_exists,
delete_s3_object_by_hash,
)
class TestDuplicateUploadScenarios:
"""Integration tests for duplicate upload behavior."""
@pytest.mark.integration
def test_same_file_twice_returns_same_artifact_id(
self, integration_client, test_package
):
"""Test uploading same file twice returns same artifact_id."""
project, package = test_package
content = b"content uploaded twice for same artifact test"
expected_hash = compute_sha256(content)
# First upload
result1 = upload_test_file(
integration_client, project, package, content, tag="first"
)
assert result1["artifact_id"] == expected_hash
# Second upload
result2 = upload_test_file(
integration_client, project, package, content, tag="second"
)
assert result2["artifact_id"] == expected_hash
assert result1["artifact_id"] == result2["artifact_id"]
@pytest.mark.integration
def test_same_file_twice_increments_ref_count(
self, integration_client, test_package
):
"""Test uploading same file twice increments ref_count to 2."""
project, package = test_package
content = b"content for ref count increment test"
# First upload
result1 = upload_test_file(
integration_client, project, package, content, tag="v1"
)
assert result1["ref_count"] == 1
# Second upload
result2 = upload_test_file(
integration_client, project, package, content, tag="v2"
)
assert result2["ref_count"] == 2
@pytest.mark.integration
def test_same_file_different_packages_shares_artifact(
self, integration_client, test_project, unique_test_id
):
"""Test uploading same file to different packages shares artifact."""
project = test_project
content = f"content shared across packages {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Create two packages
pkg1 = f"package-a-{unique_test_id}"
pkg2 = f"package-b-{unique_test_id}"
integration_client.post(
f"/api/v1/project/{project}/packages",
json={"name": pkg1, "description": "Package A"},
)
integration_client.post(
f"/api/v1/project/{project}/packages",
json={"name": pkg2, "description": "Package B"},
)
# Upload to first package
result1 = upload_test_file(integration_client, project, pkg1, content, tag="v1")
assert result1["artifact_id"] == expected_hash
assert result1["deduplicated"] is False
# Upload to second package
result2 = upload_test_file(integration_client, project, pkg2, content, tag="v1")
assert result2["artifact_id"] == expected_hash
assert result2["deduplicated"] is True
@pytest.mark.integration
def test_same_file_different_projects_shares_artifact(
self, integration_client, unique_test_id
):
"""Test uploading same file to different projects shares artifact."""
content = f"content shared across projects {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Create two projects with packages
proj1 = f"project-x-{unique_test_id}"
proj2 = f"project-y-{unique_test_id}"
pkg_name = "shared-pkg"
try:
# Create projects and packages
integration_client.post(
"/api/v1/projects",
json={"name": proj1, "description": "Project X", "is_public": True},
)
integration_client.post(
"/api/v1/projects",
json={"name": proj2, "description": "Project Y", "is_public": True},
)
integration_client.post(
f"/api/v1/project/{proj1}/packages",
json={"name": pkg_name, "description": "Package"},
)
integration_client.post(
f"/api/v1/project/{proj2}/packages",
json={"name": pkg_name, "description": "Package"},
)
# Upload to first project
result1 = upload_test_file(
integration_client, proj1, pkg_name, content, tag="v1"
)
assert result1["artifact_id"] == expected_hash
assert result1["deduplicated"] is False
# Upload to second project
result2 = upload_test_file(
integration_client, proj2, pkg_name, content, tag="v1"
)
assert result2["artifact_id"] == expected_hash
assert result2["deduplicated"] is True
finally:
# Cleanup
integration_client.delete(f"/api/v1/projects/{proj1}")
integration_client.delete(f"/api/v1/projects/{proj2}")
@pytest.mark.integration
def test_same_file_different_filenames_shares_artifact(
self, integration_client, test_package
):
"""Test uploading same file with different original filenames shares artifact."""
project, package = test_package
content = b"content with different filenames"
expected_hash = compute_sha256(content)
# Upload with filename1
result1 = upload_test_file(
integration_client,
project,
package,
content,
filename="file1.bin",
tag="v1",
)
assert result1["artifact_id"] == expected_hash
# Upload with filename2
result2 = upload_test_file(
integration_client,
project,
package,
content,
filename="file2.bin",
tag="v2",
)
assert result2["artifact_id"] == expected_hash
assert result2["deduplicated"] is True
@pytest.mark.integration
def test_same_file_different_tags_shares_artifact(
self, integration_client, test_package, unique_test_id
):
"""Test uploading same file with different tags shares artifact."""
project, package = test_package
content = f"content with different tags {unique_test_id}".encode()
expected_hash = compute_sha256(content)
tags = ["latest", "stable", "v1.0.0", "release"]
for i, tag in enumerate(tags):
result = upload_test_file(
integration_client, project, package, content, tag=tag
)
assert result["artifact_id"] == expected_hash
if i == 0:
assert result["deduplicated"] is False
else:
assert result["deduplicated"] is True
class TestStorageVerification:
"""Tests to verify storage behavior after duplicate uploads."""
@pytest.mark.integration
def test_artifact_table_single_row_after_duplicates(
self, integration_client, test_package
):
"""Test artifact table contains only one row after duplicate uploads."""
project, package = test_package
content = b"content for single row test"
expected_hash = compute_sha256(content)
# Upload same content multiple times with different tags
for tag in ["v1", "v2", "v3"]:
upload_test_file(integration_client, project, package, content, tag=tag)
# Query artifact - should exist and be unique
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
artifact = response.json()
assert artifact["id"] == expected_hash
assert artifact["ref_count"] == 3
@pytest.mark.integration
def test_upload_table_multiple_rows_for_duplicates(
self, integration_client, test_package
):
"""Test upload table contains multiple rows for duplicate uploads (event tracking)."""
project, package = test_package
content = b"content for upload tracking test"
# Upload same content 3 times
for tag in ["upload1", "upload2", "upload3"]:
upload_test_file(integration_client, project, package, content, tag=tag)
# Check package stats - three uploads created three tags, all pointing to one artifact
response = integration_client.get(
f"/api/v1/project/{project}/packages/{package}"
)
assert response.status_code == 200
pkg_info = response.json()
assert pkg_info["tag_count"] == 3
@pytest.mark.integration
def test_artifact_content_matches_original(self, integration_client, test_package):
"""Test artifact content retrieved matches original content exactly."""
project, package = test_package
original_content = b"exact content verification test data 12345"
# Upload
result = upload_test_file(
integration_client, project, package, original_content, tag="verify"
)
# Download and compare
download_response = integration_client.get(
f"/api/v1/project/{project}/{package}/+/verify", params={"mode": "proxy"}
)
assert download_response.status_code == 200
downloaded_content = download_response.content
assert downloaded_content == original_content
@pytest.mark.integration
def test_storage_stats_reflect_deduplication(
self, integration_client, test_package
):
"""Test total storage size matches single artifact size after duplicates."""
project, package = test_package
content = b"content for storage stats test - should only count once"
content_size = len(content)
# Upload same content 5 times
for tag in ["a", "b", "c", "d", "e"]:
upload_test_file(integration_client, project, package, content, tag=tag)
# Check global stats
response = integration_client.get("/api/v1/stats")
assert response.status_code == 200
stats = response.json()
# Deduplication should show savings
assert stats["deduplicated_uploads"] > 0
assert stats["storage_saved_bytes"] > 0
class TestConcurrentUploads:
"""Tests for concurrent upload handling."""
@pytest.mark.integration
def test_concurrent_uploads_same_file(self, integration_client, test_package):
"""Test concurrent uploads of same file handle deduplication correctly."""
project, package = test_package
content = b"content for concurrent upload test"
expected_hash = compute_sha256(content)
num_concurrent = 5
results = []
errors = []
def upload_worker(tag_suffix):
try:
# Create a new client for this thread
from httpx import Client
base_url = "http://localhost:8080"
with Client(base_url=base_url, timeout=30.0) as client:
files = {
"file": (
f"concurrent-{tag_suffix}.bin",
io.BytesIO(content),
"application/octet-stream",
)
}
response = client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": f"concurrent-{tag_suffix}"},
)
if response.status_code == 200:
results.append(response.json())
else:
errors.append(f"Status {response.status_code}: {response.text}")
except Exception as e:
errors.append(str(e))
# Run concurrent uploads
with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)]
for future in as_completed(futures):
pass # Wait for all to complete
# Verify results
assert len(errors) == 0, f"Errors during concurrent uploads: {errors}"
assert len(results) == num_concurrent
# All should have same artifact_id
artifact_ids = set(r["artifact_id"] for r in results)
assert len(artifact_ids) == 1
assert expected_hash in artifact_ids
# Verify final ref_count
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
assert response.json()["ref_count"] == num_concurrent
class TestDeduplicationAcrossRestarts:
"""Tests for deduplication persistence."""
@pytest.mark.integration
def test_deduplication_persists(
self, integration_client, test_package, unique_test_id
):
"""
Test deduplication works with persisted data.
This test uploads content, then uploads the same content again.
Since the database persists, the second upload should detect
the existing artifact even without server restart.
"""
project, package = test_package
content = f"persisted content for dedup test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# First upload
result1 = upload_test_file(
integration_client, project, package, content, tag="persist1"
)
assert result1["artifact_id"] == expected_hash
assert result1["deduplicated"] is False
# Second upload (simulating after restart - data is persisted)
result2 = upload_test_file(
integration_client, project, package, content, tag="persist2"
)
assert result2["artifact_id"] == expected_hash
assert result2["deduplicated"] is True
# Verify artifact exists with correct ref_count
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
assert response.json()["ref_count"] == 2
class TestS3ObjectVerification:
"""Tests to verify S3 storage behavior directly."""
@pytest.mark.integration
def test_s3_bucket_single_object_after_duplicates(
self, integration_client, test_package, unique_test_id
):
"""Test S3 bucket contains only one object after duplicate uploads."""
project, package = test_package
content = f"content for s3 object count test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload same content multiple times with different tags
for tag in ["s3test1", "s3test2", "s3test3"]:
upload_test_file(integration_client, project, package, content, tag=tag)
# Verify only one S3 object exists for this hash
s3_objects = list_s3_objects_by_hash(expected_hash)
assert len(s3_objects) == 1, (
f"Expected 1 S3 object, found {len(s3_objects)}: {s3_objects}"
)
# Verify the object key follows expected pattern
expected_key = (
f"fruits/{expected_hash[:2]}/{expected_hash[2:4]}/{expected_hash}"
)
assert s3_objects[0] == expected_key
class TestUploadFailureCleanup:
"""Tests for cleanup when uploads fail."""
@pytest.mark.integration
def test_upload_failure_invalid_project_no_orphaned_s3(
self, integration_client, unique_test_id
):
"""Test upload to non-existent project doesn't leave orphaned S3 objects."""
content = f"content for orphan s3 test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Attempt upload to non-existent project
files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
response = integration_client.post(
f"/api/v1/project/nonexistent-project-{unique_test_id}/nonexistent-pkg/upload",
files=files,
data={"tag": "test"},
)
# Upload should fail
assert response.status_code == 404
# Verify no S3 object was created
assert not s3_object_exists(expected_hash), (
"Orphaned S3 object found after failed upload"
)
@pytest.mark.integration
def test_upload_failure_invalid_package_no_orphaned_s3(
self, integration_client, test_project, unique_test_id
):
"""Test upload to non-existent package doesn't leave orphaned S3 objects."""
content = f"content for orphan s3 test pkg {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Attempt upload to non-existent package
files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
response = integration_client.post(
f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
files=files,
data={"tag": "test"},
)
# Upload should fail
assert response.status_code == 404
# Verify no S3 object was created
assert not s3_object_exists(expected_hash), (
"Orphaned S3 object found after failed upload"
)
@pytest.mark.integration
def test_upload_failure_empty_file_no_orphaned_s3(
self, integration_client, test_package, unique_test_id
):
"""Test upload of empty file doesn't leave orphaned S3 objects or DB records."""
project, package = test_package
content = b"" # Empty content
# Attempt upload of empty file
files = {"file": ("empty.bin", io.BytesIO(content), "application/octet-stream")}
response = integration_client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
data={"tag": f"empty-{unique_test_id}"},
)
# Upload should fail (empty files are rejected)
assert response.status_code in (400, 422), (
f"Expected 400/422, got {response.status_code}"
)
@pytest.mark.integration
def test_upload_failure_no_orphaned_database_records(
self, integration_client, test_project, unique_test_id
):
"""Test failed upload doesn't leave orphaned database records."""
content = f"content for db orphan test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Attempt upload to non-existent package (should fail before DB insert)
files = {"file": ("test.bin", io.BytesIO(content), "application/octet-stream")}
response = integration_client.post(
f"/api/v1/project/{test_project}/nonexistent-package-{unique_test_id}/upload",
files=files,
data={"tag": "test"},
)
# Upload should fail
assert response.status_code == 404
# Verify no artifact record was created
artifact_response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert artifact_response.status_code == 404, (
"Orphaned artifact record found after failed upload"
)
@pytest.mark.integration
def test_duplicate_tag_upload_handles_gracefully(
self, integration_client, test_package, unique_test_id
):
"""Test uploading with duplicate tag is handled without orphaned data."""
project, package = test_package
content1 = f"content version 1 {unique_test_id}".encode()
content2 = f"content version 2 {unique_test_id}".encode()
tag = f"duplicate-tag-{unique_test_id}"
# First upload with tag
result1 = upload_test_file(
integration_client, project, package, content1, tag=tag
)
hash1 = result1["artifact_id"]
# Second upload with same tag (should update the tag to point to new artifact)
result2 = upload_test_file(
integration_client, project, package, content2, tag=tag
)
hash2 = result2["artifact_id"]
# Both artifacts should exist
assert integration_client.get(f"/api/v1/artifact/{hash1}").status_code == 200
assert integration_client.get(f"/api/v1/artifact/{hash2}").status_code == 200
# Tag should point to the second artifact
tag_response = integration_client.get(
f"/api/v1/project/{project}/{package}/tags/{tag}"
)
assert tag_response.status_code == 200
assert tag_response.json()["artifact_id"] == hash2
class TestFileSizeValidation:
"""Tests for file size limits and empty file rejection."""
@pytest.mark.integration
def test_empty_file_rejected(self, integration_client, test_package):
"""Test that empty files are rejected with appropriate error."""
project, package = test_package
# Try to upload empty content
files = {"file": ("empty.txt", io.BytesIO(b""), "application/octet-stream")}
response = integration_client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
)
# Should be rejected (422 from storage layer or validation)
assert response.status_code in [422, 400]
@pytest.mark.integration
def test_small_valid_file_accepted(self, integration_client, test_package):
"""Test that small (1 byte) files are accepted."""
project, package = test_package
content = b"X" # Single byte
result = upload_test_file(
integration_client, project, package, content, tag="tiny"
)
assert result["artifact_id"] is not None
assert result["size"] == 1
@pytest.mark.integration
def test_file_size_reported_correctly(
self, integration_client, test_package, unique_test_id
):
"""Test that file size is correctly reported in response."""
project, package = test_package
content = f"Test content for size check {unique_test_id}".encode()
expected_size = len(content)
result = upload_test_file(
integration_client, project, package, content, tag="size-test"
)
assert result["size"] == expected_size
# Also verify via artifact endpoint
artifact_response = integration_client.get(
f"/api/v1/artifact/{result['artifact_id']}"
)
assert artifact_response.json()["size"] == expected_size


@@ -0,0 +1,458 @@
"""
Unit and integration tests for reference counting behavior.
Tests cover:
- ref_count is set correctly for new artifacts
- ref_count increments on duplicate uploads
- ref_count query correctly identifies existing artifacts
- Artifact lookup by SHA256 hash works correctly
"""
import pytest
import io
from tests.conftest import (
compute_sha256,
upload_test_file,
TEST_CONTENT_HELLO,
TEST_HASH_HELLO,
)
class TestRefCountQuery:
"""Tests for ref_count querying and artifact lookup."""
@pytest.mark.integration
def test_artifact_lookup_by_sha256(self, integration_client, test_package):
"""Test artifact lookup by SHA256 hash (primary key) works correctly."""
project, package = test_package
content = b"unique content for lookup test"
expected_hash = compute_sha256(content)
# Upload a file
upload_result = upload_test_file(
integration_client, project, package, content, tag="v1"
)
assert upload_result["artifact_id"] == expected_hash
# Look up artifact by ID (SHA256)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
artifact = response.json()
assert artifact["id"] == expected_hash
assert artifact["sha256"] == expected_hash
assert artifact["size"] == len(content)
@pytest.mark.integration
def test_ref_count_query_identifies_existing_artifact(
self, integration_client, test_package
):
"""Test ref_count query correctly identifies existing artifacts by hash."""
project, package = test_package
content = b"content for ref count query test"
expected_hash = compute_sha256(content)
# Upload a file with a tag
upload_result = upload_test_file(
integration_client, project, package, content, tag="v1"
)
# Query artifact and check ref_count
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
artifact = response.json()
assert artifact["ref_count"] >= 1 # At least 1 from the tag
@pytest.mark.integration
def test_ref_count_set_to_1_for_new_artifact_with_tag(
self, integration_client, test_package, unique_test_id
):
"""Test ref_count is set to 1 for new artifacts when created with a tag."""
project, package = test_package
content = f"brand new content for ref count test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload a new file with a tag
upload_result = upload_test_file(
integration_client, project, package, content, tag="initial"
)
assert upload_result["artifact_id"] == expected_hash
assert upload_result["ref_count"] == 1
assert upload_result["deduplicated"] is False
@pytest.mark.integration
def test_ref_count_increments_on_duplicate_upload_with_tag(
self, integration_client, test_package, unique_test_id
):
"""Test ref_count is incremented when duplicate content is uploaded with a new tag."""
project, package = test_package
content = f"content that will be uploaded twice {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# First upload with tag
result1 = upload_test_file(
integration_client, project, package, content, tag="v1"
)
assert result1["ref_count"] == 1
assert result1["deduplicated"] is False
# Second upload with different tag (same content)
result2 = upload_test_file(
integration_client, project, package, content, tag="v2"
)
assert result2["artifact_id"] == expected_hash
assert result2["ref_count"] == 2
assert result2["deduplicated"] is True
@pytest.mark.integration
def test_ref_count_after_multiple_tags(self, integration_client, test_package):
"""Test ref_count correctly reflects number of tags pointing to artifact."""
project, package = test_package
content = b"content for multiple tag test"
expected_hash = compute_sha256(content)
# Upload with multiple tags
tags = ["v1", "v2", "v3", "latest"]
for i, tag in enumerate(tags):
result = upload_test_file(
integration_client, project, package, content, tag=tag
)
assert result["artifact_id"] == expected_hash
assert result["ref_count"] == i + 1
# Verify final ref_count via artifact endpoint
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
assert response.json()["ref_count"] == len(tags)
class TestRefCountWithDeletion:
"""Tests for ref_count behavior when tags are deleted."""
@pytest.mark.integration
def test_ref_count_decrements_on_tag_delete(self, integration_client, test_package):
"""Test ref_count decrements when a tag is deleted."""
project, package = test_package
content = b"content for delete test"
expected_hash = compute_sha256(content)
# Upload with two tags
upload_test_file(integration_client, project, package, content, tag="v1")
upload_test_file(integration_client, project, package, content, tag="v2")
# Verify ref_count is 2
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 2
# Delete one tag
delete_response = integration_client.delete(
f"/api/v1/project/{project}/{package}/tags/v1"
)
assert delete_response.status_code == 204
# Verify ref_count is now 1
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
@pytest.mark.integration
def test_ref_count_zero_after_all_tags_deleted(
self, integration_client, test_package
):
"""Test ref_count goes to 0 when all tags are deleted."""
project, package = test_package
content = b"content that will be orphaned"
expected_hash = compute_sha256(content)
# Upload with one tag
upload_test_file(integration_client, project, package, content, tag="only-tag")
# Delete the tag
integration_client.delete(f"/api/v1/project/{project}/{package}/tags/only-tag")
# Verify ref_count is 0
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 0
class TestRefCountCascadeDelete:
"""Tests for ref_count behavior during cascade deletions."""
@pytest.mark.integration
def test_ref_count_decrements_on_package_delete(
self, integration_client, unique_test_id
):
"""Test ref_count decrements for all tags when package is deleted."""
# Create a project and package manually (not using fixtures to control cleanup)
project_name = f"cascade-pkg-{unique_test_id}"
package_name = f"test-pkg-{unique_test_id}"
# Create project
response = integration_client.post(
"/api/v1/projects",
json={
"name": project_name,
"description": "Test project",
"is_public": True,
},
)
assert response.status_code == 200
# Create package
response = integration_client.post(
f"/api/v1/project/{project_name}/packages",
json={"name": package_name, "description": "Test package"},
)
assert response.status_code == 200
# Upload content with multiple tags
content = f"cascade delete test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
upload_test_file(
integration_client, project_name, package_name, content, tag="v1"
)
upload_test_file(
integration_client, project_name, package_name, content, tag="v2"
)
upload_test_file(
integration_client, project_name, package_name, content, tag="v3"
)
# Verify ref_count is 3
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 3
# Delete the package (should cascade delete all tags and decrement ref_count)
delete_response = integration_client.delete(
f"/api/v1/project/{project_name}/packages/{package_name}"
)
assert delete_response.status_code == 204
# Verify ref_count is 0 (all tags were deleted)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 0
# Cleanup: delete the project
integration_client.delete(f"/api/v1/projects/{project_name}")
@pytest.mark.integration
def test_ref_count_decrements_on_project_delete(
self, integration_client, unique_test_id
):
"""Test ref_count decrements for all tags in all packages when project is deleted."""
# Create a project manually (not using fixtures to control cleanup)
project_name = f"cascade-proj-{unique_test_id}"
package1_name = f"pkg1-{unique_test_id}"
package2_name = f"pkg2-{unique_test_id}"
# Create project
response = integration_client.post(
"/api/v1/projects",
json={
"name": project_name,
"description": "Test project",
"is_public": True,
},
)
assert response.status_code == 200
# Create two packages
for pkg_name in [package1_name, package2_name]:
response = integration_client.post(
f"/api/v1/project/{project_name}/packages",
json={"name": pkg_name, "description": "Test package"},
)
assert response.status_code == 200
# Upload same content with tags in both packages
content = f"project cascade test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
upload_test_file(
integration_client, project_name, package1_name, content, tag="v1"
)
upload_test_file(
integration_client, project_name, package1_name, content, tag="v2"
)
upload_test_file(
integration_client, project_name, package2_name, content, tag="latest"
)
upload_test_file(
integration_client, project_name, package2_name, content, tag="stable"
)
# Verify ref_count is 4 (2 tags in each of 2 packages)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 4
# Delete the project (should cascade delete all packages, tags, and decrement ref_count)
delete_response = integration_client.delete(f"/api/v1/projects/{project_name}")
assert delete_response.status_code == 204
# Verify ref_count is 0
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 0
@pytest.mark.integration
def test_shared_artifact_ref_count_partial_decrement(
self, integration_client, unique_test_id
):
"""Test ref_count correctly decrements when artifact is shared across packages."""
# Create project with two packages
project_name = f"shared-artifact-{unique_test_id}"
package1_name = f"pkg1-{unique_test_id}"
package2_name = f"pkg2-{unique_test_id}"
# Create project
response = integration_client.post(
"/api/v1/projects",
json={
"name": project_name,
"description": "Test project",
"is_public": True,
},
)
assert response.status_code == 200
# Create two packages
for pkg_name in [package1_name, package2_name]:
response = integration_client.post(
f"/api/v1/project/{project_name}/packages",
json={"name": pkg_name, "description": "Test package"},
)
assert response.status_code == 200
# Upload same content to both packages
content = f"shared artifact {unique_test_id}".encode()
expected_hash = compute_sha256(content)
upload_test_file(
integration_client, project_name, package1_name, content, tag="v1"
)
upload_test_file(
integration_client, project_name, package2_name, content, tag="v1"
)
# Verify ref_count is 2
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 2
# Delete only package1 (package2 still references the artifact)
delete_response = integration_client.delete(
f"/api/v1/project/{project_name}/packages/{package1_name}"
)
assert delete_response.status_code == 204
# Verify ref_count is 1 (only package2's tag remains)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
# Cleanup
integration_client.delete(f"/api/v1/projects/{project_name}")
class TestRefCountTagUpdate:
"""Tests for ref_count behavior when tags are updated to point to different artifacts."""
@pytest.mark.integration
def test_ref_count_adjusts_on_tag_update(
self, integration_client, test_package, unique_test_id
):
"""Test ref_count adjusts when a tag is updated to point to a different artifact."""
project, package = test_package
# Upload two different artifacts
content1 = f"artifact one {unique_test_id}".encode()
content2 = f"artifact two {unique_test_id}".encode()
hash1 = compute_sha256(content1)
hash2 = compute_sha256(content2)
# Upload first artifact with tag "latest"
upload_test_file(integration_client, project, package, content1, tag="latest")
# Verify first artifact has ref_count 1
response = integration_client.get(f"/api/v1/artifact/{hash1}")
assert response.json()["ref_count"] == 1
# Upload second artifact with different tag
upload_test_file(integration_client, project, package, content2, tag="stable")
# Now update "latest" tag to point to second artifact
# This is done by uploading content2 again, this time under the existing tag "latest"
upload_test_file(integration_client, project, package, content2, tag="latest")
# Verify first artifact ref_count decreased to 0 (tag moved away)
response = integration_client.get(f"/api/v1/artifact/{hash1}")
assert response.json()["ref_count"] == 0
# Verify second artifact ref_count increased to 2 (stable + latest)
response = integration_client.get(f"/api/v1/artifact/{hash2}")
assert response.json()["ref_count"] == 2
@pytest.mark.integration
def test_ref_count_unchanged_when_tag_same_artifact(
self, integration_client, test_package, unique_test_id
):
"""Test ref_count doesn't change when tag is 'updated' to same artifact."""
project, package = test_package
content = f"same artifact {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload with tag
upload_test_file(integration_client, project, package, content, tag="v1")
# Verify ref_count is 1
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
# Upload same content with same tag (no-op)
upload_test_file(integration_client, project, package, content, tag="v1")
# Verify ref_count is still 1 (no double-counting)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
@pytest.mark.integration
def test_tag_via_post_endpoint_increments_ref_count(
self, integration_client, test_package, unique_test_id
):
"""Test creating tag via POST /tags endpoint increments ref_count."""
project, package = test_package
content = f"tag endpoint test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload artifact without tag
result = upload_test_file(
integration_client, project, package, content, filename="test.bin", tag=None
)
artifact_id = result["artifact_id"]
# Verify ref_count is 0 (no tags yet)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 0
# Create tag via POST endpoint
tag_response = integration_client.post(
f"/api/v1/project/{project}/{package}/tags",
json={"name": "v1.0.0", "artifact_id": artifact_id},
)
assert tag_response.status_code == 200
# Verify ref_count is now 1
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
# Create another tag via POST endpoint
tag_response = integration_client.post(
f"/api/v1/project/{project}/{package}/tags",
json={"name": "latest", "artifact_id": artifact_id},
)
assert tag_response.status_code == 200
# Verify ref_count is now 2
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 2

View File

@@ -0,0 +1,488 @@
"""
Integration tests for statistics endpoints.
Tests cover:
- Global stats endpoint
- Deduplication stats endpoint
- Cross-project deduplication
- Timeline stats
- Export and report endpoints
- Package and artifact stats
"""
import pytest
from tests.conftest import compute_sha256, upload_test_file
class TestGlobalStats:
"""Tests for GET /api/v1/stats endpoint."""
@pytest.mark.integration
def test_stats_returns_valid_response(self, integration_client):
"""Test stats endpoint returns expected fields."""
response = integration_client.get("/api/v1/stats")
assert response.status_code == 200
data = response.json()
# Check all required fields exist
assert "total_artifacts" in data
assert "total_size_bytes" in data
assert "unique_artifacts" in data
assert "orphaned_artifacts" in data
assert "orphaned_size_bytes" in data
assert "total_uploads" in data
assert "deduplicated_uploads" in data
assert "deduplication_ratio" in data
assert "storage_saved_bytes" in data
@pytest.mark.integration
def test_stats_values_are_non_negative(self, integration_client):
"""Test all stat values are non-negative."""
response = integration_client.get("/api/v1/stats")
assert response.status_code == 200
data = response.json()
assert data["total_artifacts"] >= 0
assert data["total_size_bytes"] >= 0
assert data["unique_artifacts"] >= 0
assert data["orphaned_artifacts"] >= 0
assert data["total_uploads"] >= 0
assert data["deduplicated_uploads"] >= 0
assert data["deduplication_ratio"] >= 0
assert data["storage_saved_bytes"] >= 0
@pytest.mark.integration
def test_stats_update_after_upload(
self, integration_client, test_package, unique_test_id
):
"""Test stats update after uploading an artifact."""
project, package = test_package
# Get initial stats
initial_response = integration_client.get("/api/v1/stats")
initial_stats = initial_response.json()
# Upload a new file
content = f"stats test content {unique_test_id}".encode()
upload_test_file(
integration_client, project, package, content, tag=f"stats-{unique_test_id}"
)
# Get updated stats
updated_response = integration_client.get("/api/v1/stats")
updated_stats = updated_response.json()
# Verify the new upload was counted (strict increase, not just >=)
assert updated_stats["total_uploads"] > initial_stats["total_uploads"]
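
# --- Illustrative sketch (not part of the test suite) ---
# One plausible reading of the deduplication_ratio field checked above: the
# fraction of uploads that matched an existing artifact. This derivation is
# an assumption for illustration, not the documented formula.
def _sketch_dedup_ratio(total_uploads: int, deduplicated_uploads: int) -> float:
    """Fraction of uploads deduplicated; 0.0 when nothing has been uploaded."""
    if total_uploads == 0:
        return 0.0
    return deduplicated_uploads / total_uploads
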
class TestDeduplicationStats:
"""Tests for GET /api/v1/stats/deduplication endpoint."""
@pytest.mark.integration
def test_dedup_stats_returns_valid_response(self, integration_client):
"""Test deduplication stats returns expected fields."""
response = integration_client.get("/api/v1/stats/deduplication")
assert response.status_code == 200
data = response.json()
assert "total_logical_bytes" in data
assert "total_physical_bytes" in data
assert "bytes_saved" in data
assert "savings_percentage" in data
assert "total_uploads" in data
assert "unique_artifacts" in data
assert "duplicate_uploads" in data
assert "average_ref_count" in data
assert "max_ref_count" in data
assert "most_referenced_artifacts" in data
@pytest.mark.integration
def test_most_referenced_artifacts_format(self, integration_client):
"""Test most_referenced_artifacts has correct structure."""
response = integration_client.get("/api/v1/stats/deduplication")
assert response.status_code == 200
data = response.json()
artifacts = data["most_referenced_artifacts"]
assert isinstance(artifacts, list)
if len(artifacts) > 0:
artifact = artifacts[0]
assert "artifact_id" in artifact
assert "ref_count" in artifact
assert "size" in artifact
assert "storage_saved" in artifact
@pytest.mark.integration
def test_dedup_stats_with_top_n_param(self, integration_client):
"""Test deduplication stats respects top_n parameter."""
response = integration_client.get("/api/v1/stats/deduplication?top_n=3")
assert response.status_code == 200
data = response.json()
assert len(data["most_referenced_artifacts"]) <= 3
@pytest.mark.integration
def test_savings_percentage_valid_range(self, integration_client):
"""Test savings percentage is between 0 and 100."""
response = integration_client.get("/api/v1/stats/deduplication")
assert response.status_code == 200
data = response.json()
assert 0 <= data["savings_percentage"] <= 100
class TestCrossProjectStats:
"""Tests for GET /api/v1/stats/cross-project endpoint."""
@pytest.mark.integration
def test_cross_project_returns_valid_response(self, integration_client):
"""Test cross-project stats returns expected fields."""
response = integration_client.get("/api/v1/stats/cross-project")
assert response.status_code == 200
data = response.json()
assert "shared_artifacts_count" in data
assert "total_cross_project_savings" in data
assert "shared_artifacts" in data
assert isinstance(data["shared_artifacts"], list)
@pytest.mark.integration
def test_cross_project_respects_limit(self, integration_client):
"""Test cross-project stats respects limit parameter."""
response = integration_client.get("/api/v1/stats/cross-project?limit=5")
assert response.status_code == 200
data = response.json()
assert len(data["shared_artifacts"]) <= 5
@pytest.mark.integration
def test_cross_project_detects_shared_artifacts(
self, integration_client, unique_test_id
):
"""Test cross-project deduplication is detected."""
content = f"shared across projects {unique_test_id}".encode()
# Create two projects
proj1 = f"cross-proj-a-{unique_test_id}"
proj2 = f"cross-proj-b-{unique_test_id}"
try:
# Create projects and packages
integration_client.post(
"/api/v1/projects",
json={"name": proj1, "description": "Test", "is_public": True},
)
integration_client.post(
"/api/v1/projects",
json={"name": proj2, "description": "Test", "is_public": True},
)
integration_client.post(
f"/api/v1/project/{proj1}/packages",
json={"name": "pkg", "description": "Test"},
)
integration_client.post(
f"/api/v1/project/{proj2}/packages",
json={"name": "pkg", "description": "Test"},
)
# Upload same content to both projects
upload_test_file(integration_client, proj1, "pkg", content, tag="v1")
upload_test_file(integration_client, proj2, "pkg", content, tag="v1")
# Check cross-project stats
response = integration_client.get("/api/v1/stats/cross-project")
assert response.status_code == 200
data = response.json()
assert data["shared_artifacts_count"] >= 1
finally:
# Cleanup
integration_client.delete(f"/api/v1/projects/{proj1}")
integration_client.delete(f"/api/v1/projects/{proj2}")
class TestTimelineStats:
"""Tests for GET /api/v1/stats/timeline endpoint."""
@pytest.mark.integration
def test_timeline_returns_valid_response(self, integration_client):
"""Test timeline stats returns expected fields."""
response = integration_client.get("/api/v1/stats/timeline")
assert response.status_code == 200
data = response.json()
assert "period" in data
assert "start_date" in data
assert "end_date" in data
assert "data_points" in data
assert isinstance(data["data_points"], list)
@pytest.mark.integration
def test_timeline_daily_period(self, integration_client):
"""Test timeline with daily period."""
response = integration_client.get("/api/v1/stats/timeline?period=daily")
assert response.status_code == 200
data = response.json()
assert data["period"] == "daily"
@pytest.mark.integration
def test_timeline_weekly_period(self, integration_client):
"""Test timeline with weekly period."""
response = integration_client.get("/api/v1/stats/timeline?period=weekly")
assert response.status_code == 200
data = response.json()
assert data["period"] == "weekly"
@pytest.mark.integration
def test_timeline_monthly_period(self, integration_client):
"""Test timeline with monthly period."""
response = integration_client.get("/api/v1/stats/timeline?period=monthly")
assert response.status_code == 200
data = response.json()
assert data["period"] == "monthly"
@pytest.mark.integration
def test_timeline_invalid_period_rejected(self, integration_client):
"""Test timeline rejects invalid period."""
response = integration_client.get("/api/v1/stats/timeline?period=invalid")
assert response.status_code == 422
@pytest.mark.integration
def test_timeline_data_point_structure(self, integration_client):
"""Test timeline data points have correct structure."""
response = integration_client.get("/api/v1/stats/timeline")
assert response.status_code == 200
data = response.json()
if len(data["data_points"]) > 0:
point = data["data_points"][0]
assert "date" in point
assert "total_uploads" in point
assert "unique_artifacts" in point
assert "duplicated_uploads" in point
assert "bytes_saved" in point
class TestExportEndpoint:
"""Tests for GET /api/v1/stats/export endpoint."""
@pytest.mark.integration
def test_export_json_format(self, integration_client):
"""Test export with JSON format."""
response = integration_client.get("/api/v1/stats/export?format=json")
assert response.status_code == 200
data = response.json()
assert "total_artifacts" in data
assert "generated_at" in data
@pytest.mark.integration
def test_export_csv_format(self, integration_client):
"""Test export with CSV format."""
response = integration_client.get("/api/v1/stats/export?format=csv")
assert response.status_code == 200
assert "text/csv" in response.headers.get("content-type", "")
content = response.text
assert "Metric,Value" in content
assert "total_artifacts" in content
@pytest.mark.integration
def test_export_invalid_format_rejected(self, integration_client):
"""Test export rejects invalid format."""
response = integration_client.get("/api/v1/stats/export?format=xml")
assert response.status_code == 422
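
# --- Illustrative sketch (not part of the test suite) ---
# test_export_csv_format expects a "Metric,Value" header with one stat per
# row. A minimal sketch of producing that shape from a stats dict; the exact
# column set and ordering are assumptions.
def _sketch_stats_to_csv(stats: dict) -> str:
    import csv
    import io

    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(["Metric", "Value"])
    for metric, value in stats.items():
        writer.writerow([metric, value])
    return buf.getvalue()
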
class TestReportEndpoint:
"""Tests for GET /api/v1/stats/report endpoint."""
@pytest.mark.integration
def test_report_markdown_format(self, integration_client):
"""Test report with markdown format."""
response = integration_client.get("/api/v1/stats/report?format=markdown")
assert response.status_code == 200
data = response.json()
assert data["format"] == "markdown"
assert "generated_at" in data
assert "content" in data
assert "# Orchard Storage Report" in data["content"]
@pytest.mark.integration
def test_report_json_format(self, integration_client):
"""Test report with JSON format."""
response = integration_client.get("/api/v1/stats/report?format=json")
assert response.status_code == 200
data = response.json()
assert data["format"] == "json"
assert "content" in data
@pytest.mark.integration
def test_report_contains_sections(self, integration_client):
"""Test markdown report contains expected sections."""
response = integration_client.get("/api/v1/stats/report?format=markdown")
assert response.status_code == 200
content = response.json()["content"]
assert "## Overview" in content
assert "## Storage" in content
assert "## Uploads" in content
class TestProjectStats:
"""Tests for GET /api/v1/projects/:project/stats endpoint."""
@pytest.mark.integration
def test_project_stats_returns_valid_response(
self, integration_client, test_project
):
"""Test project stats returns expected fields."""
response = integration_client.get(f"/api/v1/projects/{test_project}/stats")
assert response.status_code == 200
data = response.json()
assert "project_id" in data
assert "project_name" in data
assert "package_count" in data
assert "tag_count" in data
assert "artifact_count" in data
assert "total_size_bytes" in data
assert "upload_count" in data
assert "deduplicated_uploads" in data
assert "storage_saved_bytes" in data
assert "deduplication_ratio" in data
@pytest.mark.integration
def test_project_stats_not_found(self, integration_client):
"""Test project stats returns 404 for non-existent project."""
response = integration_client.get("/api/v1/projects/nonexistent-project/stats")
assert response.status_code == 404
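
# --- Illustrative sketch (not part of the test suite) ---
# The project stats fields checked above are aggregations over one project's
# packages and tags. A hedged sketch of the counting query; the schema is
# inferred from the API shape, not taken from Orchard's code.
SKETCH_PROJECT_STATS_SQL = """
SELECT COUNT(DISTINCT pkg.id)        AS package_count,
       COUNT(t.id)                   AS tag_count,
       COUNT(DISTINCT t.artifact_id) AS artifact_count
FROM packages pkg
LEFT JOIN tags t ON t.package_id = pkg.id
WHERE pkg.project_id = %(project_id)s
"""
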
class TestPackageStats:
"""Tests for GET /api/v1/project/:project/packages/:package/stats endpoint."""
@pytest.mark.integration
def test_package_stats_returns_valid_response(
self, integration_client, test_package
):
"""Test package stats returns expected fields."""
project, package = test_package
response = integration_client.get(
f"/api/v1/project/{project}/packages/{package}/stats"
)
assert response.status_code == 200
data = response.json()
assert "package_id" in data
assert "package_name" in data
assert "project_name" in data
assert "tag_count" in data
assert "artifact_count" in data
assert "total_size_bytes" in data
assert "upload_count" in data
assert "deduplicated_uploads" in data
assert "storage_saved_bytes" in data
assert "deduplication_ratio" in data
@pytest.mark.integration
def test_package_stats_not_found(self, integration_client, test_project):
"""Test package stats returns 404 for non-existent package."""
response = integration_client.get(
f"/api/v1/project/{test_project}/packages/nonexistent-package/stats"
)
assert response.status_code == 404
class TestArtifactStats:
"""Tests for GET /api/v1/artifact/:id/stats endpoint."""
@pytest.mark.integration
def test_artifact_stats_returns_valid_response(
self, integration_client, test_package, unique_test_id
):
"""Test artifact stats returns expected fields."""
project, package = test_package
content = f"artifact stats test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload artifact
upload_test_file(
integration_client, project, package, content, tag=f"art-{unique_test_id}"
)
# Get artifact stats
response = integration_client.get(f"/api/v1/artifact/{expected_hash}/stats")
assert response.status_code == 200
data = response.json()
assert "artifact_id" in data
assert "sha256" in data
assert "size" in data
assert "ref_count" in data
assert "storage_savings" in data
assert "tags" in data
assert "projects" in data
assert "packages" in data
@pytest.mark.integration
def test_artifact_stats_not_found(self, integration_client):
"""Test artifact stats returns 404 for non-existent artifact."""
fake_hash = "0" * 64
response = integration_client.get(f"/api/v1/artifact/{fake_hash}/stats")
assert response.status_code == 404
@pytest.mark.integration
def test_artifact_stats_shows_correct_projects(
self, integration_client, unique_test_id
):
"""Test artifact stats shows all projects using the artifact."""
content = f"multi-project artifact {unique_test_id}".encode()
expected_hash = compute_sha256(content)
proj1 = f"art-stats-a-{unique_test_id}"
proj2 = f"art-stats-b-{unique_test_id}"
try:
# Create projects and packages
integration_client.post(
"/api/v1/projects",
json={"name": proj1, "description": "Test", "is_public": True},
)
integration_client.post(
"/api/v1/projects",
json={"name": proj2, "description": "Test", "is_public": True},
)
integration_client.post(
f"/api/v1/project/{proj1}/packages",
json={"name": "pkg", "description": "Test"},
)
integration_client.post(
f"/api/v1/project/{proj2}/packages",
json={"name": "pkg", "description": "Test"},
)
# Upload same content to both projects
upload_test_file(integration_client, proj1, "pkg", content, tag="v1")
upload_test_file(integration_client, proj2, "pkg", content, tag="v1")
# Check artifact stats
response = integration_client.get(f"/api/v1/artifact/{expected_hash}/stats")
assert response.status_code == 200
data = response.json()
assert len(data["projects"]) == 2
assert proj1 in data["projects"]
assert proj2 in data["projects"]
finally:
integration_client.delete(f"/api/v1/projects/{proj1}")
integration_client.delete(f"/api/v1/projects/{proj2}")
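
# --- Illustrative sketch (not part of the test suite) ---
# Artifact stats report per-artifact storage savings. A hedged sketch of the
# likely derivation: every reference beyond the first would otherwise be a
# full physical copy. Inferred from the ref_count semantics exercised above,
# not quoted from Orchard.
def _sketch_storage_savings(size: int, ref_count: int) -> int:
    return size * max(ref_count - 1, 0)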