""" Package repository for data access operations. """ from typing import Optional, List, Tuple from sqlalchemy.orm import Session from sqlalchemy import func, or_, asc, desc from uuid import UUID from .base import BaseRepository from ..models import Package, Project, Tag, Upload, Artifact class PackageRepository(BaseRepository[Package]): """Repository for Package entity operations.""" model = Package def get_by_name(self, project_id: UUID, name: str) -> Optional[Package]: """Get package by name within a project.""" return ( self.db.query(Package) .filter(Package.project_id == project_id, Package.name == name) .first() ) def get_by_project_and_name(self, project_name: str, package_name: str) -> Optional[Package]: """Get package by project name and package name.""" return ( self.db.query(Package) .join(Project, Package.project_id == Project.id) .filter(Project.name == project_name, Package.name == package_name) .first() ) def exists_by_name(self, project_id: UUID, name: str) -> bool: """Check if package with name exists in project.""" return self.db.query( self.db.query(Package) .filter(Package.project_id == project_id, Package.name == name) .exists() ).scalar() def list_by_project( self, project_id: UUID, page: int = 1, limit: int = 20, search: Optional[str] = None, format: Optional[str] = None, platform: Optional[str] = None, sort: str = "name", order: str = "asc", ) -> Tuple[List[Package], int]: """ List packages in a project with filtering and pagination. Returns tuple of (packages, total_count). """ query = self.db.query(Package).filter(Package.project_id == project_id) # Apply search filter if search: search_lower = search.lower() query = query.filter( or_( func.lower(Package.name).contains(search_lower), func.lower(Package.description).contains(search_lower) ) ) # Apply format filter if format: query = query.filter(Package.format == format) # Apply platform filter if platform: query = query.filter(Package.platform == platform) # Get total count total = query.count() # Apply sorting sort_columns = { "name": Package.name, "created_at": Package.created_at, "updated_at": Package.updated_at, } sort_column = sort_columns.get(sort, Package.name) if order == "desc": query = query.order_by(desc(sort_column)) else: query = query.order_by(asc(sort_column)) # Apply pagination offset = (page - 1) * limit packages = query.offset(offset).limit(limit).all() return packages, total def create_package( self, project_id: UUID, name: str, description: Optional[str] = None, format: str = "generic", platform: str = "any", ) -> Package: """Create a new package.""" return self.create( project_id=project_id, name=name, description=description, format=format, platform=platform, ) def update_package( self, package: Package, name: Optional[str] = None, description: Optional[str] = None, format: Optional[str] = None, platform: Optional[str] = None, ) -> Package: """Update package fields.""" updates = {} if name is not None: updates["name"] = name if description is not None: updates["description"] = description if format is not None: updates["format"] = format if platform is not None: updates["platform"] = platform return self.update(package, **updates) def get_stats(self, package_id: UUID) -> dict: """Get package statistics (tag count, artifact count, total size).""" tag_count = ( self.db.query(func.count(Tag.id)) .filter(Tag.package_id == package_id) .scalar() or 0 ) artifact_stats = ( self.db.query( func.count(func.distinct(Upload.artifact_id)), func.coalesce(func.sum(Artifact.size), 0) ) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.package_id == package_id) .first() ) return { "tag_count": tag_count, "artifact_count": artifact_stats[0] if artifact_stats else 0, "total_size": artifact_stats[1] if artifact_stats else 0, } def search(self, query_str: str, limit: int = 10) -> List[Tuple[Package, str]]: """Search packages by name or description. Returns (package, project_name) tuples.""" search_lower = query_str.lower() return ( self.db.query(Package, Project.name) .join(Project, Package.project_id == Project.id) .filter( or_( func.lower(Package.name).contains(search_lower), func.lower(Package.description).contains(search_lower) ) ) .order_by(Package.name) .limit(limit) .all() )