137 lines
4.2 KiB
Python
137 lines
4.2 KiB
Python
"""
|
|
Upload repository for data access operations.
|
|
"""
|
|
|
|
from typing import Optional, List, Tuple
|
|
from datetime import datetime
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, desc
|
|
from uuid import UUID
|
|
|
|
from .base import BaseRepository
|
|
from ..models import Upload, Artifact, Package, Project
|
|
|
|
|
|
class UploadRepository(BaseRepository[Upload]):
|
|
"""Repository for Upload entity operations."""
|
|
|
|
model = Upload
|
|
|
|
def create_upload(
|
|
self,
|
|
artifact_id: str,
|
|
package_id: UUID,
|
|
uploaded_by: str,
|
|
original_name: Optional[str] = None,
|
|
source_ip: Optional[str] = None,
|
|
) -> Upload:
|
|
"""Record a new upload event."""
|
|
return self.create(
|
|
artifact_id=artifact_id,
|
|
package_id=package_id,
|
|
original_name=original_name,
|
|
uploaded_by=uploaded_by,
|
|
source_ip=source_ip,
|
|
)
|
|
|
|
def list_by_package(
|
|
self,
|
|
package_id: UUID,
|
|
page: int = 1,
|
|
limit: int = 20,
|
|
) -> Tuple[List[Upload], int]:
|
|
"""List uploads for a package with pagination."""
|
|
query = self.db.query(Upload).filter(Upload.package_id == package_id)
|
|
|
|
total = query.count()
|
|
offset = (page - 1) * limit
|
|
uploads = query.order_by(Upload.uploaded_at.desc()).offset(offset).limit(limit).all()
|
|
|
|
return uploads, total
|
|
|
|
def list_by_artifact(self, artifact_id: str) -> List[Upload]:
|
|
"""List all uploads of a specific artifact."""
|
|
return (
|
|
self.db.query(Upload)
|
|
.filter(Upload.artifact_id == artifact_id)
|
|
.order_by(Upload.uploaded_at.desc())
|
|
.all()
|
|
)
|
|
|
|
def get_latest_for_package(self, package_id: UUID) -> Optional[Upload]:
|
|
"""Get the most recent upload for a package."""
|
|
return (
|
|
self.db.query(Upload)
|
|
.filter(Upload.package_id == package_id)
|
|
.order_by(Upload.uploaded_at.desc())
|
|
.first()
|
|
)
|
|
|
|
def get_latest_timestamp(self, package_id: UUID) -> Optional[datetime]:
|
|
"""Get timestamp of most recent upload for a package."""
|
|
result = (
|
|
self.db.query(func.max(Upload.uploaded_at))
|
|
.filter(Upload.package_id == package_id)
|
|
.scalar()
|
|
)
|
|
return result
|
|
|
|
def count_by_artifact(self, artifact_id: str) -> int:
|
|
"""Count uploads of a specific artifact."""
|
|
return (
|
|
self.db.query(func.count(Upload.id))
|
|
.filter(Upload.artifact_id == artifact_id)
|
|
.scalar() or 0
|
|
)
|
|
|
|
def count_by_package(self, package_id: UUID) -> int:
|
|
"""Count total uploads for a package."""
|
|
return (
|
|
self.db.query(func.count(Upload.id))
|
|
.filter(Upload.package_id == package_id)
|
|
.scalar() or 0
|
|
)
|
|
|
|
def get_distinct_artifacts_count(self, package_id: UUID) -> int:
|
|
"""Count distinct artifacts uploaded to a package."""
|
|
return (
|
|
self.db.query(func.count(func.distinct(Upload.artifact_id)))
|
|
.filter(Upload.package_id == package_id)
|
|
.scalar() or 0
|
|
)
|
|
|
|
def get_uploads_by_user(
|
|
self,
|
|
user_id: str,
|
|
page: int = 1,
|
|
limit: int = 20,
|
|
) -> Tuple[List[Upload], int]:
|
|
"""List uploads by a specific user."""
|
|
query = self.db.query(Upload).filter(Upload.uploaded_by == user_id)
|
|
|
|
total = query.count()
|
|
offset = (page - 1) * limit
|
|
uploads = query.order_by(Upload.uploaded_at.desc()).offset(offset).limit(limit).all()
|
|
|
|
return uploads, total
|
|
|
|
def get_upload_stats(self, package_id: UUID) -> dict:
|
|
"""Get upload statistics for a package."""
|
|
stats = (
|
|
self.db.query(
|
|
func.count(Upload.id),
|
|
func.count(func.distinct(Upload.artifact_id)),
|
|
func.min(Upload.uploaded_at),
|
|
func.max(Upload.uploaded_at),
|
|
)
|
|
.filter(Upload.package_id == package_id)
|
|
.first()
|
|
)
|
|
|
|
return {
|
|
"total_uploads": stats[0] if stats else 0,
|
|
"unique_artifacts": stats[1] if stats else 0,
|
|
"first_upload": stats[2] if stats else None,
|
|
"last_upload": stats[3] if stats else None,
|
|
}
|