"""
Artifact repository for data access operations.
"""
|
|
|
|
from typing import Optional, List, Tuple
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, or_
|
|
from uuid import UUID
|
|
|
|
from .base import BaseRepository
|
|
from ..models import Artifact, Tag, Upload, Package, Project
|
|
|
|
|
|
class ArtifactRepository(BaseRepository[Artifact]):
    """Repository for Artifact entity operations.

    Artifacts are looked up by their SHA256 hash, which is stored in
    ``Artifact.id``. Methods that modify data call ``flush()`` on ``self.db``
    and never commit.
    """

    model = Artifact

    def get_by_sha256(self, sha256: str) -> Optional[Artifact]:
        """Return the artifact whose id equals ``sha256``, or ``None`` if absent."""
        matching = self.db.query(Artifact).filter(Artifact.id == sha256)
        return matching.first()

    def exists_by_sha256(self, sha256: str) -> bool:
        """Return whether an artifact with the given SHA256 exists."""
        # Wrap the lookup in EXISTS so no artifact row has to be loaded.
        exists_clause = (
            self.db.query(Artifact).filter(Artifact.id == sha256).exists()
        )
        return self.db.query(exists_clause).scalar()

    def create_artifact(
        self,
        sha256: str,
        size: int,
        s3_key: str,
        created_by: str,
        content_type: Optional[str] = None,
        original_name: Optional[str] = None,
        format_metadata: Optional[dict] = None,
    ) -> Artifact:
        """Create a new artifact and flush it to the session.

        The artifact starts with ``ref_count=1``; a missing or empty
        ``format_metadata`` is stored as an empty dict.

        Returns:
            The newly added ``Artifact`` instance.
        """
        artifact = Artifact(
            id=sha256,
            size=size,
            s3_key=s3_key,
            created_by=created_by,
            content_type=content_type,
            original_name=original_name,
            format_metadata=format_metadata or {},
            ref_count=1,
        )
        self.db.add(artifact)
        self.db.flush()
        return artifact

    def increment_ref_count(self, artifact: Artifact) -> Artifact:
        """Increment the artifact's reference count by one and flush."""
        artifact.ref_count += 1
        self.db.flush()
        return artifact

    def decrement_ref_count(self, artifact: Artifact) -> Artifact:
        """Decrement the artifact's reference count, never going below zero.

        The artifact is not deleted even if ``ref_count`` reaches 0. The
        session is flushed only when the count was actually changed.

        Returns:
            The same artifact, with its updated count.
        """
        if artifact.ref_count > 0:
            artifact.ref_count -= 1
            self.db.flush()
        return artifact

    def get_orphaned_artifacts(self, limit: int = 100) -> List[Artifact]:
        """Return up to ``limit`` artifacts with ``ref_count == 0``.

        These are the candidates for cleanup.
        """
        orphaned = self.db.query(Artifact).filter(Artifact.ref_count == 0)
        return orphaned.limit(limit).all()

    def get_artifacts_without_tags(self, limit: int = 100) -> List[Artifact]:
        """Return up to ``limit`` artifacts that no tag points to."""
        # Correlated NOT EXISTS rather than NOT IN: NOT IN matches nothing
        # as soon as the subquery yields a single NULL artifact_id.
        has_tag = (
            self.db.query(Tag).filter(Tag.artifact_id == Artifact.id).exists()
        )
        return self.db.query(Artifact).filter(~has_tag).limit(limit).all()

    def find_by_package(
        self,
        package_id: UUID,
        page: int = 1,
        limit: int = 20,
        content_type: Optional[str] = None,
    ) -> Tuple[List[Artifact], int]:
        """Find artifacts uploaded to a package, newest first.

        Args:
            package_id: Package whose uploads are inspected.
            page: 1-based page number; values below 1 are treated as 1.
            limit: Maximum number of artifacts per page.
            content_type: When given (and non-empty), only artifacts with
                exactly this content type are returned.

        Returns:
            A ``(artifacts, total)`` tuple, where ``total`` is the number of
            matching artifacts across all pages.
        """
        # Distinct artifact IDs referenced by this package's uploads.
        uploaded_ids = (
            self.db.query(func.distinct(Upload.artifact_id))
            .filter(Upload.package_id == package_id)
            .subquery()
        )

        query = self.db.query(Artifact).filter(Artifact.id.in_(uploaded_ids))
        if content_type:
            query = query.filter(Artifact.content_type == content_type)

        total = query.count()

        # Clamp the page so a zero or negative page never produces a negative
        # OFFSET, which some databases reject.
        offset = (max(page, 1) - 1) * limit
        artifacts = (
            query.order_by(Artifact.created_at.desc())
            .offset(offset)
            .limit(limit)
            .all()
        )
        return artifacts, total

    def get_referencing_tags(
        self, artifact_id: str
    ) -> List[Tuple[Tag, Package, Project]]:
        """Return every tag referencing the artifact, with its package and project."""
        return (
            self.db.query(Tag, Package, Project)
            .join(Package, Tag.package_id == Package.id)
            .join(Project, Package.project_id == Project.id)
            .filter(Tag.artifact_id == artifact_id)
            .all()
        )

    def search(
        self, query_str: str, limit: int = 10
    ) -> List[Tuple[Tag, Artifact, str, str]]:
        """Search by tag name or original filename (case-insensitive substring).

        The query starts from ``Tag`` and inner-joins ``Artifact``, so only
        artifacts that have at least one tag can appear in the results.

        Args:
            query_str: Text to look for; ``%`` and ``_`` are matched literally.
            limit: Maximum number of rows to return.

        Returns:
            ``(tag, artifact, package_name, project_name)`` tuples ordered by
            tag name.
        """
        needle = query_str.lower()
        # autoescape=True escapes LIKE wildcards in the user-supplied text so
        # that e.g. "my_file" does not also match "myXfile".
        matches_tag = func.lower(Tag.name).contains(needle, autoescape=True)
        matches_filename = func.lower(Artifact.original_name).contains(
            needle, autoescape=True
        )
        return (
            self.db.query(Tag, Artifact, Package.name, Project.name)
            .join(Artifact, Tag.artifact_id == Artifact.id)
            .join(Package, Tag.package_id == Package.id)
            .join(Project, Package.project_id == Project.id)
            .filter(or_(matches_tag, matches_filename))
            .order_by(Tag.name)
            .limit(limit)
            .all()
        )

    def update_metadata(self, artifact: Artifact, metadata: dict) -> Artifact:
        """Merge ``metadata`` into the artifact's format metadata and flush.

        Keys in ``metadata`` override existing keys. When the artifact has no
        (or empty) format metadata, ``metadata`` is stored as given.
        """
        existing = artifact.format_metadata
        # Assign a new dict instead of mutating the existing one in place.
        artifact.format_metadata = (
            {**existing, **metadata} if existing else metadata
        )
        self.db.flush()
        return artifact