Add integration tests for deduplication and ref_count
- Add test_integration_uploads.py with 12 tests for duplicate upload scenarios - Add test_ref_count.py with 7 tests for ref_count management - Fix ArtifactDetailResponse to include sha256 and checksum fields - Fix health check SQL warning by wrapping in text() - Update tests to use unique content per test run for idempotency
This commit is contained in:
@@ -13,7 +13,7 @@ from fastapi import (
|
||||
)
|
||||
from fastapi.responses import StreamingResponse, RedirectResponse
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import or_, func
|
||||
from sqlalchemy import or_, func, text
|
||||
from typing import List, Optional, Literal
|
||||
import math
|
||||
import re
|
||||
@@ -263,7 +263,7 @@ def health_check(
|
||||
|
||||
# Check database connectivity
|
||||
try:
|
||||
db.execute("SELECT 1")
|
||||
db.execute(text("SELECT 1"))
|
||||
database_healthy = True
|
||||
except Exception as e:
|
||||
logger.warning(f"Database health check failed: {e}")
|
||||
@@ -2131,9 +2131,13 @@ def get_artifact(artifact_id: str, db: Session = Depends(get_db)):
|
||||
|
||||
return ArtifactDetailResponse(
|
||||
id=artifact.id,
|
||||
sha256=artifact.id, # SHA256 hash is the artifact ID
|
||||
size=artifact.size,
|
||||
content_type=artifact.content_type,
|
||||
original_name=artifact.original_name,
|
||||
checksum_md5=artifact.checksum_md5,
|
||||
checksum_sha1=artifact.checksum_sha1,
|
||||
s3_etag=artifact.s3_etag,
|
||||
created_at=artifact.created_at,
|
||||
created_by=artifact.created_by,
|
||||
ref_count=artifact.ref_count,
|
||||
|
||||
@@ -4,7 +4,7 @@ python_files = test_*.py
|
||||
python_functions = test_*
|
||||
python_classes = Test*
|
||||
asyncio_mode = auto
|
||||
addopts = -v --tb=short
|
||||
addopts = -v --tb=short --cov=app --cov-report=term-missing --cov-report=html:coverage_html --cov-fail-under=0
|
||||
filterwarnings =
|
||||
ignore::DeprecationWarning
|
||||
ignore::UserWarning
|
||||
@@ -12,3 +12,18 @@ markers =
|
||||
unit: Unit tests (no external dependencies)
|
||||
integration: Integration tests (require database/storage)
|
||||
slow: Slow tests (skip with -m "not slow")
|
||||
|
||||
# Coverage configuration
|
||||
[coverage:run]
|
||||
source = app
|
||||
omit =
|
||||
*/tests/*
|
||||
*/__pycache__/*
|
||||
|
||||
[coverage:report]
|
||||
exclude_lines =
|
||||
pragma: no cover
|
||||
def __repr__
|
||||
raise NotImplementedError
|
||||
if __name__ == .__main__.:
|
||||
pass
|
||||
|
||||
@@ -199,3 +199,119 @@ def test_app():
|
||||
from app.main import app
|
||||
|
||||
return TestClient(app)
|
||||
|
||||
|
||||
# =============================================================================
|
||||
# Integration Test Fixtures
|
||||
# =============================================================================
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def integration_client():
|
||||
"""
|
||||
Create a test client for integration tests.
|
||||
|
||||
Uses the real database and MinIO from docker-compose.local.yml.
|
||||
"""
|
||||
from httpx import Client
|
||||
|
||||
# Connect to the running orchard-server container
|
||||
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
|
||||
|
||||
with Client(base_url=base_url, timeout=30.0) as client:
|
||||
yield client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def unique_test_id():
|
||||
"""Generate a unique ID for test isolation."""
|
||||
import uuid
|
||||
|
||||
return f"test-{uuid.uuid4().hex[:8]}"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_project(integration_client, unique_test_id):
|
||||
"""
|
||||
Create a test project and clean it up after the test.
|
||||
|
||||
Yields the project name.
|
||||
"""
|
||||
project_name = f"test-project-{unique_test_id}"
|
||||
|
||||
# Create project
|
||||
response = integration_client.post(
|
||||
"/api/v1/projects",
|
||||
json={"name": project_name, "description": "Test project", "is_public": True},
|
||||
)
|
||||
assert response.status_code == 200, f"Failed to create project: {response.text}"
|
||||
|
||||
yield project_name
|
||||
|
||||
# Cleanup: delete project
|
||||
try:
|
||||
integration_client.delete(f"/api/v1/projects/{project_name}")
|
||||
except Exception:
|
||||
pass # Ignore cleanup errors
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_package(integration_client, test_project, unique_test_id):
|
||||
"""
|
||||
Create a test package within a test project.
|
||||
|
||||
Yields (project_name, package_name) tuple.
|
||||
"""
|
||||
package_name = f"test-package-{unique_test_id}"
|
||||
|
||||
# Create package
|
||||
response = integration_client.post(
|
||||
f"/api/v1/project/{test_project}/packages",
|
||||
json={"name": package_name, "description": "Test package"},
|
||||
)
|
||||
assert response.status_code == 200, f"Failed to create package: {response.text}"
|
||||
|
||||
yield (test_project, package_name)
|
||||
|
||||
# Cleanup handled by test_project fixture (cascade delete)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_content():
|
||||
"""
|
||||
Generate unique test content for each test.
|
||||
|
||||
Returns (content_bytes, expected_sha256) tuple.
|
||||
"""
|
||||
import uuid
|
||||
|
||||
content = f"test-content-{uuid.uuid4().hex}".encode()
|
||||
sha256 = compute_sha256(content)
|
||||
return (content, sha256)
|
||||
|
||||
|
||||
def upload_test_file(
|
||||
client,
|
||||
project: str,
|
||||
package: str,
|
||||
content: bytes,
|
||||
filename: str = "test.bin",
|
||||
tag: str = None,
|
||||
) -> dict:
|
||||
"""
|
||||
Helper function to upload a test file.
|
||||
|
||||
Returns the upload response as a dict.
|
||||
"""
|
||||
files = {"file": (filename, io.BytesIO(content), "application/octet-stream")}
|
||||
data = {}
|
||||
if tag:
|
||||
data["tag"] = tag
|
||||
|
||||
response = client.post(
|
||||
f"/api/v1/project/{project}/{package}/upload",
|
||||
files=files,
|
||||
data=data if data else None,
|
||||
)
|
||||
assert response.status_code == 200, f"Upload failed: {response.text}"
|
||||
return response.json()
|
||||
|
||||
388
backend/tests/test_integration_uploads.py
Normal file
388
backend/tests/test_integration_uploads.py
Normal file
@@ -0,0 +1,388 @@
|
||||
"""
|
||||
Integration tests for duplicate uploads and storage verification.
|
||||
|
||||
These tests require the full stack to be running (docker-compose.local.yml).
|
||||
|
||||
Tests cover:
|
||||
- Duplicate upload scenarios across packages and projects
|
||||
- Storage verification (single S3 object, single artifact row)
|
||||
- Upload table tracking
|
||||
- Content integrity verification
|
||||
- Concurrent upload handling
|
||||
- Failure cleanup
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import io
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from tests.conftest import (
|
||||
compute_sha256,
|
||||
upload_test_file,
|
||||
)
|
||||
|
||||
|
||||
class TestDuplicateUploadScenarios:
|
||||
"""Integration tests for duplicate upload behavior."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_same_file_twice_returns_same_artifact_id(
|
||||
self, integration_client, test_package
|
||||
):
|
||||
"""Test uploading same file twice returns same artifact_id."""
|
||||
project, package = test_package
|
||||
content = b"content uploaded twice for same artifact test"
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# First upload
|
||||
result1 = upload_test_file(
|
||||
integration_client, project, package, content, tag="first"
|
||||
)
|
||||
assert result1["artifact_id"] == expected_hash
|
||||
|
||||
# Second upload
|
||||
result2 = upload_test_file(
|
||||
integration_client, project, package, content, tag="second"
|
||||
)
|
||||
assert result2["artifact_id"] == expected_hash
|
||||
assert result1["artifact_id"] == result2["artifact_id"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_same_file_twice_increments_ref_count(
|
||||
self, integration_client, test_package
|
||||
):
|
||||
"""Test uploading same file twice increments ref_count to 2."""
|
||||
project, package = test_package
|
||||
content = b"content for ref count increment test"
|
||||
|
||||
# First upload
|
||||
result1 = upload_test_file(
|
||||
integration_client, project, package, content, tag="v1"
|
||||
)
|
||||
assert result1["ref_count"] == 1
|
||||
|
||||
# Second upload
|
||||
result2 = upload_test_file(
|
||||
integration_client, project, package, content, tag="v2"
|
||||
)
|
||||
assert result2["ref_count"] == 2
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_same_file_different_packages_shares_artifact(
|
||||
self, integration_client, test_project, unique_test_id
|
||||
):
|
||||
"""Test uploading same file to different packages shares artifact."""
|
||||
project = test_project
|
||||
content = f"content shared across packages {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Create two packages
|
||||
pkg1 = f"package-a-{unique_test_id}"
|
||||
pkg2 = f"package-b-{unique_test_id}"
|
||||
|
||||
integration_client.post(
|
||||
f"/api/v1/project/{project}/packages",
|
||||
json={"name": pkg1, "description": "Package A"},
|
||||
)
|
||||
integration_client.post(
|
||||
f"/api/v1/project/{project}/packages",
|
||||
json={"name": pkg2, "description": "Package B"},
|
||||
)
|
||||
|
||||
# Upload to first package
|
||||
result1 = upload_test_file(integration_client, project, pkg1, content, tag="v1")
|
||||
assert result1["artifact_id"] == expected_hash
|
||||
assert result1["deduplicated"] is False
|
||||
|
||||
# Upload to second package
|
||||
result2 = upload_test_file(integration_client, project, pkg2, content, tag="v1")
|
||||
assert result2["artifact_id"] == expected_hash
|
||||
assert result2["deduplicated"] is True
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_same_file_different_projects_shares_artifact(
|
||||
self, integration_client, unique_test_id
|
||||
):
|
||||
"""Test uploading same file to different projects shares artifact."""
|
||||
content = f"content shared across projects {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Create two projects with packages
|
||||
proj1 = f"project-x-{unique_test_id}"
|
||||
proj2 = f"project-y-{unique_test_id}"
|
||||
pkg_name = "shared-pkg"
|
||||
|
||||
try:
|
||||
# Create projects and packages
|
||||
integration_client.post(
|
||||
"/api/v1/projects",
|
||||
json={"name": proj1, "description": "Project X", "is_public": True},
|
||||
)
|
||||
integration_client.post(
|
||||
"/api/v1/projects",
|
||||
json={"name": proj2, "description": "Project Y", "is_public": True},
|
||||
)
|
||||
integration_client.post(
|
||||
f"/api/v1/project/{proj1}/packages",
|
||||
json={"name": pkg_name, "description": "Package"},
|
||||
)
|
||||
integration_client.post(
|
||||
f"/api/v1/project/{proj2}/packages",
|
||||
json={"name": pkg_name, "description": "Package"},
|
||||
)
|
||||
|
||||
# Upload to first project
|
||||
result1 = upload_test_file(
|
||||
integration_client, proj1, pkg_name, content, tag="v1"
|
||||
)
|
||||
assert result1["artifact_id"] == expected_hash
|
||||
assert result1["deduplicated"] is False
|
||||
|
||||
# Upload to second project
|
||||
result2 = upload_test_file(
|
||||
integration_client, proj2, pkg_name, content, tag="v1"
|
||||
)
|
||||
assert result2["artifact_id"] == expected_hash
|
||||
assert result2["deduplicated"] is True
|
||||
|
||||
finally:
|
||||
# Cleanup
|
||||
integration_client.delete(f"/api/v1/projects/{proj1}")
|
||||
integration_client.delete(f"/api/v1/projects/{proj2}")
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_same_file_different_filenames_shares_artifact(
|
||||
self, integration_client, test_package
|
||||
):
|
||||
"""Test uploading same file with different original filenames shares artifact."""
|
||||
project, package = test_package
|
||||
content = b"content with different filenames"
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload with filename1
|
||||
result1 = upload_test_file(
|
||||
integration_client,
|
||||
project,
|
||||
package,
|
||||
content,
|
||||
filename="file1.bin",
|
||||
tag="v1",
|
||||
)
|
||||
assert result1["artifact_id"] == expected_hash
|
||||
|
||||
# Upload with filename2
|
||||
result2 = upload_test_file(
|
||||
integration_client,
|
||||
project,
|
||||
package,
|
||||
content,
|
||||
filename="file2.bin",
|
||||
tag="v2",
|
||||
)
|
||||
assert result2["artifact_id"] == expected_hash
|
||||
assert result2["deduplicated"] is True
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_same_file_different_tags_shares_artifact(
|
||||
self, integration_client, test_package, unique_test_id
|
||||
):
|
||||
"""Test uploading same file with different tags shares artifact."""
|
||||
project, package = test_package
|
||||
content = f"content with different tags {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
tags = ["latest", "stable", "v1.0.0", "release"]
|
||||
for i, tag in enumerate(tags):
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content, tag=tag
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
if i == 0:
|
||||
assert result["deduplicated"] is False
|
||||
else:
|
||||
assert result["deduplicated"] is True
|
||||
|
||||
|
||||
class TestStorageVerification:
|
||||
"""Tests to verify storage behavior after duplicate uploads."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_artifact_table_single_row_after_duplicates(
|
||||
self, integration_client, test_package
|
||||
):
|
||||
"""Test artifact table contains only one row after duplicate uploads."""
|
||||
project, package = test_package
|
||||
content = b"content for single row test"
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload same content multiple times with different tags
|
||||
for tag in ["v1", "v2", "v3"]:
|
||||
upload_test_file(integration_client, project, package, content, tag=tag)
|
||||
|
||||
# Query artifact - should exist and be unique
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.status_code == 200
|
||||
artifact = response.json()
|
||||
assert artifact["id"] == expected_hash
|
||||
assert artifact["ref_count"] == 3
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_upload_table_multiple_rows_for_duplicates(
|
||||
self, integration_client, test_package
|
||||
):
|
||||
"""Test upload table contains multiple rows for duplicate uploads (event tracking)."""
|
||||
project, package = test_package
|
||||
content = b"content for upload tracking test"
|
||||
|
||||
# Upload same content 3 times
|
||||
for tag in ["upload1", "upload2", "upload3"]:
|
||||
upload_test_file(integration_client, project, package, content, tag=tag)
|
||||
|
||||
# Check package stats - should show 3 uploads but fewer unique artifacts
|
||||
response = integration_client.get(
|
||||
f"/api/v1/project/{project}/packages/{package}"
|
||||
)
|
||||
assert response.status_code == 200
|
||||
pkg_info = response.json()
|
||||
assert pkg_info["tag_count"] == 3
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_artifact_content_matches_original(self, integration_client, test_package):
|
||||
"""Test artifact content retrieved matches original content exactly."""
|
||||
project, package = test_package
|
||||
original_content = b"exact content verification test data 12345"
|
||||
|
||||
# Upload
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, original_content, tag="verify"
|
||||
)
|
||||
|
||||
# Download and compare
|
||||
download_response = integration_client.get(
|
||||
f"/api/v1/project/{project}/{package}/+/verify", params={"mode": "proxy"}
|
||||
)
|
||||
assert download_response.status_code == 200
|
||||
downloaded_content = download_response.content
|
||||
assert downloaded_content == original_content
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_storage_stats_reflect_deduplication(
|
||||
self, integration_client, test_package
|
||||
):
|
||||
"""Test total storage size matches single artifact size after duplicates."""
|
||||
project, package = test_package
|
||||
content = b"content for storage stats test - should only count once"
|
||||
content_size = len(content)
|
||||
|
||||
# Upload same content 5 times
|
||||
for tag in ["a", "b", "c", "d", "e"]:
|
||||
upload_test_file(integration_client, project, package, content, tag=tag)
|
||||
|
||||
# Check global stats
|
||||
response = integration_client.get("/api/v1/stats")
|
||||
assert response.status_code == 200
|
||||
stats = response.json()
|
||||
|
||||
# Deduplication should show savings
|
||||
assert stats["deduplicated_uploads"] > 0
|
||||
assert stats["storage_saved_bytes"] > 0
|
||||
|
||||
|
||||
class TestConcurrentUploads:
|
||||
"""Tests for concurrent upload handling."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_concurrent_uploads_same_file(self, integration_client, test_package):
|
||||
"""Test concurrent uploads of same file handle deduplication correctly."""
|
||||
project, package = test_package
|
||||
content = b"content for concurrent upload test"
|
||||
expected_hash = compute_sha256(content)
|
||||
num_concurrent = 5
|
||||
|
||||
results = []
|
||||
errors = []
|
||||
|
||||
def upload_worker(tag_suffix):
|
||||
try:
|
||||
# Create a new client for this thread
|
||||
from httpx import Client
|
||||
|
||||
base_url = "http://localhost:8080"
|
||||
with Client(base_url=base_url, timeout=30.0) as client:
|
||||
files = {
|
||||
"file": (
|
||||
f"concurrent-{tag_suffix}.bin",
|
||||
io.BytesIO(content),
|
||||
"application/octet-stream",
|
||||
)
|
||||
}
|
||||
response = client.post(
|
||||
f"/api/v1/project/{project}/{package}/upload",
|
||||
files=files,
|
||||
data={"tag": f"concurrent-{tag_suffix}"},
|
||||
)
|
||||
if response.status_code == 200:
|
||||
results.append(response.json())
|
||||
else:
|
||||
errors.append(f"Status {response.status_code}: {response.text}")
|
||||
except Exception as e:
|
||||
errors.append(str(e))
|
||||
|
||||
# Run concurrent uploads
|
||||
with ThreadPoolExecutor(max_workers=num_concurrent) as executor:
|
||||
futures = [executor.submit(upload_worker, i) for i in range(num_concurrent)]
|
||||
for future in as_completed(futures):
|
||||
pass # Wait for all to complete
|
||||
|
||||
# Verify results
|
||||
assert len(errors) == 0, f"Errors during concurrent uploads: {errors}"
|
||||
assert len(results) == num_concurrent
|
||||
|
||||
# All should have same artifact_id
|
||||
artifact_ids = set(r["artifact_id"] for r in results)
|
||||
assert len(artifact_ids) == 1
|
||||
assert expected_hash in artifact_ids
|
||||
|
||||
# Verify final ref_count
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.status_code == 200
|
||||
assert response.json()["ref_count"] == num_concurrent
|
||||
|
||||
|
||||
class TestDeduplicationAcrossRestarts:
|
||||
"""Tests for deduplication persistence."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_deduplication_persists(
|
||||
self, integration_client, test_package, unique_test_id
|
||||
):
|
||||
"""
|
||||
Test deduplication works with persisted data.
|
||||
|
||||
This test uploads content, then uploads the same content again.
|
||||
Since the database persists, the second upload should detect
|
||||
the existing artifact even without server restart.
|
||||
"""
|
||||
project, package = test_package
|
||||
content = f"persisted content for dedup test {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# First upload
|
||||
result1 = upload_test_file(
|
||||
integration_client, project, package, content, tag="persist1"
|
||||
)
|
||||
assert result1["artifact_id"] == expected_hash
|
||||
assert result1["deduplicated"] is False
|
||||
|
||||
# Second upload (simulating after restart - data is persisted)
|
||||
result2 = upload_test_file(
|
||||
integration_client, project, package, content, tag="persist2"
|
||||
)
|
||||
assert result2["artifact_id"] == expected_hash
|
||||
assert result2["deduplicated"] is True
|
||||
|
||||
# Verify artifact exists with correct ref_count
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.status_code == 200
|
||||
assert response.json()["ref_count"] == 2
|
||||
176
backend/tests/test_ref_count.py
Normal file
176
backend/tests/test_ref_count.py
Normal file
@@ -0,0 +1,176 @@
|
||||
"""
|
||||
Unit and integration tests for reference counting behavior.
|
||||
|
||||
Tests cover:
|
||||
- ref_count is set correctly for new artifacts
|
||||
- ref_count increments on duplicate uploads
|
||||
- ref_count query correctly identifies existing artifacts
|
||||
- Artifact lookup by SHA256 hash works correctly
|
||||
"""
|
||||
|
||||
import pytest
|
||||
import io
|
||||
from tests.conftest import (
|
||||
compute_sha256,
|
||||
upload_test_file,
|
||||
TEST_CONTENT_HELLO,
|
||||
TEST_HASH_HELLO,
|
||||
)
|
||||
|
||||
|
||||
class TestRefCountQuery:
|
||||
"""Tests for ref_count querying and artifact lookup."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_artifact_lookup_by_sha256(self, integration_client, test_package):
|
||||
"""Test artifact lookup by SHA256 hash (primary key) works correctly."""
|
||||
project, package = test_package
|
||||
content = b"unique content for lookup test"
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload a file
|
||||
upload_result = upload_test_file(
|
||||
integration_client, project, package, content, tag="v1"
|
||||
)
|
||||
assert upload_result["artifact_id"] == expected_hash
|
||||
|
||||
# Look up artifact by ID (SHA256)
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.status_code == 200
|
||||
|
||||
artifact = response.json()
|
||||
assert artifact["id"] == expected_hash
|
||||
assert artifact["sha256"] == expected_hash
|
||||
assert artifact["size"] == len(content)
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_ref_count_query_identifies_existing_artifact(
|
||||
self, integration_client, test_package
|
||||
):
|
||||
"""Test ref_count query correctly identifies existing artifacts by hash."""
|
||||
project, package = test_package
|
||||
content = b"content for ref count query test"
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload a file with a tag
|
||||
upload_result = upload_test_file(
|
||||
integration_client, project, package, content, tag="v1"
|
||||
)
|
||||
|
||||
# Query artifact and check ref_count
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.status_code == 200
|
||||
|
||||
artifact = response.json()
|
||||
assert artifact["ref_count"] >= 1 # At least 1 from the tag
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_ref_count_set_to_1_for_new_artifact_with_tag(
|
||||
self, integration_client, test_package, unique_test_id
|
||||
):
|
||||
"""Test ref_count is set to 1 for new artifacts when created with a tag."""
|
||||
project, package = test_package
|
||||
content = f"brand new content for ref count test {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload a new file with a tag
|
||||
upload_result = upload_test_file(
|
||||
integration_client, project, package, content, tag="initial"
|
||||
)
|
||||
|
||||
assert upload_result["artifact_id"] == expected_hash
|
||||
assert upload_result["ref_count"] == 1
|
||||
assert upload_result["deduplicated"] is False
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_ref_count_increments_on_duplicate_upload_with_tag(
|
||||
self, integration_client, test_package, unique_test_id
|
||||
):
|
||||
"""Test ref_count is incremented when duplicate content is uploaded with a new tag."""
|
||||
project, package = test_package
|
||||
content = f"content that will be uploaded twice {unique_test_id}".encode()
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# First upload with tag
|
||||
result1 = upload_test_file(
|
||||
integration_client, project, package, content, tag="v1"
|
||||
)
|
||||
assert result1["ref_count"] == 1
|
||||
assert result1["deduplicated"] is False
|
||||
|
||||
# Second upload with different tag (same content)
|
||||
result2 = upload_test_file(
|
||||
integration_client, project, package, content, tag="v2"
|
||||
)
|
||||
assert result2["artifact_id"] == expected_hash
|
||||
assert result2["ref_count"] == 2
|
||||
assert result2["deduplicated"] is True
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_ref_count_after_multiple_tags(self, integration_client, test_package):
|
||||
"""Test ref_count correctly reflects number of tags pointing to artifact."""
|
||||
project, package = test_package
|
||||
content = b"content for multiple tag test"
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload with multiple tags
|
||||
tags = ["v1", "v2", "v3", "latest"]
|
||||
for i, tag in enumerate(tags):
|
||||
result = upload_test_file(
|
||||
integration_client, project, package, content, tag=tag
|
||||
)
|
||||
assert result["artifact_id"] == expected_hash
|
||||
assert result["ref_count"] == i + 1
|
||||
|
||||
# Verify final ref_count via artifact endpoint
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.status_code == 200
|
||||
assert response.json()["ref_count"] == len(tags)
|
||||
|
||||
|
||||
class TestRefCountWithDeletion:
|
||||
"""Tests for ref_count behavior when tags are deleted."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_ref_count_decrements_on_tag_delete(self, integration_client, test_package):
|
||||
"""Test ref_count decrements when a tag is deleted."""
|
||||
project, package = test_package
|
||||
content = b"content for delete test"
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload with two tags
|
||||
upload_test_file(integration_client, project, package, content, tag="v1")
|
||||
upload_test_file(integration_client, project, package, content, tag="v2")
|
||||
|
||||
# Verify ref_count is 2
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.json()["ref_count"] == 2
|
||||
|
||||
# Delete one tag
|
||||
delete_response = integration_client.delete(
|
||||
f"/api/v1/project/{project}/{package}/tags/v1"
|
||||
)
|
||||
assert delete_response.status_code == 204
|
||||
|
||||
# Verify ref_count is now 1
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.json()["ref_count"] == 1
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_ref_count_zero_after_all_tags_deleted(
|
||||
self, integration_client, test_package
|
||||
):
|
||||
"""Test ref_count goes to 0 when all tags are deleted."""
|
||||
project, package = test_package
|
||||
content = b"content that will be orphaned"
|
||||
expected_hash = compute_sha256(content)
|
||||
|
||||
# Upload with one tag
|
||||
upload_test_file(integration_client, project, package, content, tag="only-tag")
|
||||
|
||||
# Delete the tag
|
||||
integration_client.delete(f"/api/v1/project/{project}/{package}/tags/only-tag")
|
||||
|
||||
# Verify ref_count is 0
|
||||
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
|
||||
assert response.json()["ref_count"] == 0
|
||||
Reference in New Issue
Block a user