""" Upload repository for data access operations. """ from typing import Optional, List, Tuple from datetime import datetime from sqlalchemy.orm import Session from sqlalchemy import func, desc from uuid import UUID from .base import BaseRepository from ..models import Upload, Artifact, Package, Project class UploadRepository(BaseRepository[Upload]): """Repository for Upload entity operations.""" model = Upload def create_upload( self, artifact_id: str, package_id: UUID, uploaded_by: str, original_name: Optional[str] = None, source_ip: Optional[str] = None, ) -> Upload: """Record a new upload event.""" return self.create( artifact_id=artifact_id, package_id=package_id, original_name=original_name, uploaded_by=uploaded_by, source_ip=source_ip, ) def list_by_package( self, package_id: UUID, page: int = 1, limit: int = 20, ) -> Tuple[List[Upload], int]: """List uploads for a package with pagination.""" query = self.db.query(Upload).filter(Upload.package_id == package_id) total = query.count() offset = (page - 1) * limit uploads = query.order_by(Upload.uploaded_at.desc()).offset(offset).limit(limit).all() return uploads, total def list_by_artifact(self, artifact_id: str) -> List[Upload]: """List all uploads of a specific artifact.""" return ( self.db.query(Upload) .filter(Upload.artifact_id == artifact_id) .order_by(Upload.uploaded_at.desc()) .all() ) def get_latest_for_package(self, package_id: UUID) -> Optional[Upload]: """Get the most recent upload for a package.""" return ( self.db.query(Upload) .filter(Upload.package_id == package_id) .order_by(Upload.uploaded_at.desc()) .first() ) def get_latest_timestamp(self, package_id: UUID) -> Optional[datetime]: """Get timestamp of most recent upload for a package.""" result = ( self.db.query(func.max(Upload.uploaded_at)) .filter(Upload.package_id == package_id) .scalar() ) return result def count_by_artifact(self, artifact_id: str) -> int: """Count uploads of a specific artifact.""" return ( self.db.query(func.count(Upload.id)) .filter(Upload.artifact_id == artifact_id) .scalar() or 0 ) def count_by_package(self, package_id: UUID) -> int: """Count total uploads for a package.""" return ( self.db.query(func.count(Upload.id)) .filter(Upload.package_id == package_id) .scalar() or 0 ) def get_distinct_artifacts_count(self, package_id: UUID) -> int: """Count distinct artifacts uploaded to a package.""" return ( self.db.query(func.count(func.distinct(Upload.artifact_id))) .filter(Upload.package_id == package_id) .scalar() or 0 ) def get_uploads_by_user( self, user_id: str, page: int = 1, limit: int = 20, ) -> Tuple[List[Upload], int]: """List uploads by a specific user.""" query = self.db.query(Upload).filter(Upload.uploaded_by == user_id) total = query.count() offset = (page - 1) * limit uploads = query.order_by(Upload.uploaded_at.desc()).offset(offset).limit(limit).all() return uploads, total def get_upload_stats(self, package_id: UUID) -> dict: """Get upload statistics for a package.""" stats = ( self.db.query( func.count(Upload.id), func.count(func.distinct(Upload.artifact_id)), func.min(Upload.uploaded_at), func.max(Upload.uploaded_at), ) .filter(Upload.package_id == package_id) .first() ) return { "total_uploads": stats[0] if stats else 0, "unique_artifacts": stats[1] if stats else 0, "first_upload": stats[2] if stats else None, "last_upload": stats[3] if stats else None, }