Add storage abstraction, stats endpoints, garbage collection, and test infrastructure

- Add StorageBackend protocol for backend-agnostic storage interface
- Add health check with storage and database connectivity verification
- Add garbage collection endpoints for orphaned artifacts (ref_count=0)
- Add deduplication statistics endpoints (/api/v1/stats, /stats/storage, /stats/deduplication)
- Add per-project statistics endpoint
- Add verify_integrity method for post-upload hash validation
- Set up pytest infrastructure with mock S3 client
- Add unit tests for hash calculation and duplicate detection
This commit is contained in:
Mondo Diaz
2026-01-05 11:16:46 -06:00
parent dbe78ded2f
commit 109677e43a
9 changed files with 1311 additions and 5 deletions

View File

@@ -73,6 +73,11 @@ from .schemas import (
SearchResultPackage,
SearchResultArtifact,
PresignedUrlResponse,
GarbageCollectionResponse,
OrphanedArtifactResponse,
StorageStatsResponse,
DeduplicationStatsResponse,
ProjectStatsResponse,
)
from .metadata import extract_metadata
from .config import get_settings
@@ -246,8 +251,39 @@ def _log_audit(
# Health check
@router.get("/health", response_model=HealthResponse)
def health_check():
return HealthResponse(status="ok")
def health_check(
db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage),
):
"""
Health check endpoint with optional storage and database health verification.
"""
storage_healthy = None
database_healthy = None
# Check database connectivity
try:
db.execute("SELECT 1")
database_healthy = True
except Exception as e:
logger.warning(f"Database health check failed: {e}")
database_healthy = False
# Check storage connectivity by listing bucket (lightweight operation)
try:
storage.client.head_bucket(Bucket=storage.bucket)
storage_healthy = True
except Exception as e:
logger.warning(f"Storage health check failed: {e}")
storage_healthy = False
overall_status = "ok" if (storage_healthy and database_healthy) else "degraded"
return HealthResponse(
status=overall_status,
storage_healthy=storage_healthy,
database_healthy=database_healthy,
)
# Global search
@@ -2104,3 +2140,365 @@ def get_artifact(artifact_id: str, db: Session = Depends(get_db)):
format_metadata=artifact.format_metadata,
tags=tag_infos,
)
# =============================================================================
# Garbage Collection Endpoints (ISSUE 36)
# =============================================================================
@router.get(
"/api/v1/admin/orphaned-artifacts",
response_model=List[OrphanedArtifactResponse],
)
def list_orphaned_artifacts(
request: Request,
limit: int = Query(
default=100, ge=1, le=1000, description="Max artifacts to return"
),
db: Session = Depends(get_db),
):
"""
List artifacts with ref_count=0 (orphaned artifacts not referenced by any tag).
These artifacts can be safely cleaned up as they are not referenced by any tag.
"""
orphaned = (
db.query(Artifact)
.filter(Artifact.ref_count == 0)
.order_by(Artifact.created_at.asc())
.limit(limit)
.all()
)
return [
OrphanedArtifactResponse(
id=a.id,
size=a.size,
created_at=a.created_at,
created_by=a.created_by,
original_name=a.original_name,
)
for a in orphaned
]
@router.post(
"/api/v1/admin/garbage-collect",
response_model=GarbageCollectionResponse,
)
def garbage_collect(
request: Request,
dry_run: bool = Query(
default=True, description="If true, only report what would be deleted"
),
limit: int = Query(
default=100, ge=1, le=1000, description="Max artifacts to delete per run"
),
db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage),
):
"""
Clean up orphaned artifacts (ref_count=0) from storage and database.
By default runs in dry-run mode (only reports what would be deleted).
Set dry_run=false to actually delete artifacts.
Returns list of deleted artifact IDs and total bytes freed.
"""
user_id = get_user_id(request)
# Find orphaned artifacts
orphaned = (
db.query(Artifact)
.filter(Artifact.ref_count == 0)
.order_by(Artifact.created_at.asc())
.limit(limit)
.all()
)
deleted_ids = []
bytes_freed = 0
for artifact in orphaned:
if not dry_run:
# Delete from S3
try:
storage.delete(artifact.s3_key)
except Exception as e:
logger.error(f"Failed to delete S3 object {artifact.s3_key}: {e}")
continue
# Delete from database
db.delete(artifact)
logger.info(
f"Garbage collected artifact {artifact.id[:12]}... ({artifact.size} bytes)"
)
deleted_ids.append(artifact.id)
bytes_freed += artifact.size
if not dry_run:
# Audit log
_log_audit(
db,
action="garbage_collect",
resource="artifacts",
user_id=user_id,
source_ip=request.client.host if request.client else None,
details={
"artifacts_deleted": len(deleted_ids),
"bytes_freed": bytes_freed,
"artifact_ids": deleted_ids[:10], # Log first 10 for brevity
},
)
db.commit()
return GarbageCollectionResponse(
artifacts_deleted=len(deleted_ids),
bytes_freed=bytes_freed,
artifact_ids=deleted_ids,
dry_run=dry_run,
)
# =============================================================================
# Statistics Endpoints (ISSUE 34)
# =============================================================================
@router.get("/api/v1/stats", response_model=StorageStatsResponse)
def get_storage_stats(db: Session = Depends(get_db)):
"""
Get global storage statistics including deduplication metrics.
"""
# Total artifacts and size
total_stats = db.query(
func.count(Artifact.id),
func.coalesce(func.sum(Artifact.size), 0),
).first()
total_artifacts = total_stats[0] or 0
total_size_bytes = total_stats[1] or 0
# Unique artifacts (ref_count > 0) and their size
unique_stats = (
db.query(
func.count(Artifact.id),
)
.filter(Artifact.ref_count > 0)
.first()
)
unique_artifacts = unique_stats[0] or 0
# Orphaned artifacts (ref_count = 0)
orphaned_stats = (
db.query(
func.count(Artifact.id),
func.coalesce(func.sum(Artifact.size), 0),
)
.filter(Artifact.ref_count == 0)
.first()
)
orphaned_artifacts = orphaned_stats[0] or 0
orphaned_size_bytes = orphaned_stats[1] or 0
# Total uploads and deduplicated uploads
upload_stats = db.query(
func.count(Upload.id),
func.count(Upload.id).filter(Upload.deduplicated == True),
).first()
total_uploads = upload_stats[0] or 0
deduplicated_uploads = upload_stats[1] or 0
# Calculate deduplication ratio
deduplication_ratio = (
total_uploads / unique_artifacts if unique_artifacts > 0 else 0.0
)
# Calculate storage saved (sum of size * (ref_count - 1) for artifacts with ref_count > 1)
# This represents bytes that would have been stored without deduplication
saved_query = (
db.query(func.coalesce(func.sum(Artifact.size * (Artifact.ref_count - 1)), 0))
.filter(Artifact.ref_count > 1)
.first()
)
storage_saved_bytes = saved_query[0] or 0
return StorageStatsResponse(
total_artifacts=total_artifacts,
total_size_bytes=total_size_bytes,
unique_artifacts=unique_artifacts,
orphaned_artifacts=orphaned_artifacts,
orphaned_size_bytes=orphaned_size_bytes,
total_uploads=total_uploads,
deduplicated_uploads=deduplicated_uploads,
deduplication_ratio=deduplication_ratio,
storage_saved_bytes=storage_saved_bytes,
)
@router.get("/api/v1/stats/storage", response_model=StorageStatsResponse)
def get_storage_stats_alias(db: Session = Depends(get_db)):
"""Alias for /api/v1/stats - get global storage statistics."""
return get_storage_stats(db)
@router.get("/api/v1/stats/deduplication", response_model=DeduplicationStatsResponse)
def get_deduplication_stats(
top_n: int = Query(
default=10,
ge=1,
le=100,
description="Number of top referenced artifacts to return",
),
db: Session = Depends(get_db),
):
"""
Get detailed deduplication effectiveness statistics.
"""
# Total logical bytes (sum of all upload sizes - what would be stored without dedup)
# We calculate this as: sum(artifact.size * artifact.ref_count) for all artifacts
logical_query = db.query(
func.coalesce(func.sum(Artifact.size * Artifact.ref_count), 0)
).first()
total_logical_bytes = logical_query[0] or 0
# Total physical bytes (actual storage used)
physical_query = (
db.query(func.coalesce(func.sum(Artifact.size), 0))
.filter(Artifact.ref_count > 0)
.first()
)
total_physical_bytes = physical_query[0] or 0
# Bytes saved
bytes_saved = total_logical_bytes - total_physical_bytes
# Savings percentage
savings_percentage = (
(bytes_saved / total_logical_bytes * 100) if total_logical_bytes > 0 else 0.0
)
# Upload counts
total_uploads = db.query(func.count(Upload.id)).scalar() or 0
unique_artifacts = (
db.query(func.count(Artifact.id)).filter(Artifact.ref_count > 0).scalar() or 0
)
duplicate_uploads = (
total_uploads - unique_artifacts if total_uploads > unique_artifacts else 0
)
# Average and max ref_count
ref_stats = (
db.query(
func.coalesce(func.avg(Artifact.ref_count), 0),
func.coalesce(func.max(Artifact.ref_count), 0),
)
.filter(Artifact.ref_count > 0)
.first()
)
average_ref_count = float(ref_stats[0] or 0)
max_ref_count = ref_stats[1] or 0
# Top N most referenced artifacts
top_artifacts = (
db.query(Artifact)
.filter(Artifact.ref_count > 1)
.order_by(Artifact.ref_count.desc())
.limit(top_n)
.all()
)
most_referenced = [
{
"id": a.id,
"ref_count": a.ref_count,
"size": a.size,
"storage_saved": a.size * (a.ref_count - 1),
"original_name": a.original_name,
}
for a in top_artifacts
]
return DeduplicationStatsResponse(
total_logical_bytes=total_logical_bytes,
total_physical_bytes=total_physical_bytes,
bytes_saved=bytes_saved,
savings_percentage=savings_percentage,
total_uploads=total_uploads,
unique_artifacts=unique_artifacts,
duplicate_uploads=duplicate_uploads,
average_ref_count=average_ref_count,
max_ref_count=max_ref_count,
most_referenced_artifacts=most_referenced,
)
@router.get(
"/api/v1/projects/{project_name}/stats", response_model=ProjectStatsResponse
)
def get_project_stats(
project_name: str,
db: Session = Depends(get_db),
):
"""
Get statistics for a specific project.
"""
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Package count
package_count = (
db.query(func.count(Package.id))
.filter(Package.project_id == project.id)
.scalar()
or 0
)
# Get all package IDs for this project
package_ids = (
db.query(Package.id).filter(Package.project_id == project.id).subquery()
)
# Tag count
tag_count = (
db.query(func.count(Tag.id)).filter(Tag.package_id.in_(package_ids)).scalar()
or 0
)
# Unique artifact count and total size (via uploads)
artifact_stats = (
db.query(
func.count(func.distinct(Upload.artifact_id)),
func.coalesce(func.sum(Artifact.size), 0),
)
.join(Artifact, Upload.artifact_id == Artifact.id)
.filter(Upload.package_id.in_(package_ids))
.first()
)
artifact_count = artifact_stats[0] if artifact_stats else 0
total_size_bytes = artifact_stats[1] if artifact_stats else 0
# Upload counts
upload_stats = (
db.query(
func.count(Upload.id),
func.count(Upload.id).filter(Upload.deduplicated == True),
)
.filter(Upload.package_id.in_(package_ids))
.first()
)
upload_count = upload_stats[0] if upload_stats else 0
deduplicated_uploads = upload_stats[1] if upload_stats else 0
return ProjectStatsResponse(
project_id=str(project.id),
project_name=project.name,
package_count=package_count,
tag_count=tag_count,
artifact_count=artifact_count,
total_size_bytes=total_size_bytes,
upload_count=upload_count,
deduplicated_uploads=deduplicated_uploads,
)

