4 Commits

Author SHA1 Message Date
Mondo Diaz
d2abfe671a Update CHANGELOG for issues #33, #34, #35, #36 2026-01-05 11:31:55 -06:00
Mondo Diaz
109677e43a Add storage abstraction, stats endpoints, garbage collection, and test infrastructure
- Add StorageBackend protocol for backend-agnostic storage interface
- Add health check with storage and database connectivity verification
- Add garbage collection endpoints for orphaned artifacts (ref_count=0)
- Add deduplication statistics endpoints (/api/v1/stats, /stats/storage, /stats/deduplication)
- Add per-project statistics endpoint
- Add verify_integrity method for post-upload hash validation
- Set up pytest infrastructure with mock S3 client
- Add unit tests for hash calculation and duplicate detection
2026-01-05 11:16:46 -06:00
Mondo Diaz
dbe78ded2f Fix double-counting: let SQL triggers manage ref_count
Previously both Python code AND SQL triggers were incrementing/decrementing
ref_count, causing inconsistent values. Now:

- New artifacts start with ref_count=0 (triggers increment on tag creation)
- Tag INSERT/UPDATE/DELETE triggers handle all ref_count changes
- Python code only logs operations, doesn't manipulate ref_count
- Delete endpoints rely on cascade + triggers for ref_count management

Tested: new uploads, duplicates, tag deletion, package deletion (cascade),
project deletion (cascade) - all ref_count values now correct.
2026-01-05 10:18:23 -06:00
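The trigger-only design described in this commit can be sketched roughly as follows. This is a hypothetical illustration, not the project's actual migration: the table and column names (artifacts, tags, artifact_id) are assumptions inferred from the commit message.

# Hypothetical sketch of trigger-owned ref_count (assumed schema names).
TAG_REF_COUNT_TRIGGER = """
CREATE OR REPLACE FUNCTION sync_ref_count() RETURNS trigger AS $$
BEGIN
    IF TG_OP IN ('INSERT', 'UPDATE') THEN
        UPDATE artifacts SET ref_count = ref_count + 1 WHERE id = NEW.artifact_id;
    END IF;
    IF TG_OP IN ('UPDATE', 'DELETE') THEN
        UPDATE artifacts SET ref_count = ref_count - 1 WHERE id = OLD.artifact_id;
    END IF;
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tags_sync_ref_count
AFTER INSERT OR UPDATE OF artifact_id OR DELETE ON tags
FOR EACH ROW EXECUTE FUNCTION sync_ref_count();
"""

With this in place the Python layer never touches ref_count directly, which is exactly what removes the double-counting.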
Mondo Diaz
865812af98 Add ref_count management for deletions with atomic operations and error handling
- Add DELETE endpoints for tags, packages, and projects with proper ref_count
  decrements for all affected artifacts
- Implement atomic ref_count operations using SELECT FOR UPDATE row-level locking
  to prevent race conditions
- Add custom storage exceptions (HashComputationError, S3ExistenceCheckError,
  S3UploadError) with retry logic for S3 existence checks
- Handle race conditions in upload by locking artifact row before modification
- Add comprehensive logging for all ref_count changes and deduplication events
- Include ref_count in upload response schema
2026-01-05 10:04:59 -06:00
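For illustration, the SELECT FOR UPDATE pattern from this commit might look like the sketch below (SQLAlchemy-style; the artifacts table name and session API are assumptions). Note that the later commit dbe78ded2f above supersedes this by moving ref_count changes into SQL triggers entirely.

# Hypothetical sketch of an atomic decrement via row-level locking.
from sqlalchemy import text

def decrement_ref_count(session, artifact_id: str) -> int:
    # Lock the artifact row; concurrent transactions block here until commit,
    # so the read-modify-write below cannot interleave with another writer.
    row = session.execute(
        text("SELECT ref_count FROM artifacts WHERE id = :id FOR UPDATE"),
        {"id": artifact_id},
    ).one()
    new_count = row.ref_count - 1
    session.execute(
        text("UPDATE artifacts SET ref_count = :rc WHERE id = :id"),
        {"rc": new_count, "id": artifact_id},
    )
    return new_count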
10 changed files with 2502 additions and 320 deletions

CHANGELOG.md

@@ -6,6 +6,20 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]
### Added
- Added `StorageBackend` protocol/interface for backend-agnostic storage (#33)
- Added `health_check()` method to storage backend with `/health` endpoint integration (#33)
- Added `verify_integrity()` method for post-upload hash validation (#33)
- Added garbage collection endpoint `POST /api/v1/admin/garbage-collect` for orphaned artifacts (#36)
- Added orphaned artifacts listing endpoint `GET /api/v1/admin/orphaned-artifacts` (#36)
- Added global storage statistics endpoint `GET /api/v1/stats` (#34)
- Added storage breakdown endpoint `GET /api/v1/stats/storage` (#34)
- Added deduplication metrics endpoint `GET /api/v1/stats/deduplication` (#34)
- Added per-project statistics endpoint `GET /api/v1/projects/{project}/stats` (#34)
- Added pytest infrastructure with mock S3 client for unit testing (#35)
- Added unit tests for SHA256 hash calculation (#35)
- Added unit tests for duplicate detection and deduplication behavior (#35)
- Added test dependencies to requirements.txt (pytest, pytest-asyncio, pytest-cov, httpx, moto) (#35)
### Fixed
- Fixed Helm chart `minio.ingress` conflicting with Bitnami MinIO subchart by renaming to `minioIngress` (#48)
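As a usage sketch of the stats endpoints listed above (a minimal example, assuming the API is served locally on port 8000; the field names follow the DeduplicationStatsResponse schema later in this diff, and httpx is already among the test dependencies):

import httpx

resp = httpx.get("http://localhost:8000/api/v1/stats/deduplication")
resp.raise_for_status()
stats = resp.json()
print(f"uploads={stats['total_uploads']}, unique={stats['unique_artifacts']}")
print(f"saved {stats['bytes_saved']} bytes ({stats['savings_percentage']:.1f}%)")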

File diff suppressed because it is too large


@@ -40,8 +40,28 @@ class ProjectResponse(BaseModel):

# Package format and platform enums
PACKAGE_FORMATS = [
    "generic",
    "npm",
    "pypi",
    "docker",
    "deb",
    "rpm",
    "maven",
    "nuget",
    "helm",
]
PACKAGE_PLATFORMS = [
    "any",
    "linux",
    "darwin",
    "windows",
    "linux-amd64",
    "linux-arm64",
    "darwin-amd64",
    "darwin-arm64",
    "windows-amd64",
]

# Package schemas
@@ -68,6 +88,7 @@ class PackageResponse(BaseModel):
class TagSummary(BaseModel):
    """Lightweight tag info for embedding in package responses"""

    name: str
    artifact_id: str
    created_at: datetime
@@ -75,6 +96,7 @@ class TagSummary(BaseModel):
class PackageDetailResponse(BaseModel):
    """Package with aggregated metadata"""

    id: UUID
    project_id: UUID
    name: str
@@ -135,6 +157,7 @@ class TagResponse(BaseModel):
class TagDetailResponse(BaseModel):
    """Tag with embedded artifact metadata"""

    id: UUID
    package_id: UUID
    name: str
@@ -154,6 +177,7 @@ class TagDetailResponse(BaseModel):
class TagHistoryResponse(BaseModel):
    """History entry for tag changes"""

    id: UUID
    tag_id: UUID
    old_artifact_id: Optional[str]
@@ -167,6 +191,7 @@ class TagHistoryResponse(BaseModel):
class ArtifactTagInfo(BaseModel):
    """Tag info for embedding in artifact responses"""

    id: UUID
    name: str
    package_id: UUID
@@ -176,6 +201,7 @@ class ArtifactTagInfo(BaseModel):
class ArtifactDetailResponse(BaseModel):
    """Artifact with list of tags/packages referencing it"""

    id: str
    sha256: str  # Explicit SHA256 field (same as id)
    size: int
@@ -196,6 +222,7 @@ class ArtifactDetailResponse(BaseModel):
class PackageArtifactResponse(BaseModel):
    """Artifact with tags for package artifact listing"""

    id: str
    sha256: str  # Explicit SHA256 field (same as id)
    size: int
@@ -226,11 +253,13 @@ class UploadResponse(BaseModel):
    s3_etag: Optional[str] = None
    format_metadata: Optional[Dict[str, Any]] = None
    deduplicated: bool = False
    ref_count: int = 1  # Current reference count after this upload


# Resumable upload schemas
class ResumableUploadInitRequest(BaseModel):
    """Request to initiate a resumable upload"""

    expected_hash: str  # SHA256 hash of the file (client must compute)
    filename: str
    content_type: Optional[str] = None
@@ -240,6 +269,7 @@ class ResumableUploadInitRequest(BaseModel):
class ResumableUploadInitResponse(BaseModel):
    """Response from initiating a resumable upload"""

    upload_id: Optional[str]  # None if file already exists
    already_exists: bool
    artifact_id: Optional[str] = None  # Set if already_exists is True
@@ -248,17 +278,20 @@ class ResumableUploadInitResponse(BaseModel):
class ResumableUploadPartResponse(BaseModel):
    """Response from uploading a part"""

    part_number: int
    etag: str


class ResumableUploadCompleteRequest(BaseModel):
    """Request to complete a resumable upload"""

    tag: Optional[str] = None


class ResumableUploadCompleteResponse(BaseModel):
    """Response from completing a resumable upload"""

    artifact_id: str
    size: int
    project: str
@@ -268,6 +301,7 @@ class ResumableUploadCompleteResponse(BaseModel):
class ResumableUploadStatusResponse(BaseModel):
    """Status of a resumable upload"""

    upload_id: str
    uploaded_parts: List[int]
    total_uploaded_bytes: int
@@ -288,6 +322,7 @@ class ConsumerResponse(BaseModel):
# Global search schemas
class SearchResultProject(BaseModel):
    """Project result for global search"""

    id: UUID
    name: str
    description: Optional[str]
@@ -299,6 +334,7 @@ class SearchResultProject(BaseModel):
class SearchResultPackage(BaseModel):
    """Package result for global search"""

    id: UUID
    project_id: UUID
    project_name: str
@@ -312,6 +348,7 @@ class SearchResultPackage(BaseModel):
class SearchResultArtifact(BaseModel):
    """Artifact/tag result for global search"""

    tag_id: UUID
    tag_name: str
    artifact_id: str
@@ -323,6 +360,7 @@ class SearchResultArtifact(BaseModel):
class GlobalSearchResponse(BaseModel):
    """Combined search results across all entity types"""

    query: str
    projects: List[SearchResultProject]
    packages: List[SearchResultPackage]
@@ -333,6 +371,7 @@ class GlobalSearchResponse(BaseModel):
# Presigned URL response
class PresignedUrlResponse(BaseModel):
    """Response containing a presigned URL for direct S3 download"""

    url: str
    expires_at: datetime
    method: str = "GET"
@@ -348,3 +387,72 @@ class PresignedUrlResponse(BaseModel):
class HealthResponse(BaseModel):
    status: str
    version: str = "1.0.0"
    storage_healthy: Optional[bool] = None
    database_healthy: Optional[bool] = None


# Garbage collection schemas
class GarbageCollectionResponse(BaseModel):
    """Response from garbage collection operation"""

    artifacts_deleted: int
    bytes_freed: int
    artifact_ids: List[str]
    dry_run: bool


class OrphanedArtifactResponse(BaseModel):
    """Information about an orphaned artifact"""

    id: str
    size: int
    created_at: datetime
    created_by: str
    original_name: Optional[str]


# Storage statistics schemas
class StorageStatsResponse(BaseModel):
    """Global storage statistics"""

    total_artifacts: int
    total_size_bytes: int
    unique_artifacts: int  # Artifacts with ref_count > 0
    orphaned_artifacts: int  # Artifacts with ref_count = 0
    orphaned_size_bytes: int
    total_uploads: int
    deduplicated_uploads: int
    deduplication_ratio: (
        float  # total_uploads / unique_artifacts (if > 1, deduplication is working)
    )
    storage_saved_bytes: int  # Bytes saved through deduplication


class DeduplicationStatsResponse(BaseModel):
    """Deduplication effectiveness statistics"""

    total_logical_bytes: (
        int  # Sum of all upload sizes (what would be stored without dedup)
    )
    total_physical_bytes: int  # Actual storage used
    bytes_saved: int
    savings_percentage: float
    total_uploads: int
    unique_artifacts: int
    duplicate_uploads: int
    average_ref_count: float
    max_ref_count: int
    most_referenced_artifacts: List[Dict[str, Any]]  # Top N most referenced


class ProjectStatsResponse(BaseModel):
    """Per-project statistics"""

    project_id: str
    project_name: str
    package_count: int
    tag_count: int
    artifact_count: int
    total_size_bytes: int
    upload_count: int
    deduplicated_uploads: int
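The comments on the two stats schemas above pin down how the derived fields relate; a small illustrative calculation with made-up numbers (not the server's actual query):

total_uploads = 150
unique_artifacts = 100            # artifacts with ref_count > 0
total_logical_bytes = 3_000_000   # sum of all upload sizes
total_physical_bytes = 2_000_000  # actual storage used

deduplication_ratio = total_uploads / unique_artifacts        # 1.5 -> dedup working
bytes_saved = total_logical_bytes - total_physical_bytes      # 1_000_000
savings_percentage = 100 * bytes_saved / total_logical_bytes  # ~33.3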

backend/app/storage.py

@@ -1,6 +1,17 @@
import hashlib
import logging
from abc import ABC, abstractmethod
from typing import (
    BinaryIO,
    Tuple,
    Optional,
    Dict,
    Any,
    Generator,
    NamedTuple,
    Protocol,
    runtime_checkable,
)

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
@@ -10,16 +21,170 @@ from .config import get_settings
settings = get_settings()
logger = logging.getLogger(__name__)


# =============================================================================
# Storage Backend Protocol/Interface (ISSUE 33)
# =============================================================================
@runtime_checkable
class StorageBackend(Protocol):
    """
    Abstract protocol defining the interface for storage backends.

    All storage implementations (S3, MinIO, future backends) must implement
    this interface to ensure consistent behavior across the application.

    Note on Deduplication:
    - This system uses whole-file deduplication based on SHA256 hash
    - Partial/chunk-level deduplication is NOT supported (out of scope for MVP)
    - Files with identical content but different metadata are deduplicated
    """

    def store(
        self, file: BinaryIO, content_length: Optional[int] = None
    ) -> "StorageResult":
        """
        Store a file and return StorageResult with all checksums.

        Content-addressable: if the file already exists (by hash), just return
        the existing hash without uploading again.

        Args:
            file: File-like object to store
            content_length: Optional hint for file size (enables multipart upload)

        Returns:
            StorageResult with sha256, size, s3_key, and optional checksums

        Raises:
            HashComputationError: If hash computation fails
            S3ExistenceCheckError: If existence check fails after retries
            S3UploadError: If upload fails
        """
        ...

    def get(self, s3_key: str) -> bytes:
        """
        Retrieve a file by its storage key.

        Args:
            s3_key: The storage key (path) of the file

        Returns:
            File content as bytes
        """
        ...

    def get_stream(
        self, s3_key: str, range_header: Optional[str] = None
    ) -> Tuple[Any, int, Optional[str]]:
        """
        Get a streaming response for a file.

        Supports range requests for partial downloads.

        Args:
            s3_key: The storage key of the file
            range_header: Optional HTTP Range header value

        Returns:
            Tuple of (stream, content_length, content_range)
        """
        ...

    def delete(self, s3_key: str) -> bool:
        """
        Delete a file from storage.

        Args:
            s3_key: The storage key of the file to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        ...

    def get_object_info(self, s3_key: str) -> Optional[Dict[str, Any]]:
        """
        Get object metadata without downloading content.

        Args:
            s3_key: The storage key of the file

        Returns:
            Dict with size, content_type, last_modified, etag, or None if not found
        """
        ...

    def generate_presigned_url(
        self,
        s3_key: str,
        expiry: Optional[int] = None,
        response_content_type: Optional[str] = None,
        response_content_disposition: Optional[str] = None,
    ) -> str:
        """
        Generate a presigned URL for downloading an object.

        Args:
            s3_key: The storage key of the file
            expiry: URL expiry in seconds
            response_content_type: Override Content-Type header in response
            response_content_disposition: Override Content-Disposition header

        Returns:
            Presigned URL string
        """
        ...

    def health_check(self) -> bool:
        """
        Check if the storage backend is healthy and accessible.

        Returns:
            True if healthy, False otherwise
        """
        ...


# Threshold for multipart upload (100MB)
MULTIPART_THRESHOLD = 100 * 1024 * 1024
# Chunk size for multipart upload (10MB)
MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024
# Chunk size for streaming hash computation
HASH_CHUNK_SIZE = 8 * 1024 * 1024
# Maximum retries for S3 existence check
MAX_EXISTENCE_CHECK_RETRIES = 3


class StorageError(Exception):
    """Base exception for storage operations"""

    pass


class HashComputationError(StorageError):
    """Raised when hash computation fails"""

    pass


class S3ExistenceCheckError(StorageError):
    """Raised when S3 existence check fails after retries"""

    pass


class S3UploadError(StorageError):
    """Raised when S3 upload fails"""

    pass


class StorageResult(NamedTuple):
    """Result of storing a file with all computed checksums"""

    sha256: str
    size: int
    s3_key: str
@@ -30,7 +195,9 @@ class StorageResult(NamedTuple):
class S3Storage:
    def __init__(self):
        config = Config(
            s3={"addressing_style": "path"} if settings.s3_use_path_style else {}
        )
        self.client = boto3.client(
            "s3",
@@ -44,7 +211,9 @@ class S3Storage:
        # Store active multipart uploads for resumable support
        self._active_uploads: Dict[str, Dict[str, Any]] = {}

    def store(
        self, file: BinaryIO, content_length: Optional[int] = None
    ) -> StorageResult:
        """
        Store a file and return StorageResult with all checksums.
        Content-addressable: if the file already exists, just return the hash.
@@ -57,25 +226,54 @@ class S3Storage:
            return self._store_multipart(file, content_length)

    def _store_simple(self, file: BinaryIO) -> StorageResult:
        """
        Store a small file using simple put_object.

        Raises:
            HashComputationError: If hash computation fails
            S3ExistenceCheckError: If S3 existence check fails after retries
            S3UploadError: If S3 upload fails
        """
        # Read file and compute all hashes with error handling
        try:
            content = file.read()
            if not content:
                raise HashComputationError("Empty file content")
            sha256_hash = hashlib.sha256(content).hexdigest()
            md5_hash = hashlib.md5(content).hexdigest()
            sha1_hash = hashlib.sha1(content).hexdigest()
            size = len(content)
        except HashComputationError:
            raise
        except Exception as e:
            logger.error(f"Hash computation failed: {e}")
            raise HashComputationError(f"Failed to compute hash: {e}") from e

        # Check if already exists (with retry logic)
        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
        s3_etag = None
        try:
            exists = self._exists(s3_key)
        except S3ExistenceCheckError:
            # Re-raise the specific error
            raise
        except Exception as e:
            logger.error(f"Unexpected error during S3 existence check: {e}")
            raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e

        if not exists:
            try:
                response = self.client.put_object(
                    Bucket=self.bucket,
                    Key=s3_key,
                    Body=content,
                )
                s3_etag = response.get("ETag", "").strip('"')
            except ClientError as e:
                logger.error(f"S3 upload failed: {e}")
                raise S3UploadError(f"Failed to upload to S3: {e}") from e
        else:
            # Get existing ETag
            obj_info = self.get_object_info(s3_key)
@@ -92,8 +290,16 @@ class S3Storage:
        )

    def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
        """
        Store a large file using S3 multipart upload with streaming hash computation.

        Raises:
            HashComputationError: If hash computation fails
            S3ExistenceCheckError: If S3 existence check fails after retries
            S3UploadError: If S3 upload fails
        """
        # First pass: compute all hashes by streaming through file
        try:
            sha256_hasher = hashlib.sha256()
            md5_hasher = hashlib.md5()
            sha1_hasher = hashlib.sha1()
@@ -109,13 +315,30 @@ class S3Storage:
                sha1_hasher.update(chunk)
                size += len(chunk)

            if size == 0:
                raise HashComputationError("Empty file content")

            sha256_hash = sha256_hasher.hexdigest()
            md5_hash = md5_hasher.hexdigest()
            sha1_hash = sha1_hasher.hexdigest()
        except HashComputationError:
            raise
        except Exception as e:
            logger.error(f"Hash computation failed for multipart upload: {e}")
            raise HashComputationError(f"Failed to compute hash: {e}") from e

        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"

        # Check if already exists (deduplication) with retry logic
        try:
            exists = self._exists(s3_key)
        except S3ExistenceCheckError:
            raise
        except Exception as e:
            logger.error(f"Unexpected error during S3 existence check: {e}")
            raise S3ExistenceCheckError(f"Failed to check S3 existence: {e}") from e

        if exists:
            obj_info = self.get_object_info(s3_key)
            s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
            return StorageResult(
@@ -150,10 +373,12 @@ class S3Storage:
                PartNumber=part_number,
                Body=chunk,
            )
            parts.append(
                {
                    "PartNumber": part_number,
                    "ETag": response["ETag"],
                }
            )
            part_number += 1

        # Complete multipart upload
@@ -226,7 +451,9 @@ class S3Storage:
        # Upload based on size
        if size < MULTIPART_THRESHOLD:
            content = b"".join(all_chunks)
            response = self.client.put_object(
                Bucket=self.bucket, Key=s3_key, Body=content
            )
            s3_etag = response.get("ETag", "").strip('"')
        else:
            # Use multipart for large files
@@ -251,10 +478,12 @@ class S3Storage:
                    PartNumber=part_number,
                    Body=part_data,
                )
                parts.append(
                    {
                        "PartNumber": part_number,
                        "ETag": response["ETag"],
                    }
                )
                part_number += 1

        # Upload remaining buffer
@@ -266,10 +495,12 @@ class S3Storage:
                PartNumber=part_number,
                Body=buffer,
            )
            parts.append(
                {
                    "PartNumber": part_number,
                    "ETag": response["ETag"],
                }
            )

        complete_response = self.client.complete_multipart_upload(
            Bucket=self.bucket,
@@ -326,7 +557,9 @@ class S3Storage:
            self._active_uploads[upload_id] = session
        return session

    def upload_part(
        self, upload_id: str, part_number: int, data: bytes
    ) -> Dict[str, Any]:
        """
        Upload a part for a resumable upload.
        Returns part info including ETag.
@@ -434,14 +667,51 @@ class S3Storage:
        except ClientError:
            return None

    def _exists(self, s3_key: str, retry: bool = True) -> bool:
        """
        Check if an object exists with optional retry logic.

        Args:
            s3_key: The S3 key to check
            retry: Whether to retry on transient failures (default: True)

        Returns:
            True if object exists, False otherwise

        Raises:
            S3ExistenceCheckError: If all retries fail due to non-404 errors
        """
        import time

        max_retries = MAX_EXISTENCE_CHECK_RETRIES if retry else 1
        last_error = None

        for attempt in range(max_retries):
            try:
                self.client.head_object(Bucket=self.bucket, Key=s3_key)
                return True
            except ClientError as e:
                error_code = e.response.get("Error", {}).get("Code", "")
                # 404 means object doesn't exist - not an error
                if error_code in ("404", "NoSuchKey"):
                    return False
                # For other errors, retry
                last_error = e
                if attempt < max_retries - 1:
                    logger.warning(
                        f"S3 existence check failed (attempt {attempt + 1}/{max_retries}): {e}"
                    )
                    time.sleep(0.1 * (attempt + 1))  # Linearly increasing backoff

        # All retries failed
        logger.error(
            f"S3 existence check failed after {max_retries} attempts: {last_error}"
        )
        raise S3ExistenceCheckError(
            f"Failed to check S3 object existence after {max_retries} attempts: {last_error}"
        )

    def delete(self, s3_key: str) -> bool:
        """Delete an object"""
        try:
@@ -490,12 +760,68 @@ class S3Storage:
        )
        return url

    def health_check(self) -> bool:
        """
        Check if the storage backend is healthy and accessible.
        Performs a lightweight HEAD request on the bucket to verify connectivity.

        Returns:
            True if healthy, False otherwise
        """
        try:
            self.client.head_bucket(Bucket=self.bucket)
            return True
        except ClientError as e:
            logger.warning(f"Storage health check failed: {e}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error during storage health check: {e}")
            return False

    def verify_integrity(self, s3_key: str, expected_sha256: str) -> bool:
        """
        Verify the integrity of a stored object by downloading and re-hashing.

        This is an expensive operation and should only be used for critical
        verification scenarios.

        Args:
            s3_key: The storage key of the file
            expected_sha256: The expected SHA256 hash

        Returns:
            True if hash matches, False otherwise
        """
        try:
            content = self.get(s3_key)
            actual_hash = hashlib.sha256(content).hexdigest()
            if actual_hash != expected_sha256:
                logger.error(
                    f"Integrity verification failed for {s3_key}: "
                    f"expected {expected_sha256[:12]}..., got {actual_hash[:12]}..."
                )
                return False
            return True
        except Exception as e:
            logger.error(f"Error during integrity verification for {s3_key}: {e}")
            return False


# Singleton instance
_storage: Optional[S3Storage] = None


def get_storage() -> StorageBackend:
    """
    Get the configured storage backend instance.

    Currently returns S3Storage (works with S3-compatible backends like MinIO).
    Future implementations may support backend selection via configuration.

    Returns:
        StorageBackend instance
    """
    global _storage
    if _storage is None:
        _storage = S3Storage()
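Because StorageBackend is a runtime_checkable Protocol, conformance can be checked structurally at runtime. A minimal usage sketch of the module as diffed above:

import io
from app.storage import get_storage, StorageBackend

storage = get_storage()
assert isinstance(storage, StorageBackend)  # structural check via runtime_checkable

if storage.health_check():
    result = storage.store(io.BytesIO(b"hello artifact"))
    # Optional and expensive: re-download and re-hash the stored object
    assert storage.verify_integrity(result.s3_key, result.sha256)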

backend/pytest.ini (new file, 14 lines)

@@ -0,0 +1,14 @@
[pytest]
testpaths = tests
python_files = test_*.py
python_functions = test_*
python_classes = Test*
asyncio_mode = auto
addopts = -v --tb=short
filterwarnings =
    ignore::DeprecationWarning
    ignore::UserWarning
markers =
    unit: Unit tests (no external dependencies)
    integration: Integration tests (require database/storage)
    slow: Slow tests (skip with -m "not slow")
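With these markers registered, unit tests can be run on their own with pytest -m unit, and slow tests skipped with pytest -m "not slow", as the marker descriptions indicate.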

requirements.txt

@@ -9,3 +9,10 @@ pydantic==2.5.3
pydantic-settings==2.1.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
# Test dependencies
pytest>=7.4.0
pytest-asyncio>=0.21.0
pytest-cov>=4.1.0
httpx>=0.25.0
moto[s3]>=4.2.0

backend/tests/__init__.py

@@ -0,0 +1 @@
# Test package

backend/tests/conftest.py (new file, 201 lines)

@@ -0,0 +1,201 @@
"""
Test configuration and fixtures for Orchard backend tests.
This module provides:
- Database fixtures with test isolation
- Mock S3 storage using moto
- Test data factories for common scenarios
"""
import os
import pytest
import hashlib
from typing import Generator, BinaryIO
from unittest.mock import MagicMock, patch
import io
# Set test environment before importing app modules
os.environ["ORCHARD_DATABASE_HOST"] = "localhost"
os.environ["ORCHARD_DATABASE_PORT"] = "5432"
os.environ["ORCHARD_DATABASE_USER"] = "test"
os.environ["ORCHARD_DATABASE_PASSWORD"] = "test"
os.environ["ORCHARD_DATABASE_DBNAME"] = "orchard_test"
os.environ["ORCHARD_S3_ENDPOINT"] = "http://localhost:9000"
os.environ["ORCHARD_S3_BUCKET"] = "test-bucket"
os.environ["ORCHARD_S3_ACCESS_KEY_ID"] = "test"
os.environ["ORCHARD_S3_SECRET_ACCESS_KEY"] = "test"
# =============================================================================
# Test Data Factories
# =============================================================================
def create_test_file(content: bytes = None, size: int = 1024) -> io.BytesIO:
"""
Create a test file with known content.
Args:
content: Specific content to use, or None to generate random-ish content
size: Size of generated content if content is None
Returns:
BytesIO object with the content
"""
if content is None:
content = os.urandom(size)
return io.BytesIO(content)
def compute_sha256(content: bytes) -> str:
"""Compute SHA256 hash of content as lowercase hex string."""
return hashlib.sha256(content).hexdigest()
def compute_md5(content: bytes) -> str:
"""Compute MD5 hash of content as lowercase hex string."""
return hashlib.md5(content).hexdigest()
def compute_sha1(content: bytes) -> str:
"""Compute SHA1 hash of content as lowercase hex string."""
return hashlib.sha1(content).hexdigest()
# Known test data with pre-computed hashes
TEST_CONTENT_HELLO = b"Hello, World!"
TEST_HASH_HELLO = "dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f"
TEST_MD5_HELLO = "65a8e27d8879283831b664bd8b7f0ad4"
TEST_SHA1_HELLO = "0a0a9f2a6772942557ab5355d76af442f8f65e01"
TEST_CONTENT_EMPTY = b""
# Note: Empty content should be rejected by the storage layer
TEST_CONTENT_BINARY = bytes(range(256))
TEST_HASH_BINARY = compute_sha256(TEST_CONTENT_BINARY)
# =============================================================================
# Mock Storage Fixtures
# =============================================================================
class MockS3Client:
"""Mock S3 client for unit testing without actual S3/MinIO."""
def __init__(self):
self.objects = {} # key -> content
self.bucket = "test-bucket"
def put_object(self, Bucket: str, Key: str, Body: bytes) -> dict:
self.objects[Key] = Body
return {"ETag": f'"{compute_md5(Body)}"'}
def get_object(self, Bucket: str, Key: str, **kwargs) -> dict:
if Key not in self.objects:
raise Exception("NoSuchKey")
content = self.objects[Key]
return {
"Body": io.BytesIO(content),
"ContentLength": len(content),
}
def head_object(self, Bucket: str, Key: str) -> dict:
if Key not in self.objects:
from botocore.exceptions import ClientError
error_response = {"Error": {"Code": "404", "Message": "Not Found"}}
raise ClientError(error_response, "HeadObject")
content = self.objects[Key]
return {
"ContentLength": len(content),
"ETag": f'"{compute_md5(content)}"',
}
def delete_object(self, Bucket: str, Key: str) -> dict:
if Key in self.objects:
del self.objects[Key]
return {}
def head_bucket(self, Bucket: str) -> dict:
return {}
def create_multipart_upload(self, Bucket: str, Key: str) -> dict:
return {"UploadId": "test-upload-id"}
def upload_part(
self, Bucket: str, Key: str, UploadId: str, PartNumber: int, Body: bytes
) -> dict:
return {"ETag": f'"{compute_md5(Body)}"'}
def complete_multipart_upload(
self, Bucket: str, Key: str, UploadId: str, MultipartUpload: dict
) -> dict:
return {"ETag": '"test-etag"'}
def abort_multipart_upload(self, Bucket: str, Key: str, UploadId: str) -> dict:
return {}
def generate_presigned_url(
self, ClientMethod: str, Params: dict, ExpiresIn: int
) -> str:
return f"https://test-bucket.s3.amazonaws.com/{Params['Key']}?presigned=true"
@pytest.fixture
def mock_s3_client() -> MockS3Client:
"""Provide a mock S3 client for unit tests."""
return MockS3Client()
@pytest.fixture
def mock_storage(mock_s3_client):
"""
Provide a mock storage instance for unit tests.
Uses the MockS3Client to avoid actual S3/MinIO calls.
"""
from app.storage import S3Storage
storage = S3Storage.__new__(S3Storage)
storage.client = mock_s3_client
storage.bucket = "test-bucket"
storage._active_uploads = {}
return storage
# =============================================================================
# Database Fixtures (for integration tests)
# =============================================================================
@pytest.fixture(scope="session")
def test_db_url():
"""Get the test database URL."""
return (
f"postgresql://{os.environ['ORCHARD_DATABASE_USER']}:"
f"{os.environ['ORCHARD_DATABASE_PASSWORD']}@"
f"{os.environ['ORCHARD_DATABASE_HOST']}:"
f"{os.environ['ORCHARD_DATABASE_PORT']}/"
f"{os.environ['ORCHARD_DATABASE_DBNAME']}"
)
# =============================================================================
# HTTP Client Fixtures (for API tests)
# =============================================================================
@pytest.fixture
def test_app():
"""
Create a test FastAPI application.
Note: This requires the database to be available for integration tests.
For unit tests, use mock_storage fixture instead.
"""
from fastapi.testclient import TestClient
from app.main import app
return TestClient(app)
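The module docstring mentions moto, while the fixtures above use a hand-rolled MockS3Client. A moto-backed equivalent could look roughly like this sketch (moto 5.x API shown; 4.x releases used a per-service mock_s3 decorator instead):

# Hypothetical alternative fixture backed by moto (listed in requirements.txt).
import boto3
import pytest
from moto import mock_aws

@pytest.fixture
def moto_s3_client():
    with mock_aws():
        client = boto3.client("s3", region_name="us-east-1")
        client.create_bucket(Bucket="test-bucket")
        yield client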


@@ -0,0 +1,207 @@
"""
Unit tests for duplicate detection and deduplication logic.
Tests cover:
- _exists() method correctly identifies existing S3 keys
- S3 key generation follows expected pattern
- Storage layer skips upload when artifact already exists
- Storage layer performs upload when artifact does not exist
"""
import pytest
import io
from unittest.mock import MagicMock, patch
from tests.conftest import (
compute_sha256,
TEST_CONTENT_HELLO,
TEST_HASH_HELLO,
)
class TestExistsMethod:
"""Tests for the _exists() method that checks S3 object existence."""
@pytest.mark.unit
def test_exists_returns_true_for_existing_key(self, mock_storage, mock_s3_client):
"""Test _exists() returns True when object exists."""
# Pre-populate the mock storage
test_key = "fruits/df/fd/test-hash"
mock_s3_client.objects[test_key] = b"content"
result = mock_storage._exists(test_key)
assert result is True
@pytest.mark.unit
def test_exists_returns_false_for_nonexistent_key(self, mock_storage):
"""Test _exists() returns False when object doesn't exist."""
result = mock_storage._exists("fruits/no/ne/nonexistent-key")
assert result is False
@pytest.mark.unit
def test_exists_handles_404_error(self, mock_storage):
"""Test _exists() handles 404 errors gracefully."""
# The mock client raises ClientError for nonexistent keys
result = mock_storage._exists("fruits/xx/yy/does-not-exist")
assert result is False
class TestS3KeyGeneration:
"""Tests for S3 key pattern generation."""
@pytest.mark.unit
def test_s3_key_pattern(self):
"""Test S3 key follows pattern: fruits/{hash[:2]}/{hash[2:4]}/{hash}"""
test_hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
expected_key = f"fruits/{test_hash[:2]}/{test_hash[2:4]}/{test_hash}"
# Expected: fruits/ab/cd/abcdef1234567890...
assert expected_key == f"fruits/ab/cd/{test_hash}"
@pytest.mark.unit
def test_s3_key_generation_in_storage(self, mock_storage):
"""Test storage layer generates correct S3 key."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
assert result.s3_key == expected_key
@pytest.mark.unit
def test_s3_key_uses_sha256_hash(self, mock_storage):
"""Test S3 key is derived from SHA256 hash."""
content = b"unique test content for key test"
file_obj = io.BytesIO(content)
expected_hash = compute_sha256(content)
result = mock_storage._store_simple(file_obj)
# Key should contain the hash
assert expected_hash in result.s3_key
class TestDeduplicationBehavior:
"""Tests for deduplication (skip upload when exists)."""
@pytest.mark.unit
def test_skips_upload_when_exists(self, mock_storage, mock_s3_client):
"""Test storage skips S3 upload when artifact already exists."""
content = TEST_CONTENT_HELLO
s3_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
# Pre-populate storage (simulate existing artifact)
mock_s3_client.objects[s3_key] = content
# Track put_object calls
original_put = mock_s3_client.put_object
put_called = []
def tracked_put(*args, **kwargs):
put_called.append(True)
return original_put(*args, **kwargs)
mock_s3_client.put_object = tracked_put
# Store the same content
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# put_object should NOT have been called (deduplication)
assert len(put_called) == 0
assert result.sha256 == TEST_HASH_HELLO
@pytest.mark.unit
def test_uploads_when_not_exists(self, mock_storage, mock_s3_client):
"""Test storage uploads to S3 when artifact doesn't exist."""
content = b"brand new unique content"
content_hash = compute_sha256(content)
s3_key = f"fruits/{content_hash[:2]}/{content_hash[2:4]}/{content_hash}"
# Ensure object doesn't exist
assert s3_key not in mock_s3_client.objects
# Store the content
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# Object should now exist in mock storage
assert s3_key in mock_s3_client.objects
assert mock_s3_client.objects[s3_key] == content
@pytest.mark.unit
def test_returns_same_hash_for_duplicate(self, mock_storage, mock_s3_client):
"""Test storing same content twice returns same hash."""
content = b"content to be stored twice"
# First store
file1 = io.BytesIO(content)
result1 = mock_storage._store_simple(file1)
# Second store (duplicate)
file2 = io.BytesIO(content)
result2 = mock_storage._store_simple(file2)
assert result1.sha256 == result2.sha256
assert result1.s3_key == result2.s3_key
@pytest.mark.unit
def test_different_content_different_keys(self, mock_storage):
"""Test different content produces different S3 keys."""
content1 = b"first content"
content2 = b"second content"
file1 = io.BytesIO(content1)
result1 = mock_storage._store_simple(file1)
file2 = io.BytesIO(content2)
result2 = mock_storage._store_simple(file2)
assert result1.sha256 != result2.sha256
assert result1.s3_key != result2.s3_key
class TestDeduplicationEdgeCases:
"""Edge case tests for deduplication."""
@pytest.mark.unit
def test_same_content_different_filenames(self, mock_storage):
"""Test same content with different metadata is deduplicated."""
content = b"identical content"
# Store with "filename1"
file1 = io.BytesIO(content)
result1 = mock_storage._store_simple(file1)
# Store with "filename2" (same content)
file2 = io.BytesIO(content)
result2 = mock_storage._store_simple(file2)
# Both should have same hash (content-addressable)
assert result1.sha256 == result2.sha256
@pytest.mark.unit
def test_whitespace_only_difference(self, mock_storage):
"""Test content differing only by whitespace produces different hashes."""
content1 = b"test content"
content2 = b"test content" # Extra space
content3 = b"test content " # Trailing space
file1 = io.BytesIO(content1)
file2 = io.BytesIO(content2)
file3 = io.BytesIO(content3)
result1 = mock_storage._store_simple(file1)
result2 = mock_storage._store_simple(file2)
result3 = mock_storage._store_simple(file3)
# All should be different (content-addressable)
assert len({result1.sha256, result2.sha256, result3.sha256}) == 3


@@ -0,0 +1,215 @@
"""
Unit tests for SHA256 hash calculation and deduplication logic.
Tests cover:
- Hash computation produces consistent results
- Hash is always 64 character lowercase hexadecimal
- Different content produces different hashes
- Binary content handling
- Large file handling (streaming)
"""
import pytest
import hashlib
import io
from tests.conftest import (
create_test_file,
compute_sha256,
TEST_CONTENT_HELLO,
TEST_HASH_HELLO,
TEST_CONTENT_BINARY,
TEST_HASH_BINARY,
)
class TestHashComputation:
"""Unit tests for hash calculation functionality."""
@pytest.mark.unit
def test_sha256_consistent_results(self):
"""Test SHA256 hash produces consistent results for identical content."""
content = b"test content for hashing"
# Compute hash multiple times
hash1 = compute_sha256(content)
hash2 = compute_sha256(content)
hash3 = compute_sha256(content)
assert hash1 == hash2 == hash3
@pytest.mark.unit
def test_sha256_different_content_different_hash(self):
"""Test SHA256 produces different hashes for different content."""
content1 = b"content version 1"
content2 = b"content version 2"
hash1 = compute_sha256(content1)
hash2 = compute_sha256(content2)
assert hash1 != hash2
@pytest.mark.unit
def test_sha256_format_64_char_hex(self):
"""Test SHA256 hash is always 64 character lowercase hexadecimal."""
test_cases = [
b"", # Empty
b"a", # Single char
b"Hello, World!", # Normal string
bytes(range(256)), # All byte values
b"x" * 10000, # Larger content
]
for content in test_cases:
hash_value = compute_sha256(content)
# Check length
assert len(hash_value) == 64, (
f"Hash length should be 64, got {len(hash_value)}"
)
# Check lowercase
assert hash_value == hash_value.lower(), "Hash should be lowercase"
# Check hexadecimal
assert all(c in "0123456789abcdef" for c in hash_value), (
"Hash should be hex"
)
@pytest.mark.unit
def test_sha256_known_value(self):
"""Test SHA256 produces expected hash for known input."""
assert compute_sha256(TEST_CONTENT_HELLO) == TEST_HASH_HELLO
@pytest.mark.unit
def test_sha256_binary_content(self):
"""Test SHA256 handles binary content correctly."""
assert compute_sha256(TEST_CONTENT_BINARY) == TEST_HASH_BINARY
# Test with null bytes
content_with_nulls = b"\x00\x00test\x00\x00"
hash_value = compute_sha256(content_with_nulls)
assert len(hash_value) == 64
@pytest.mark.unit
def test_sha256_streaming_computation(self):
"""Test SHA256 can be computed in chunks (streaming)."""
# Large content
chunk_size = 8192
total_size = chunk_size * 10 # 80KB
content = b"x" * total_size
# Direct computation
direct_hash = compute_sha256(content)
# Streaming computation
hasher = hashlib.sha256()
for i in range(0, total_size, chunk_size):
hasher.update(content[i : i + chunk_size])
streaming_hash = hasher.hexdigest()
assert direct_hash == streaming_hash
@pytest.mark.unit
def test_sha256_order_matters(self):
"""Test that content order affects hash (not just content set)."""
content1 = b"AB"
content2 = b"BA"
assert compute_sha256(content1) != compute_sha256(content2)
class TestStorageHashComputation:
"""Tests for hash computation in the storage layer."""
@pytest.mark.unit
def test_storage_computes_sha256(self, mock_storage):
"""Test storage layer correctly computes SHA256 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
assert result.sha256 == TEST_HASH_HELLO
@pytest.mark.unit
def test_storage_computes_md5(self, mock_storage):
"""Test storage layer also computes MD5 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_md5 = hashlib.md5(content).hexdigest()
assert result.md5 == expected_md5
@pytest.mark.unit
def test_storage_computes_sha1(self, mock_storage):
"""Test storage layer also computes SHA1 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_sha1 = hashlib.sha1(content).hexdigest()
assert result.sha1 == expected_sha1
@pytest.mark.unit
def test_storage_returns_correct_size(self, mock_storage):
"""Test storage layer returns correct file size."""
content = b"test content with known size"
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
assert result.size == len(content)
@pytest.mark.unit
def test_storage_generates_correct_s3_key(self, mock_storage):
"""Test storage layer generates correct S3 key pattern."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# Key should be: fruits/{hash[:2]}/{hash[2:4]}/{hash}
expected_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
assert result.s3_key == expected_key
class TestHashEdgeCases:
"""Edge case tests for hash computation."""
@pytest.mark.unit
def test_hash_empty_content_rejected(self, mock_storage):
"""Test that empty content is rejected."""
from app.storage import HashComputationError
file_obj = io.BytesIO(b"")
with pytest.raises(HashComputationError):
mock_storage._store_simple(file_obj)
@pytest.mark.unit
def test_hash_large_file_streaming(self, mock_storage):
"""Test hash computation for large files uses streaming."""
# Create a 10MB file
size = 10 * 1024 * 1024
content = b"x" * size
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_hash = compute_sha256(content)
assert result.sha256 == expected_hash
@pytest.mark.unit
def test_hash_special_bytes(self):
"""Test hash handles all byte values correctly."""
# All possible byte values
content = bytes(range(256))
hash_value = compute_sha256(content)
assert len(hash_value) == 64
assert hash_value == TEST_HASH_BINARY