4 Commits

Author SHA1 Message Date
Mondo Diaz
af66fd5845 Add security fixes: SHA256 hash validation and streaming file size enforcement
- Add field_validator to ResumableUploadInitRequest to validate expected_hash
  is a valid 64-character lowercase hex SHA256 hash (normalizes to lowercase)
- Add FileSizeExceededError exception for file size limit violations
- Enforce file size limits in storage layer during streaming (prevents
  Content-Length header spoofing)
- Add FileSizeExceededError handler in upload endpoint returning HTTP 413
- Add node_modules and frontend/dist to .gitignore
2026-01-05 15:43:19 -06:00
Mondo Diaz
55a38ad850 Add deduplication design doc, file size limits, and validation tests
- Add max_file_size (10GB) and min_file_size (1 byte) config options
- Add file size validation to regular and resumable upload endpoints
- Create comprehensive deduplication design document covering:
  - SHA256 algorithm selection rationale and migration path
  - Content-addressable storage model
  - S3 key derivation and prefix sharding
  - Duplicate detection workflow
  - Reference counting lifecycle
  - Edge cases and error handling
  - Collision detection strategy
  - Performance considerations
  - Operations runbook
- Add tests for empty file rejection and file size validation
2026-01-05 15:35:21 -06:00
Mondo Diaz
32115fc1c5 Add integration tests for garbage collection endpoints 2026-01-05 15:24:46 -06:00
Mondo Diaz
4c2e21295f Add comprehensive ref_count tests and fix resumable upload double-counting bug
- Add tests for cascade deletion ref_count (package/project delete)
- Add tests for tag update ref_count adjustments
- Fix resumable upload bug where ref_count was incremented manually AND by SQL trigger
- ref_count is now exclusively managed by SQL triggers on tag INSERT/DELETE/UPDATE
2026-01-05 15:19:05 -06:00
10 changed files with 1172 additions and 6 deletions

.gitignore vendored
View File

@@ -37,6 +37,10 @@ Thumbs.db
# Build
/build/
/dist/
frontend/dist/
# Node
node_modules/
# Local config overrides
config.local.yaml

View File

@@ -32,10 +32,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added integration tests for upload scenarios and ref_count management (#35)
- Added integration tests for S3 verification and failure cleanup (#35)
- Added integration tests for all stats endpoints (#35)
- Added integration tests for cascade deletion ref_count behavior (package/project delete) (#35)
- Added integration tests for tag update ref_count adjustments (#35)
- Added integration tests for garbage collection endpoints (#35)
- Added integration tests for file size validation (#35)
- Added test dependencies to requirements.txt (pytest, pytest-asyncio, pytest-cov, httpx, moto) (#35)
- Added `ORCHARD_MAX_FILE_SIZE` config option (default: 10GB) for upload size limits (#37)
- Added `ORCHARD_MIN_FILE_SIZE` config option (default: 1 byte, rejects empty files) (#37)
- Added file size validation to upload and resumable upload endpoints (#37)
- Added comprehensive deduplication design document (`docs/design/deduplication-design.md`) (#37)
### Fixed
- Fixed Helm chart `minio.ingress` conflicting with Bitnami MinIO subchart by renaming to `minioIngress` (#48)
- Fixed JSON report serialization error for Decimal types in `GET /api/v1/stats/report` (#34)
- Fixed resumable upload double-counting ref_count when tag provided (removed manual increment, SQL triggers handle it) (#35)
## [0.3.0] - 2025-12-15
### Changed

View File

@@ -38,6 +38,10 @@ class Settings(BaseSettings):
s3_read_timeout: int = 60 # Read timeout in seconds
s3_max_retries: int = 3 # Max retry attempts for transient failures
# Upload settings
max_file_size: int = 10 * 1024 * 1024 * 1024 # 10GB default max file size
min_file_size: int = 1 # Minimum 1 byte (empty files rejected)
# Download settings
download_mode: str = "presigned" # "presigned", "redirect", or "proxy"
presigned_url_expiry: int = (

View File

@@ -28,6 +28,7 @@ from .storage import (
MULTIPART_CHUNK_SIZE,
StorageError,
HashComputationError,
FileSizeExceededError,
S3ExistenceCheckError,
S3UploadError,
S3StorageUnavailableError,
@@ -973,6 +974,20 @@ def upload_artifact(
if not package:
raise HTTPException(status_code=404, detail="Package not found")
# Validate file size
settings = get_settings()
if content_length is not None:
if content_length > settings.max_file_size:
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum size is {settings.max_file_size // (1024 * 1024 * 1024)}GB",
)
if content_length < settings.min_file_size:
raise HTTPException(
status_code=422,
detail="Empty files are not allowed",
)
# Extract format-specific metadata before storing
file_metadata = {}
if file.filename:
@@ -1019,6 +1034,12 @@ def upload_artifact(
status_code=500,
detail="Data integrity error detected. Please contact support.",
)
except FileSizeExceededError as e:
logger.warning(f"File size exceeded during upload: {e}")
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum size is {settings.max_file_size // (1024 * 1024 * 1024)}GB",
)
except StorageError as e:
logger.error(f"Storage error during upload: {e}")
raise HTTPException(status_code=500, detail="Internal storage error")
@@ -1162,13 +1183,30 @@ def init_resumable_upload(
if not package:
raise HTTPException(status_code=404, detail="Package not found")
# Validate file size
settings = get_settings()
if init_request.size > settings.max_file_size:
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum size is {settings.max_file_size // (1024 * 1024 * 1024)}GB",
)
if init_request.size < settings.min_file_size:
raise HTTPException(
status_code=422,
detail="Empty files are not allowed",
)
# Check if artifact already exists (deduplication)
existing_artifact = (
db.query(Artifact).filter(Artifact.id == init_request.expected_hash).first()
)
if existing_artifact:
- # File already exists - use atomic increment for ref count
- _increment_ref_count(db, existing_artifact.id)
+ # File already exists - deduplicated upload
+ # NOTE: ref_count is managed by SQL triggers on tag INSERT/DELETE/UPDATE
# We do NOT manually increment here because:
# 1. If a tag is provided, _create_or_update_tag will create/update a tag
# and the SQL trigger will handle ref_count
# 2. If no tag is provided, ref_count shouldn't change (no new reference)
# Record the upload
upload = Upload(

View File

@@ -1,6 +1,6 @@
from datetime import datetime
from typing import Optional, List, Dict, Any, Generic, TypeVar
- from pydantic import BaseModel
+ from pydantic import BaseModel, field_validator
from uuid import UUID
T = TypeVar("T")
@@ -266,6 +266,18 @@ class ResumableUploadInitRequest(BaseModel):
size: int
tag: Optional[str] = None
@field_validator("expected_hash")
@classmethod
def validate_sha256_hash(cls, v: str) -> str:
"""Validate that expected_hash is a valid 64-character lowercase hex SHA256 hash."""
import re
if not re.match(r"^[a-f0-9]{64}$", v.lower()):
raise ValueError(
"expected_hash must be a valid 64-character lowercase hexadecimal SHA256 hash"
)
return v.lower() # Normalize to lowercase
class ResumableUploadInitResponse(BaseModel):
"""Response from initiating a resumable upload"""

View File

@@ -176,6 +176,12 @@ class HashComputationError(StorageError):
pass
class FileSizeExceededError(StorageError):
"""Raised when file exceeds maximum size during upload"""
pass
class S3ExistenceCheckError(StorageError):
"""Raised when S3 existence check fails after retries"""
@@ -261,6 +267,7 @@ class S3Storage:
Raises:
HashComputationError: If hash computation fails
FileSizeExceededError: If file exceeds maximum size
S3ExistenceCheckError: If S3 existence check fails after retries
S3UploadError: If S3 upload fails
"""
@@ -270,11 +277,18 @@ class S3Storage:
if not content:
raise HashComputationError("Empty file content")
size = len(content)
# Enforce file size limit (protection against Content-Length spoofing)
if size > settings.max_file_size:
raise FileSizeExceededError(
f"File size {size} exceeds maximum {settings.max_file_size}"
)
sha256_hash = hashlib.sha256(content).hexdigest()
md5_hash = hashlib.md5(content).hexdigest()
sha1_hash = hashlib.sha1(content).hexdigest()
- size = len(content)
- except HashComputationError:
+ except (HashComputationError, FileSizeExceededError):
raise
except Exception as e:
logger.error(f"Hash computation failed: {e}")
@@ -349,6 +363,7 @@ class S3Storage:
Raises:
HashComputationError: If hash computation fails
FileSizeExceededError: If file exceeds maximum size
S3ExistenceCheckError: If S3 existence check fails after retries
S3UploadError: If S3 upload fails
"""
@@ -369,13 +384,19 @@ class S3Storage:
sha1_hasher.update(chunk)
size += len(chunk)
# Enforce file size limit during streaming (protection against spoofing)
if size > settings.max_file_size:
raise FileSizeExceededError(
f"File size exceeds maximum {settings.max_file_size}"
)
if size == 0:
raise HashComputationError("Empty file content")
sha256_hash = sha256_hasher.hexdigest()
md5_hash = md5_hasher.hexdigest()
sha1_hash = sha1_hasher.hexdigest()
- except HashComputationError:
+ except (HashComputationError, FileSizeExceededError):
raise
except Exception as e:
logger.error(f"Hash computation failed for multipart upload: {e}")

View File

@@ -0,0 +1,168 @@
"""
Integration tests for garbage collection functionality.
Tests cover:
- Listing orphaned artifacts (ref_count=0)
- Garbage collection in dry-run mode
- Garbage collection actual deletion
- Verifying artifacts with refs are not deleted
"""
import pytest
from tests.conftest import (
compute_sha256,
upload_test_file,
)
class TestOrphanedArtifactsEndpoint:
"""Tests for GET /api/v1/admin/orphaned-artifacts endpoint."""
@pytest.mark.integration
def test_list_orphaned_artifacts_returns_list(self, integration_client):
"""Test orphaned artifacts endpoint returns a list."""
response = integration_client.get("/api/v1/admin/orphaned-artifacts")
assert response.status_code == 200
assert isinstance(response.json(), list)
@pytest.mark.integration
def test_orphaned_artifact_has_required_fields(self, integration_client):
"""Test orphaned artifact response has required fields."""
response = integration_client.get("/api/v1/admin/orphaned-artifacts?limit=1")
assert response.status_code == 200
data = response.json()
if len(data) > 0:
artifact = data[0]
assert "id" in artifact
assert "size" in artifact
assert "created_at" in artifact
assert "created_by" in artifact
assert "original_name" in artifact
@pytest.mark.integration
def test_orphaned_artifacts_respects_limit(self, integration_client):
"""Test orphaned artifacts endpoint respects limit parameter."""
response = integration_client.get("/api/v1/admin/orphaned-artifacts?limit=5")
assert response.status_code == 200
assert len(response.json()) <= 5
@pytest.mark.integration
def test_artifact_becomes_orphaned_when_tag_deleted(
self, integration_client, test_package, unique_test_id
):
"""Test artifact appears in orphaned list after tag is deleted."""
project, package = test_package
content = f"orphan test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload with tag
upload_test_file(integration_client, project, package, content, tag="temp-tag")
# Verify not in orphaned list (has ref_count=1)
response = integration_client.get("/api/v1/admin/orphaned-artifacts?limit=1000")
orphaned_ids = [a["id"] for a in response.json()]
assert expected_hash not in orphaned_ids
# Delete the tag
integration_client.delete(f"/api/v1/project/{project}/{package}/tags/temp-tag")
# Verify now in orphaned list (ref_count=0)
response = integration_client.get("/api/v1/admin/orphaned-artifacts?limit=1000")
orphaned_ids = [a["id"] for a in response.json()]
assert expected_hash in orphaned_ids
class TestGarbageCollectionEndpoint:
"""Tests for POST /api/v1/admin/garbage-collect endpoint."""
@pytest.mark.integration
def test_garbage_collect_dry_run_returns_response(self, integration_client):
"""Test garbage collection dry run returns valid response."""
response = integration_client.post("/api/v1/admin/garbage-collect?dry_run=true")
assert response.status_code == 200
data = response.json()
assert "artifacts_deleted" in data
assert "bytes_freed" in data
assert "artifact_ids" in data
assert "dry_run" in data
assert data["dry_run"] is True
@pytest.mark.integration
def test_garbage_collect_dry_run_doesnt_delete(
self, integration_client, test_package, unique_test_id
):
"""Test garbage collection dry run doesn't actually delete artifacts."""
project, package = test_package
content = f"dry run test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload and delete tag to create orphan
upload_test_file(integration_client, project, package, content, tag="dry-run")
integration_client.delete(f"/api/v1/project/{project}/{package}/tags/dry-run")
# Verify artifact exists
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
# Run garbage collection in dry-run mode
gc_response = integration_client.post(
"/api/v1/admin/garbage-collect?dry_run=true&limit=1000"
)
assert gc_response.status_code == 200
assert expected_hash in gc_response.json()["artifact_ids"]
# Verify artifact STILL exists (dry run didn't delete)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
@pytest.mark.integration
def test_garbage_collect_preserves_referenced_artifacts(
self, integration_client, test_package, unique_test_id
):
"""Test garbage collection doesn't delete artifacts with ref_count > 0."""
project, package = test_package
content = f"preserve test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload with tag (ref_count=1)
upload_test_file(integration_client, project, package, content, tag="keep-this")
# Verify artifact exists with ref_count=1
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
assert response.json()["ref_count"] == 1
# Run garbage collection (dry_run to not affect other tests)
gc_response = integration_client.post(
"/api/v1/admin/garbage-collect?dry_run=true&limit=1000"
)
assert gc_response.status_code == 200
# Verify artifact was NOT in delete list (has ref_count > 0)
assert expected_hash not in gc_response.json()["artifact_ids"]
# Verify artifact still exists
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.status_code == 200
assert response.json()["ref_count"] == 1
@pytest.mark.integration
def test_garbage_collect_respects_limit(self, integration_client):
"""Test garbage collection respects limit parameter."""
response = integration_client.post(
"/api/v1/admin/garbage-collect?dry_run=true&limit=5"
)
assert response.status_code == 200
assert response.json()["artifacts_deleted"] <= 5
@pytest.mark.integration
def test_garbage_collect_returns_bytes_freed(self, integration_client):
"""Test garbage collection returns accurate bytes_freed."""
response = integration_client.post("/api/v1/admin/garbage-collect?dry_run=true")
assert response.status_code == 200
data = response.json()
assert data["bytes_freed"] >= 0
assert isinstance(data["bytes_freed"], int)

View File

@@ -549,3 +549,56 @@ class TestUploadFailureCleanup:
)
assert tag_response.status_code == 200
assert tag_response.json()["artifact_id"] == hash2
class TestFileSizeValidation:
"""Tests for file size limits and empty file rejection."""
@pytest.mark.integration
def test_empty_file_rejected(self, integration_client, test_package):
"""Test that empty files are rejected with appropriate error."""
project, package = test_package
# Try to upload empty content
files = {"file": ("empty.txt", io.BytesIO(b""), "application/octet-stream")}
response = integration_client.post(
f"/api/v1/project/{project}/{package}/upload",
files=files,
)
# Should be rejected (422 from storage layer or validation)
assert response.status_code in [422, 400]
@pytest.mark.integration
def test_small_valid_file_accepted(self, integration_client, test_package):
"""Test that small (1 byte) files are accepted."""
project, package = test_package
content = b"X" # Single byte
result = upload_test_file(
integration_client, project, package, content, tag="tiny"
)
assert result["artifact_id"] is not None
assert result["size"] == 1
@pytest.mark.integration
def test_file_size_reported_correctly(
self, integration_client, test_package, unique_test_id
):
"""Test that file size is correctly reported in response."""
project, package = test_package
content = f"Test content for size check {unique_test_id}".encode()
expected_size = len(content)
result = upload_test_file(
integration_client, project, package, content, tag="size-test"
)
assert result["size"] == expected_size
# Also verify via artifact endpoint
artifact_response = integration_client.get(
f"/api/v1/artifact/{result['artifact_id']}"
)
assert artifact_response.json()["size"] == expected_size

View File

@@ -174,3 +174,285 @@ class TestRefCountWithDeletion:
# Verify ref_count is 0
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 0
class TestRefCountCascadeDelete:
"""Tests for ref_count behavior during cascade deletions."""
@pytest.mark.integration
def test_ref_count_decrements_on_package_delete(
self, integration_client, unique_test_id
):
"""Test ref_count decrements for all tags when package is deleted."""
# Create a project and package manually (not using fixtures to control cleanup)
project_name = f"cascade-pkg-{unique_test_id}"
package_name = f"test-pkg-{unique_test_id}"
# Create project
response = integration_client.post(
"/api/v1/projects",
json={
"name": project_name,
"description": "Test project",
"is_public": True,
},
)
assert response.status_code == 200
# Create package
response = integration_client.post(
f"/api/v1/project/{project_name}/packages",
json={"name": package_name, "description": "Test package"},
)
assert response.status_code == 200
# Upload content with multiple tags
content = f"cascade delete test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
upload_test_file(
integration_client, project_name, package_name, content, tag="v1"
)
upload_test_file(
integration_client, project_name, package_name, content, tag="v2"
)
upload_test_file(
integration_client, project_name, package_name, content, tag="v3"
)
# Verify ref_count is 3
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 3
# Delete the package (should cascade delete all tags and decrement ref_count)
delete_response = integration_client.delete(
f"/api/v1/project/{project_name}/packages/{package_name}"
)
assert delete_response.status_code == 204
# Verify ref_count is 0 (all tags were deleted)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 0
# Cleanup: delete the project
integration_client.delete(f"/api/v1/projects/{project_name}")
@pytest.mark.integration
def test_ref_count_decrements_on_project_delete(
self, integration_client, unique_test_id
):
"""Test ref_count decrements for all tags in all packages when project is deleted."""
# Create a project manually (not using fixtures to control cleanup)
project_name = f"cascade-proj-{unique_test_id}"
package1_name = f"pkg1-{unique_test_id}"
package2_name = f"pkg2-{unique_test_id}"
# Create project
response = integration_client.post(
"/api/v1/projects",
json={
"name": project_name,
"description": "Test project",
"is_public": True,
},
)
assert response.status_code == 200
# Create two packages
for pkg_name in [package1_name, package2_name]:
response = integration_client.post(
f"/api/v1/project/{project_name}/packages",
json={"name": pkg_name, "description": "Test package"},
)
assert response.status_code == 200
# Upload same content with tags in both packages
content = f"project cascade test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
upload_test_file(
integration_client, project_name, package1_name, content, tag="v1"
)
upload_test_file(
integration_client, project_name, package1_name, content, tag="v2"
)
upload_test_file(
integration_client, project_name, package2_name, content, tag="latest"
)
upload_test_file(
integration_client, project_name, package2_name, content, tag="stable"
)
# Verify ref_count is 4 (2 tags in each of 2 packages)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 4
# Delete the project (should cascade delete all packages, tags, and decrement ref_count)
delete_response = integration_client.delete(f"/api/v1/projects/{project_name}")
assert delete_response.status_code == 204
# Verify ref_count is 0
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 0
@pytest.mark.integration
def test_shared_artifact_ref_count_partial_decrement(
self, integration_client, unique_test_id
):
"""Test ref_count correctly decrements when artifact is shared across packages."""
# Create project with two packages
project_name = f"shared-artifact-{unique_test_id}"
package1_name = f"pkg1-{unique_test_id}"
package2_name = f"pkg2-{unique_test_id}"
# Create project
response = integration_client.post(
"/api/v1/projects",
json={
"name": project_name,
"description": "Test project",
"is_public": True,
},
)
assert response.status_code == 200
# Create two packages
for pkg_name in [package1_name, package2_name]:
response = integration_client.post(
f"/api/v1/project/{project_name}/packages",
json={"name": pkg_name, "description": "Test package"},
)
assert response.status_code == 200
# Upload same content to both packages
content = f"shared artifact {unique_test_id}".encode()
expected_hash = compute_sha256(content)
upload_test_file(
integration_client, project_name, package1_name, content, tag="v1"
)
upload_test_file(
integration_client, project_name, package2_name, content, tag="v1"
)
# Verify ref_count is 2
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 2
# Delete only package1 (package2 still references the artifact)
delete_response = integration_client.delete(
f"/api/v1/project/{project_name}/packages/{package1_name}"
)
assert delete_response.status_code == 204
# Verify ref_count is 1 (only package2's tag remains)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
# Cleanup
integration_client.delete(f"/api/v1/projects/{project_name}")
class TestRefCountTagUpdate:
"""Tests for ref_count behavior when tags are updated to point to different artifacts."""
@pytest.mark.integration
def test_ref_count_adjusts_on_tag_update(
self, integration_client, test_package, unique_test_id
):
"""Test ref_count adjusts when a tag is updated to point to a different artifact."""
project, package = test_package
# Upload two different artifacts
content1 = f"artifact one {unique_test_id}".encode()
content2 = f"artifact two {unique_test_id}".encode()
hash1 = compute_sha256(content1)
hash2 = compute_sha256(content2)
# Upload first artifact with tag "latest"
upload_test_file(integration_client, project, package, content1, tag="latest")
# Verify first artifact has ref_count 1
response = integration_client.get(f"/api/v1/artifact/{hash1}")
assert response.json()["ref_count"] == 1
# Upload second artifact with different tag
upload_test_file(integration_client, project, package, content2, tag="stable")
# Now update "latest" tag to point to second artifact
# This is done by uploading the same content with the same tag
upload_test_file(integration_client, project, package, content2, tag="latest")
# Verify first artifact ref_count decreased to 0 (tag moved away)
response = integration_client.get(f"/api/v1/artifact/{hash1}")
assert response.json()["ref_count"] == 0
# Verify second artifact ref_count increased to 2 (stable + latest)
response = integration_client.get(f"/api/v1/artifact/{hash2}")
assert response.json()["ref_count"] == 2
@pytest.mark.integration
def test_ref_count_unchanged_when_tag_same_artifact(
self, integration_client, test_package, unique_test_id
):
"""Test ref_count doesn't change when tag is 'updated' to same artifact."""
project, package = test_package
content = f"same artifact {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload with tag
upload_test_file(integration_client, project, package, content, tag="v1")
# Verify ref_count is 1
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
# Upload same content with same tag (no-op)
upload_test_file(integration_client, project, package, content, tag="v1")
# Verify ref_count is still 1 (no double-counting)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
@pytest.mark.integration
def test_tag_via_post_endpoint_increments_ref_count(
self, integration_client, test_package, unique_test_id
):
"""Test creating tag via POST /tags endpoint increments ref_count."""
project, package = test_package
content = f"tag endpoint test {unique_test_id}".encode()
expected_hash = compute_sha256(content)
# Upload artifact without tag
result = upload_test_file(
integration_client, project, package, content, filename="test.bin", tag=None
)
artifact_id = result["artifact_id"]
# Verify ref_count is 0 (no tags yet)
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 0
# Create tag via POST endpoint
tag_response = integration_client.post(
f"/api/v1/project/{project}/{package}/tags",
json={"name": "v1.0.0", "artifact_id": artifact_id},
)
assert tag_response.status_code == 200
# Verify ref_count is now 1
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 1
# Create another tag via POST endpoint
tag_response = integration_client.post(
f"/api/v1/project/{project}/{package}/tags",
json={"name": "latest", "artifact_id": artifact_id},
)
assert tag_response.status_code == 200
# Verify ref_count is now 2
response = integration_client.get(f"/api/v1/artifact/{expected_hash}")
assert response.json()["ref_count"] == 2

View File

@@ -0,0 +1,575 @@
# Deduplication Design Document
This document defines Orchard's content-addressable storage and deduplication approach using SHA256 hashes.
## Table of Contents
1. [Overview](#overview)
2. [Hash Algorithm Selection](#hash-algorithm-selection)
3. [Content-Addressable Storage Model](#content-addressable-storage-model)
4. [S3 Key Derivation](#s3-key-derivation)
5. [Duplicate Detection Strategy](#duplicate-detection-strategy)
6. [Reference Counting Lifecycle](#reference-counting-lifecycle)
7. [Edge Cases and Error Handling](#edge-cases-and-error-handling)
8. [Collision Handling](#collision-handling)
9. [Performance Considerations](#performance-considerations)
10. [Operations Runbook](#operations-runbook)
---
## Overview
Orchard uses **whole-file deduplication** based on content hashing. When a file is uploaded:
1. The SHA256 hash of the entire file content is computed
2. The hash becomes the artifact's primary identifier
3. If a file with the same hash already exists, no duplicate is stored
4. Multiple tags/references can point to the same artifact
**Scope:** Orchard implements whole-file deduplication only. Chunk-level or block-level deduplication is out of scope for MVP.
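As a minimal illustration of the identity model (the function name below is illustrative, not the actual Orchard API):
```python
import hashlib

def artifact_id(content: bytes) -> str:
    """The artifact's identity is the SHA256 hex digest of its content."""
    return hashlib.sha256(content).hexdigest()

# Identical bytes always map to the same ID, so a second upload stores nothing new.
assert artifact_id(b"same bytes") == artifact_id(b"same bytes")
```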
---
## Hash Algorithm Selection
### Decision: SHA256
| Criteria | SHA256 | SHA1 | MD5 | Blake3 |
|----------|--------|------|-----|--------|
| Security | Strong (256-bit) | Weak (broken) | Weak (broken) | Strong |
| Speed | ~400 MB/s | ~600 MB/s | ~800 MB/s | ~1500 MB/s |
| Collision Resistance | 2^128 | Broken | Broken | 2^128 |
| Industry Adoption | Universal | Legacy | Legacy | Emerging |
| Tool Ecosystem | Excellent | Good | Good | Growing |
### Rationale
1. **Security**: SHA256 has no known practical collision attacks. SHA1 and MD5 are cryptographically broken.
2. **Collision Resistance**: With 256-bit output, finding any collision takes on the order of 2^128 hash operations (birthday bound), and the chance that two specific files collide is about 2^-256. To have a 50% chance of any accidental collision, you would need approximately 2^128 (~3 x 10^38) unique files.
3. **Industry Standard**: SHA256 is the de facto standard for content-addressable storage (Git, Docker, npm, etc.).
4. **Performance**: While Blake3 is faster, SHA256 throughput (~400 MB/s) exceeds typical network bandwidth for uploads. The bottleneck is I/O, not hashing.
5. **Tooling**: Universal support in all languages, operating systems, and verification tools.
### Migration Path
If a future algorithm change is needed (e.g., SHA3 or Blake3):
1. **Database**: Add `hash_algorithm` column to artifacts table (default: 'sha256')
2. **S3 Keys**: New algorithm uses different prefix (e.g., `fruits-sha3/` vs `fruits/`)
3. **API**: Accept algorithm hint in upload, return algorithm in responses
4. **Migration**: Background job to re-hash existing artifacts if needed
**Current Implementation**: Single algorithm (SHA256), no algorithm versioning required for MVP.
---
## Content-Addressable Storage Model
### Core Principles
1. **Identity = Content**: The artifact ID IS the SHA256 hash of its content
2. **Immutability**: Content cannot change after storage (same hash = same content)
3. **Deduplication**: Same content uploaded twice results in single storage
4. **Metadata Independence**: Files with identical content but different names/types are deduplicated
### Data Model
```
Artifact {
id: VARCHAR(64) PRIMARY KEY -- SHA256 hash (lowercase hex)
size: BIGINT -- File size in bytes
ref_count: INTEGER -- Number of references
s3_key: VARCHAR(1024) -- S3 storage path
checksum_md5: VARCHAR(32) -- Secondary checksum
checksum_sha1: VARCHAR(40) -- Secondary checksum
...
}
Tag {
id: UUID PRIMARY KEY
name: VARCHAR(255)
package_id: UUID FK
artifact_id: VARCHAR(64) FK -- Points to Artifact.id (SHA256)
}
```
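For illustration, one possible ORM mapping of this model (a SQLAlchemy sketch; column names follow the schema above, everything else is an assumption rather than the real code):
```python
import uuid
from sqlalchemy import BigInteger, Column, ForeignKey, Integer, String
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Artifact(Base):
    __tablename__ = "artifacts"
    id = Column(String(64), primary_key=True)               # SHA256 hash (lowercase hex)
    size = Column(BigInteger, nullable=False)                # file size in bytes
    ref_count = Column(Integer, nullable=False, default=0)   # number of references
    s3_key = Column(String(1024), nullable=False)            # S3 storage path
    checksum_md5 = Column(String(32))                        # secondary checksum
    checksum_sha1 = Column(String(40))                       # secondary checksum

class Tag(Base):
    __tablename__ = "tags"
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    name = Column(String(255), nullable=False)
    package_id = Column(UUID(as_uuid=True), nullable=False)
    artifact_id = Column(String(64), ForeignKey("artifacts.id"), nullable=False)
```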
### Hash Format
- Algorithm: SHA256
- Output: 64 lowercase hexadecimal characters
- Example: `dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f`
---
## S3 Key Derivation
### Key Structure
```
fruits/{hash[0:2]}/{hash[2:4]}/{full_hash}
```
Example for hash `dffd6021bb2bd5b0...`:
```
fruits/df/fd/dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f
```
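A sketch of the derivation (the `fruits/` prefix and the hash slicing follow the layout above; the function name is illustrative):
```python
def s3_key_for(sha256_hash: str) -> str:
    """Derive the sharded S3 key from a 64-character lowercase SHA256 hash."""
    return f"fruits/{sha256_hash[0:2]}/{sha256_hash[2:4]}/{sha256_hash}"

# s3_key_for("dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f")
# -> "fruits/df/fd/dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f"
```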
### Rationale for Prefix Sharding
1. **S3 Performance**: S3 partitions by key prefix. Distributing across prefixes improves throughput.
2. **Filesystem Compatibility**: When using filesystem-backed storage, avoids single directory with millions of files.
3. **Distribution**: Two levels of 2-character hex prefixes (256 combinations per level) yield 65,536 (256 x 256) top-level buckets.
### Bucket Distribution Analysis
Assuming uniformly distributed SHA256 hashes:
| Artifacts | Files per Prefix (avg) | Max per Prefix (99.9%) |
|-----------|------------------------|------------------------|
| 100,000 | 1.5 | 10 |
| 1,000,000 | 15 | 50 |
| 10,000,000 | 152 | 250 |
| 100,000,000 | 1,525 | 2,000 |
The two-level prefix provides excellent distribution up to hundreds of millions of artifacts.
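The averages in the table follow directly from dividing the artifact count by the number of prefixes:
```python
PREFIXES = 16 ** 2 * 16 ** 2  # two levels of two hex characters = 65,536 buckets

for artifacts in (100_000, 1_000_000, 10_000_000, 100_000_000):
    print(f"{artifacts:>11,} artifacts -> {artifacts / PREFIXES:8.1f} files per prefix (avg)")
# 100,000 -> 1.5, 1,000,000 -> 15.3, 10,000,000 -> 152.6, 100,000,000 -> 1525.9
```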
---
## Duplicate Detection Strategy
### Upload Flow
```
┌─────────────────────────────────────────────────────────────────┐
│ UPLOAD REQUEST │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 1. VALIDATE: Check file size limits (min/max) │
│ - Empty files (0 bytes) → Reject with 422 │
│ - Exceeds max_file_size → Reject with 413 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 2. COMPUTE HASH: Stream file through SHA256/MD5/SHA1 │
│ - Use 8MB chunks for memory efficiency │
│ - Single pass for all three hashes │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 3. DERIVE S3 KEY: fruits/{hash[0:2]}/{hash[2:4]}/{hash} │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 4. CHECK EXISTENCE: HEAD request to S3 for derived key │
│ - Retry up to 3 times on transient failures │
└─────────────────────────────────────────────────────────────────┘
┌───────────────┴───────────────┐
▼ ▼
┌─────────────────────────┐ ┌─────────────────────────────────┐
│ EXISTS: Deduplicated │ │ NOT EXISTS: Upload to S3 │
│ - Verify size matches │ │ - PUT object (or multipart) │
│ - Skip S3 upload │ │ - Abort on failure │
│ - Log saved bytes │ └─────────────────────────────────┘
└─────────────────────────┘ │
│ │
└───────────────┬───────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 5. DATABASE: Create/update artifact record │
│ - Use row locking to prevent race conditions │
│ - ref_count managed by SQL triggers │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ 6. CREATE TAG: If tag provided, create/update tag │
│ - SQL trigger increments ref_count │
└─────────────────────────────────────────────────────────────────┘
```
### Hash Computation
**Memory Requirements:**
- Chunk size: 8MB (`HASH_CHUNK_SIZE`)
- Working memory: ~25MB (8MB chunk + hash states)
- Independent of file size (streaming)
**Throughput:**
- SHA256 alone: ~400 MB/s on modern CPU
- With MD5 + SHA1: ~300 MB/s (parallel computation)
- Typical bottleneck: Network I/O, not CPU
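A sketch of the single-pass streaming computation described above, assuming an 8MB `HASH_CHUNK_SIZE` (the function signature is illustrative, not the storage layer's actual API):
```python
import hashlib
from typing import BinaryIO, Tuple

HASH_CHUNK_SIZE = 8 * 1024 * 1024  # 8MB chunks keep memory usage flat

def compute_hashes(stream: BinaryIO) -> Tuple[str, str, str, int]:
    """Stream the file once, feeding all three hashers, and track the total size."""
    sha256, md5, sha1 = hashlib.sha256(), hashlib.md5(), hashlib.sha1()
    size = 0
    while True:
        chunk = stream.read(HASH_CHUNK_SIZE)
        if not chunk:
            break
        sha256.update(chunk)
        md5.update(chunk)
        sha1.update(chunk)
        size += len(chunk)
    return sha256.hexdigest(), md5.hexdigest(), sha1.hexdigest(), size
```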
### Multipart Upload Threshold
Files larger than 100MB use S3 multipart upload:
- First pass: Stream to compute hashes
- If not duplicate: Seek to start, upload in 10MB parts
- On failure: Abort multipart upload (no orphaned parts)
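A sketch of the threshold decision and abort-on-failure behaviour using boto3's multipart primitives; only the 100MB threshold and 10MB part size come from the text above, the `store` helper and its bucket/key handling are illustrative:
```python
MULTIPART_THRESHOLD = 100 * 1024 * 1024  # 100MB
MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024  # 10MB parts

def store(s3, bucket: str, key: str, stream, size: int) -> None:
    """Upload via a single PUT for small files, multipart for large ones."""
    if size <= MULTIPART_THRESHOLD:
        stream.seek(0)
        s3.put_object(Bucket=bucket, Key=key, Body=stream)  # atomic: completes or fails whole
        return
    upload = s3.create_multipart_upload(Bucket=bucket, Key=key)
    try:
        stream.seek(0)
        parts, part_number = [], 1
        while True:
            chunk = stream.read(MULTIPART_CHUNK_SIZE)
            if not chunk:
                break
            resp = s3.upload_part(
                Bucket=bucket, Key=key, UploadId=upload["UploadId"],
                PartNumber=part_number, Body=chunk,
            )
            parts.append({"PartNumber": part_number, "ETag": resp["ETag"]})
            part_number += 1
        s3.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload["UploadId"],
            MultipartUpload={"Parts": parts},
        )
    except Exception:
        # Abort so S3 discards the uploaded parts (no orphaned data)
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload["UploadId"])
        raise
```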
---
## Reference Counting Lifecycle
### What Constitutes a "Reference"
A reference is a **Tag** pointing to an artifact. Each tag increments the ref_count by 1.
**Uploads do NOT directly increment ref_count** - only tag creation does.
### Lifecycle
```
┌─────────────────────────────────────────────────────────────────┐
│ CREATE: New artifact uploaded │
│ - ref_count = 0 (no tags yet) │
│ - Artifact exists but is "orphaned" │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ TAG CREATED: Tag points to artifact │
│ - SQL trigger: ref_count += 1 │
│ - Artifact is now referenced │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ TAG UPDATED: Tag moved to different artifact │
│ - SQL trigger on old artifact: ref_count -= 1 │
│ - SQL trigger on new artifact: ref_count += 1 │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ TAG DELETED: Tag removed │
│ - SQL trigger: ref_count -= 1 │
│ - If ref_count = 0, artifact is orphaned │
└─────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────┐
│ GARBAGE COLLECTION: Clean up orphaned artifacts │
│ - Triggered manually via admin endpoint │
│ - Finds artifacts where ref_count = 0 │
│ - Deletes from S3 and database │
└─────────────────────────────────────────────────────────────────┘
```
### SQL Triggers
Three triggers manage ref_count automatically:
1. **`tags_ref_count_insert_trigger`**: On tag INSERT, increment target artifact's ref_count
2. **`tags_ref_count_delete_trigger`**: On tag DELETE, decrement target artifact's ref_count
3. **`tags_ref_count_update_trigger`**: On tag UPDATE (artifact_id changed), decrement old, increment new
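For illustration, the INSERT case might look like the following DDL (a sketch wrapped as a SQLAlchemy statement; only the trigger name comes from the list above, the function body is an assumption):
```python
from sqlalchemy import text

# Illustrative plpgsql for the INSERT case; DELETE and UPDATE triggers follow the same pattern.
TAGS_REF_COUNT_INSERT = text("""
CREATE OR REPLACE FUNCTION tags_ref_count_insert() RETURNS trigger AS $$
BEGIN
    UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tags_ref_count_insert_trigger
AFTER INSERT ON tags
FOR EACH ROW EXECUTE FUNCTION tags_ref_count_insert();
""")

# Applied once during migration, e.g.: connection.execute(TAGS_REF_COUNT_INSERT)
```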
### Garbage Collection
**Trigger**: Manual admin endpoint (`POST /api/v1/admin/garbage-collect`)
**Process**:
1. Query artifacts where `ref_count = 0`
2. For each orphan:
- Delete from S3 (`DELETE fruits/xx/yy/hash`)
- Delete from database
- Log deletion
**Safety**:
- Dry-run mode by default (`?dry_run=true`)
- Limit per run (`?limit=100`)
- Check constraint prevents ref_count < 0
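A sketch of that process (illustrative ORM and storage calls; `Artifact` is the model from the Data Model section, and the returned fields mirror the garbage-collect endpoint's payload):
```python
def garbage_collect(db, storage, dry_run: bool = True, limit: int = 100) -> dict:
    """Report (dry run) or delete artifacts whose ref_count has dropped to 0."""
    orphans = (
        db.query(Artifact)
        .filter(Artifact.ref_count == 0)
        .limit(limit)
        .all()
    )
    bytes_freed = sum(a.size for a in orphans)
    if not dry_run:
        for artifact in orphans:
            storage.delete(artifact.s3_key)  # DELETE fruits/xx/yy/hash
            db.delete(artifact)
        db.commit()
    return {
        "artifacts_deleted": len(orphans),
        "bytes_freed": bytes_freed,
        "artifact_ids": [a.id for a in orphans],
        "dry_run": dry_run,
    }
```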
---
## Edge Cases and Error Handling
### Empty Files
- **Behavior**: Rejected with HTTP 422
- **Reason**: Empty content has deterministic hash but provides no value
- **Error**: "Empty files are not allowed"
### Maximum File Size
- **Default Limit**: 10GB (`ORCHARD_MAX_FILE_SIZE`)
- **Configurable**: Via environment variable
- **Behavior**: Rejected with HTTP 413 before upload begins
- **Error**: "File too large. Maximum size is 10GB"
### Concurrent Upload of Same Content
**Race Condition Scenario**: Two clients upload identical content simultaneously.
**Handling**:
1. **S3 Level**: Both compute same hash, both check existence, both may upload
2. **Database Level**: Row-level locking with `SELECT ... FOR UPDATE`
3. **Outcome**: One creates artifact, other sees it exists, both succeed
4. **Trigger Safety**: SQL triggers are atomic per row
**No Data Corruption**: Concurrent PUTs of the same key with identical content converge to the same object; identical content = identical result.
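A sketch of the locking step from point 2 above (SQLAlchemy's `with_for_update()` emits `SELECT ... FOR UPDATE`; the helper and its transaction handling are illustrative):
```python
def get_or_create_artifact(db, sha256_hash: str, size: int, s3_key: str):
    """Serialize concurrent writers on the same hash with a row lock."""
    artifact = (
        db.query(Artifact)
        .filter(Artifact.id == sha256_hash)
        .with_for_update()  # the second writer blocks here until the first commits
        .first()
    )
    if artifact is None:
        # A real implementation must also tolerate the unique-violation race on the
        # very first insert (e.g. retry, or INSERT ... ON CONFLICT DO NOTHING).
        artifact = Artifact(id=sha256_hash, size=size, ref_count=0, s3_key=s3_key)
        db.add(artifact)
    db.commit()
    return artifact
```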
### Upload Interrupted
**Scenario**: Upload fails after hash computed but before S3 write completes.
**Simple Upload**:
- S3 put_object is atomic - either completes or fails entirely
- No cleanup needed
**Multipart Upload**:
- On any failure, `abort_multipart_upload` is called
- S3 cleans up partial parts
- No orphaned data
### DB Exists but S3 Missing
**Detection**: Download request finds artifact in DB but S3 returns 404.
**Current Behavior**: Return 500 error to client.
**Recovery Options** (not yet implemented):
1. Mark artifact for re-upload (set flag, notify admins)
2. Decrement ref_count to trigger garbage collection
3. Return specific error code for client retry
**Recommended**: Log critical alert, return 503 with retry hint.
### S3 Exists but DB Missing
**Detection**: Orphan - file in S3 with no corresponding DB record.
**Cause**:
- Failed transaction after S3 upload
- Manual S3 manipulation
- Database restore from backup
**Recovery**:
- Garbage collection won't delete (no DB record to query)
- Requires S3 bucket scan + DB reconciliation
- Manual admin task (out of scope for MVP)
### Network Timeout During Existence Check
**Behavior**: Retry up to 3 times with adaptive backoff.
**After Retries Exhausted**: Raise `S3ExistenceCheckError`, return 503 to client.
**Rationale**: Don't upload without knowing if duplicate exists (prevents orphans).
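A sketch of the existence check with client-side retries (botocore's adaptive retry mode; the exception name matches the storage layer, everything else is illustrative):
```python
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

class S3ExistenceCheckError(Exception):
    """Stand-in for the storage layer's exception of the same name."""

# Adaptive client-side retries, mirroring s3_max_retries = 3
s3 = boto3.client("s3", config=Config(retries={"max_attempts": 3, "mode": "adaptive"}))

def object_exists(bucket: str, key: str) -> bool:
    try:
        s3.head_object(Bucket=bucket, Key=key)
        return True
    except ClientError as e:
        if e.response["Error"]["Code"] == "404":
            return False
        # Retries exhausted on a transient error: fail closed rather than risk
        # uploading without knowing whether a duplicate already exists.
        raise S3ExistenceCheckError(str(e)) from e
```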
---
## Collision Handling
### SHA256 Collision Probability
For random inputs, the probability of collision is:
```
P(collision) ≈ n² / 2^257
Where n = number of unique files
```
| Files | Collision Probability |
|-------|----------------------|
| 10^9 (1 billion) | 10^-59 |
| 10^12 (1 trillion) | 10^-53 |
| 10^18 | 10^-41 |
**Practical Assessment**: You would need on the order of 10^38 unique files before collision risk becomes meaningful, far beyond any realistic deployment.
### Detection Mechanism
Despite near-zero probability, we detect potential collisions by:
1. **Size Comparison**: If hash matches but sizes differ, CRITICAL alert
2. **ETag Verification**: S3 ETag provides secondary check
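A sketch of the size-comparison check (logger setup and the helper are illustrative; `HashCollisionError` is the exception shown in the handling procedure below):
```python
import logging

logger = logging.getLogger(__name__)

class HashCollisionError(Exception):
    """Stand-in for the storage layer's collision exception."""

def check_for_collision(existing_size: int, incoming_size: int, sha256_hash: str) -> None:
    """Identical hash but different size can only mean corruption or a true collision."""
    if existing_size != incoming_size:
        logger.critical(
            "Possible SHA256 collision for %s: stored size %d, incoming size %d",
            sha256_hash, existing_size, incoming_size,
        )
        raise HashCollisionError(
            f"Hash collision detected for {sha256_hash}: size mismatch"
        )
```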
### Handling Procedure
If collision detected (size mismatch):
1. **Log CRITICAL alert** with full details
2. **Reject upload** with 500 error
3. **Do NOT overwrite** existing content
4. **Notify operations** for manual investigation
```python
raise HashCollisionError(
f"Hash collision detected for {sha256_hash}: size mismatch"
)
```
### MVP Position
For MVP, we:
- Detect collisions via size mismatch
- Log and alert on detection
- Reject conflicting upload
- Accept that true collisions are practically impossible
No active mitigation (e.g., storing hash + size as composite key) is needed.
---
## Performance Considerations
### Hash Computation Overhead
| File Size | Hash Time | Upload Time (100 Mbps) | Overhead |
|-----------|-----------|------------------------|----------|
| 10 MB | 25ms | 800ms | 3% |
| 100 MB | 250ms | 8s | 3% |
| 1 GB | 2.5s | 80s | 3% |
| 10 GB | 25s | 800s | 3% |
**Conclusion**: Hash computation adds ~3% overhead regardless of file size. Network I/O dominates.
### Existence Check Overhead
- S3 HEAD request: ~50-100ms per call
- Cached in future: Could use Redis/memory cache for hot paths
- Current MVP: No caching (acceptable for expected load)
### Deduplication Savings
Example with 50% duplication rate:
| Metric | Without Dedup | With Dedup | Savings |
|--------|---------------|------------|---------|
| Storage (100K files, 10MB avg) | 1 TB | 500 GB | 50% |
| Upload bandwidth | 1 TB | 500 GB | 50% |
| S3 costs | $23/mo | $11.50/mo | 50% |
---
## Operations Runbook
### Monitoring Deduplication
```bash
# View deduplication stats
curl http://orchard:8080/api/v1/stats/deduplication
# Response includes:
# - deduplication_ratio
# - total_uploads, deduplicated_uploads
# - bytes_saved
```
### Checking for Orphaned Artifacts
```bash
# List orphaned artifacts (ref_count = 0)
curl http://orchard:8080/api/v1/admin/orphaned-artifacts
# Dry-run garbage collection
curl -X POST "http://orchard:8080/api/v1/admin/garbage-collect?dry_run=true"
# Execute garbage collection
curl -X POST "http://orchard:8080/api/v1/admin/garbage-collect?dry_run=false"
```
### Verifying Artifact Integrity
```bash
# Download and verify hash matches artifact ID
ARTIFACT_ID="dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f"
curl -o downloaded_file http://orchard:8080/api/v1/artifact/$ARTIFACT_ID/download
COMPUTED=$(sha256sum downloaded_file | cut -d' ' -f1)
[ "$ARTIFACT_ID" = "$COMPUTED" ] && echo "OK" || echo "INTEGRITY FAILURE"
```
### Troubleshooting
| Symptom | Likely Cause | Resolution |
|---------|--------------|------------|
| "Hash computation error" | Empty file or read error | Check file content, retry |
| "Storage unavailable" | S3/MinIO down | Check S3 health, retry |
| "File too large" | Exceeds max_file_size | Adjust config or use chunked upload |
| "Hash collision detected" | Extremely rare | Investigate, do not ignore |
| Orphaned artifacts accumulating | Tags deleted, no GC run | Run garbage collection |
| Download returns 404 | S3 object missing | Check S3 bucket, restore from backup |
### Configuration Reference
| Variable | Default | Description |
|----------|---------|-------------|
| `ORCHARD_MAX_FILE_SIZE` | 10GB | Maximum upload size |
| `ORCHARD_MIN_FILE_SIZE` | 1 | Minimum upload size (rejects empty) |
| `ORCHARD_S3_MAX_RETRIES` | 3 | Retry attempts for S3 operations |
| `ORCHARD_S3_CONNECT_TIMEOUT` | 10s | S3 connection timeout |
| `ORCHARD_S3_READ_TIMEOUT` | 60s | S3 read timeout |
---
## Appendix: Decision Records
### ADR-001: SHA256 for Content Hashing
**Status**: Accepted
**Context**: Need deterministic content identifier for deduplication.
**Decision**: Use SHA256.
**Rationale**:
- Cryptographically strong (no known attacks)
- Universal adoption (Git, Docker, npm)
- Sufficient speed for I/O-bound workloads
- Excellent tooling
**Consequences**:
- 64-character artifact IDs (longer than UUIDs)
- CPU overhead ~3% of upload time
- Future algorithm migration requires versioning
### ADR-002: Whole-File Deduplication Only
**Status**: Accepted
**Context**: Could implement chunk-level deduplication for better savings.
**Decision**: Whole-file only for MVP.
**Rationale**:
- Simpler implementation
- No chunking algorithm complexity
- Sufficient for build artifact use case
- Can add chunk-level later if needed
**Consequences**:
- Files with partial overlap stored entirely
- Large files with small changes not deduplicated
- Acceptable for binary artifact workloads
### ADR-003: SQL Triggers for ref_count
**Status**: Accepted
**Context**: ref_count must be accurate for garbage collection.
**Decision**: Use PostgreSQL triggers, not application code.
**Rationale**:
- Atomic with tag operations
- Cannot be bypassed
- Works regardless of client (API, direct SQL, migrations)
- Simpler application code
**Consequences**:
- Trigger logic in SQL (less visible)
- Must maintain triggers across schema changes
- Debugging requires database access