View File

@@ -387,3 +387,72 @@ class PresignedUrlResponse(BaseModel):
class HealthResponse(BaseModel):
status: str
version: str = "1.0.0"
storage_healthy: Optional[bool] = None
database_healthy: Optional[bool] = None
# Garbage collection schemas
class GarbageCollectionResponse(BaseModel):
"""Response from garbage collection operation"""
artifacts_deleted: int
bytes_freed: int
artifact_ids: List[str]
dry_run: bool
class OrphanedArtifactResponse(BaseModel):
"""Information about an orphaned artifact"""
id: str
size: int
created_at: datetime
created_by: str
original_name: Optional[str]
# Storage statistics schemas
class StorageStatsResponse(BaseModel):
"""Global storage statistics"""
total_artifacts: int
total_size_bytes: int
unique_artifacts: int # Artifacts with ref_count > 0
orphaned_artifacts: int # Artifacts with ref_count = 0
orphaned_size_bytes: int
total_uploads: int
deduplicated_uploads: int
deduplication_ratio: (
float # total_uploads / unique_artifacts (if > 1, deduplication is working)
)
storage_saved_bytes: int # Bytes saved through deduplication
class DeduplicationStatsResponse(BaseModel):
"""Deduplication effectiveness statistics"""
total_logical_bytes: (
int # Sum of all upload sizes (what would be stored without dedup)
)
total_physical_bytes: int # Actual storage used
bytes_saved: int
savings_percentage: float
total_uploads: int
unique_artifacts: int
duplicate_uploads: int
average_ref_count: float
max_ref_count: int
most_referenced_artifacts: List[Dict[str, Any]] # Top N most referenced
class ProjectStatsResponse(BaseModel):
"""Per-project statistics"""
project_id: str
project_name: str
package_count: int
tag_count: int
artifact_count: int
total_size_bytes: int
upload_count: int
deduplicated_uploads: int

