"""
Artifact repository for data access operations.
"""

from typing import Optional, List, Tuple
from uuid import UUID

from sqlalchemy import func, or_
from sqlalchemy.orm import Session

from .base import BaseRepository
from ..models import Artifact, Tag, Upload, Package, Project


class ArtifactRepository(BaseRepository[Artifact]):
    """Repository for Artifact entity operations.

    Every method works through ``self.db``. Mutating methods call
    ``flush()`` only; none of them commits.
    """

    model = Artifact

    def get_by_sha256(self, sha256: str) -> Optional[Artifact]:
        """Get artifact by SHA256 hash (primary key).

        Returns ``None`` when no artifact has ``id == sha256``.
        """
        return self.db.query(Artifact).filter(Artifact.id == sha256).first()

    def exists_by_sha256(self, sha256: str) -> bool:
        """Check if artifact with SHA256 exists.

        Issues an ``EXISTS`` query rather than loading the row.
        """
        exists_clause = (
            self.db.query(Artifact).filter(Artifact.id == sha256).exists()
        )
        return self.db.query(exists_clause).scalar()

    def create_artifact(
        self,
        sha256: str,
        size: int,
        s3_key: str,
        created_by: str,
        content_type: Optional[str] = None,
        original_name: Optional[str] = None,
        format_metadata: Optional[dict] = None,
    ) -> Artifact:
        """Create a new artifact.

        The artifact is created with ``ref_count=1`` and ``id=sha256``.
        A missing (or empty) ``format_metadata`` is stored as ``{}``.
        The new row is added to the session and flushed, not committed.
        """
        artifact = Artifact(
            id=sha256,
            size=size,
            s3_key=s3_key,
            created_by=created_by,
            content_type=content_type,
            original_name=original_name,
            format_metadata=format_metadata or {},
            ref_count=1,
        )
        self.db.add(artifact)
        self.db.flush()
        return artifact

    def increment_ref_count(self, artifact: Artifact) -> Artifact:
        """Increment artifact reference count.

        Returns the same artifact after flushing the change.

        NOTE(review): the increment is applied to the loaded object
        (read-modify-write in Python), not as a SQL-side
        ``ref_count + 1``; presumably callers serialize access to the
        row -- confirm.
        """
        artifact.ref_count += 1
        self.db.flush()
        return artifact

    def decrement_ref_count(self, artifact: Artifact) -> Artifact:
        """
        Decrement artifact reference count.

        Returns the artifact with updated count.
        Does not delete the artifact even if ref_count reaches 0.
        A count that is already 0 (or lower) is left untouched, so the
        counter never goes negative through this method.
        """
        if artifact.ref_count > 0:
            artifact.ref_count -= 1
            self.db.flush()
        return artifact

    def get_orphaned_artifacts(self, limit: int = 100) -> List[Artifact]:
        """Get artifacts with ref_count = 0 (candidates for cleanup).

        At most ``limit`` rows are returned; no ordering is applied.
        """
        return (
            self.db.query(Artifact)
            .filter(Artifact.ref_count == 0)
            .limit(limit)
            .all()
        )

    def get_artifacts_without_tags(self, limit: int = 100) -> List[Artifact]:
        """Get artifacts that have no tags pointing to them.

        At most ``limit`` rows are returned; no ordering is applied.
        """
        # Subquery to find artifact IDs that have tags.  NULL ids are
        # excluded because ``x NOT IN (..., NULL)`` is never true in SQL,
        # which would make the outer query return nothing.  The filter is
        # a no-op if Tag.artifact_id is declared NOT NULL.
        tagged_artifacts = (
            self.db.query(Tag.artifact_id)
            .filter(Tag.artifact_id.isnot(None))
            .distinct()
            .subquery()
        )
        return (
            self.db.query(Artifact)
            .filter(~Artifact.id.in_(tagged_artifacts))
            .limit(limit)
            .all()
        )

    def find_by_package(
        self,
        package_id: UUID,
        page: int = 1,
        limit: int = 20,
        content_type: Optional[str] = None,
    ) -> Tuple[List[Artifact], int]:
        """Find artifacts uploaded to a package.

        Args:
            package_id: Package whose uploads are inspected.
            page: 1-based page number; values below 1 are treated as 1.
            limit: Page size.
            content_type: When truthy, only artifacts with exactly this
                content type are returned.

        Returns:
            ``(artifacts, total)`` where ``artifacts`` is the requested
            page ordered by ``created_at`` descending and ``total`` is the
            number of matching artifacts before pagination.
        """
        # Get distinct artifact IDs from uploads
        artifact_ids_subquery = (
            self.db.query(func.distinct(Upload.artifact_id))
            .filter(Upload.package_id == package_id)
            .subquery()
        )

        query = self.db.query(Artifact).filter(
            Artifact.id.in_(artifact_ids_subquery)
        )
        if content_type:
            query = query.filter(Artifact.content_type == content_type)

        total = query.count()

        # Clamp so page <= 0 cannot produce a negative OFFSET, which some
        # database backends reject.
        offset = max(page - 1, 0) * limit
        artifacts = (
            query.order_by(Artifact.created_at.desc())
            .offset(offset)
            .limit(limit)
            .all()
        )
        return artifacts, total

    def get_referencing_tags(
        self, artifact_id: str
    ) -> List[Tuple[Tag, Package, Project]]:
        """Get all tags referencing this artifact with package and project info.

        Each result row is a ``(tag, package, project)`` triple obtained by
        joining Tag -> Package -> Project.
        """
        return (
            self.db.query(Tag, Package, Project)
            .join(Package, Tag.package_id == Package.id)
            .join(Project, Package.project_id == Project.id)
            .filter(Tag.artifact_id == artifact_id)
            .all()
        )

    def search(
        self, query_str: str, limit: int = 10
    ) -> List[Tuple[Tag, Artifact, str, str]]:
        """
        Search artifacts by tag name or original filename.

        The match is a case-insensitive substring test against the tag
        name or the artifact's original filename.  ``%`` and ``_`` in
        ``query_str`` are matched literally, not as LIKE wildcards.

        Returns (tag, artifact, package_name, project_name) tuples,
        ordered by tag name and capped at ``limit`` rows.
        """
        search_lower = query_str.lower()
        # autoescape=True escapes LIKE metacharacters in the bound value so
        # user input such as "_" or "%" cannot match arbitrary rows.
        name_matches = func.lower(Tag.name).contains(
            search_lower, autoescape=True
        )
        filename_matches = func.lower(Artifact.original_name).contains(
            search_lower, autoescape=True
        )
        return (
            self.db.query(Tag, Artifact, Package.name, Project.name)
            .join(Artifact, Tag.artifact_id == Artifact.id)
            .join(Package, Tag.package_id == Package.id)
            .join(Project, Package.project_id == Project.id)
            .filter(or_(name_matches, filename_matches))
            .order_by(Tag.name)
            .limit(limit)
            .all()
        )

    def update_metadata(self, artifact: Artifact, metadata: dict) -> Artifact:
        """Update or merge format metadata.

        Keys in ``metadata`` override existing keys of the same name.  A
        new dict is always assigned, so the stored value never aliases
        the caller's ``metadata`` object.  The change is flushed, not
        committed.
        """
        existing = artifact.format_metadata or {}
        artifact.format_metadata = {**existing, **metadata}
        self.db.flush()
        return artifact