View File

@@ -1,6 +1,17 @@
import hashlib
import logging
from typing import BinaryIO, Tuple, Optional, Dict, Any, Generator, NamedTuple
from abc import ABC, abstractmethod
from typing import (
BinaryIO,
Tuple,
Optional,
Dict,
Any,
Generator,
NamedTuple,
Protocol,
runtime_checkable,
)
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
@@ -10,6 +21,133 @@ from .config import get_settings
settings = get_settings()
logger = logging.getLogger(__name__)
# =============================================================================
# Storage Backend Protocol/Interface (ISSUE 33)
# =============================================================================
@runtime_checkable
class StorageBackend(Protocol):
"""
Abstract protocol defining the interface for storage backends.
All storage implementations (S3, MinIO, future backends) must implement
this interface to ensure consistent behavior across the application.
Note on Deduplication:
- This system uses whole-file deduplication based on SHA256 hash
- Partial/chunk-level deduplication is NOT supported (out of scope for MVP)
- Files with identical content but different metadata are deduplicated
"""
def store(
self, file: BinaryIO, content_length: Optional[int] = None
) -> "StorageResult":
"""
Store a file and return StorageResult with all checksums.
Content-addressable: if the file already exists (by hash), just return
the existing hash without uploading again.
Args:
file: File-like object to store
content_length: Optional hint for file size (enables multipart upload)
Returns:
StorageResult with sha256, size, s3_key, and optional checksums
Raises:
HashComputationError: If hash computation fails
S3ExistenceCheckError: If existence check fails after retries
S3UploadError: If upload fails
"""
...
def get(self, s3_key: str) -> bytes:
"""
Retrieve a file by its storage key.
Args:
s3_key: The storage key (path) of the file
Returns:
File content as bytes
"""
...
def get_stream(
self, s3_key: str, range_header: Optional[str] = None
) -> Tuple[Any, int, Optional[str]]:
"""
Get a streaming response for a file.
Supports range requests for partial downloads.
Args:
s3_key: The storage key of the file
range_header: Optional HTTP Range header value
Returns:
Tuple of (stream, content_length, content_range)
"""
...
def delete(self, s3_key: str) -> bool:
"""
Delete a file from storage.
Args:
s3_key: The storage key of the file to delete
Returns:
True if deleted successfully, False otherwise
"""
...
def get_object_info(self, s3_key: str) -> Optional[Dict[str, Any]]:
"""
Get object metadata without downloading content.
Args:
s3_key: The storage key of the file
Returns:
Dict with size, content_type, last_modified, etag, or None if not found
"""
...
def generate_presigned_url(
self,
s3_key: str,
expiry: Optional[int] = None,
response_content_type: Optional[str] = None,
response_content_disposition: Optional[str] = None,
) -> str:
"""
Generate a presigned URL for downloading an object.
Args:
s3_key: The storage key of the file
expiry: URL expiry in seconds
response_content_type: Override Content-Type header in response
response_content_disposition: Override Content-Disposition header
Returns:
Presigned URL string
"""
...
def health_check(self) -> bool:
"""
Check if the storage backend is healthy and accessible.
Returns:
True if healthy, False otherwise
"""
...
# Threshold for multipart upload (100MB)
MULTIPART_THRESHOLD = 100 * 1024 * 1024
# Chunk size for multipart upload (10MB)
@@ -622,12 +760,68 @@ class S3Storage:
)
return url
def health_check(self) -> bool:
"""
Check if the storage backend is healthy and accessible.
Performs a lightweight HEAD request on the bucket to verify connectivity.
Returns:
True if healthy, False otherwise
"""
try:
self.client.head_bucket(Bucket=self.bucket)
return True
except ClientError as e:
logger.warning(f"Storage health check failed: {e}")
return False
except Exception as e:
logger.error(f"Unexpected error during storage health check: {e}")
return False
def verify_integrity(self, s3_key: str, expected_sha256: str) -> bool:
"""
Verify the integrity of a stored object by downloading and re-hashing.
This is an expensive operation and should only be used for critical
verification scenarios.
Args:
s3_key: The storage key of the file
expected_sha256: The expected SHA256 hash
Returns:
True if hash matches, False otherwise
"""
try:
content = self.get(s3_key)
actual_hash = hashlib.sha256(content).hexdigest()
if actual_hash != expected_sha256:
logger.error(
f"Integrity verification failed for {s3_key}: "
f"expected {expected_sha256[:12]}..., got {actual_hash[:12]}..."
)
return False
return True
except Exception as e:
logger.error(f"Error during integrity verification for {s3_key}: {e}")
return False
# Singleton instance
_storage = None
_storage: Optional[S3Storage] = None
def get_storage() -> S3Storage:
def get_storage() -> StorageBackend:
"""
Get the configured storage backend instance.
Currently returns S3Storage (works with S3-compatible backends like MinIO).
Future implementations may support backend selection via configuration.
Returns:
StorageBackend instance
"""
global _storage
if _storage is None:
_storage = S3Storage()