import json
from datetime import datetime, timedelta, timezone
from fastapi import (
    APIRouter, Depends, HTTPException, UploadFile, File, Form, Request,
    Query, Header, Response, Cookie, status,
)
from fastapi.responses import StreamingResponse, RedirectResponse
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_, func, text
from typing import List, Optional, Literal
import math
import io

from .database import get_db
from .storage import (
    get_storage, S3Storage, MULTIPART_CHUNK_SIZE, StorageError,
    HashComputationError, FileSizeExceededError, S3ExistenceCheckError,
    S3UploadError, S3StorageUnavailableError, HashCollisionError,
)
from .models import (
    Project, Package, Artifact, Tag, TagHistory, Upload, UploadLock,
    Consumer, AuditLog, User, AccessPermission, PackageVersion,
)
from .schemas import (
    ProjectCreate, ProjectUpdate, ProjectResponse, ProjectWithAccessResponse,
    PackageCreate, PackageUpdate, PackageResponse, PackageDetailResponse,
    TagSummary, PACKAGE_FORMATS, PACKAGE_PLATFORMS,
    ArtifactDetailResponse, ArtifactTagInfo, PackageArtifactResponse,
    TagCreate, TagResponse, TagDetailResponse,
    TagHistoryResponse, TagHistoryDetailResponse,
    AuditLogResponse, UploadHistoryResponse, ArtifactProvenanceResponse,
    UploadResponse, ConsumerResponse, HealthResponse,
    PaginatedResponse, PaginationMeta,
    ResumableUploadInitRequest, ResumableUploadInitResponse,
    ResumableUploadPartResponse, ResumableUploadCompleteRequest,
    ResumableUploadCompleteResponse, ResumableUploadStatusResponse,
    GlobalSearchResponse, SearchResultProject, SearchResultPackage,
    SearchResultArtifact,
    PresignedUrlResponse, GarbageCollectionResponse, OrphanedArtifactResponse,
    ConsistencyCheckResponse, StorageStatsResponse, DeduplicationStatsResponse,
    ProjectStatsResponse, PackageStatsResponse, ArtifactStatsResponse,
    CrossProjectDeduplicationResponse, TimeBasedStatsResponse, StatsReportResponse,
    GlobalArtifactResponse, GlobalTagResponse,
    LoginRequest, LoginResponse, ChangePasswordRequest,
    UserResponse, UserCreate, UserUpdate, ResetPasswordRequest,
    APIKeyCreate, APIKeyResponse, APIKeyCreateResponse,
    AccessPermissionCreate, AccessPermissionUpdate, AccessPermissionResponse,
    OIDCConfigResponse, OIDCConfigUpdate, OIDCStatusResponse, OIDCLoginResponse,
    PackageVersionResponse, PackageVersionDetailResponse,
)
from .metadata import extract_metadata
from .config import get_settings
from .checksum import (
    ChecksumMismatchError, VerifyingStreamWrapper, sha256_to_base64,
)

router = APIRouter()


def sanitize_filename(filename: str) -> str:
    """Sanitize filename for use in Content-Disposition header.

    Removes characters that could enable header injection attacks:
    - Double quotes (") - could break out of quoted filename
    - Carriage return (\\r) and newline (\\n) - could inject headers
    """
    import re

    return re.sub(r'[\r\n"]', "", filename)


def get_user_id_from_request(
    request: Request,
    db: Session,
    current_user: Optional[User] = None,
) -> str:
    """Extract user ID from request using auth system.

    If a current_user is provided (from auth dependency), use their username.
    Otherwise, try to authenticate from headers and fall back to 'anonymous'.
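    A minimal sketch of the resolution order (the header value below is made up
    for illustration):

        Authorization: Bearer <api-key>   -> username resolved via AuthService
        no usable credentials             -> "anonymous"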
""" if current_user: return current_user.username # Try to authenticate from API key header auth_header = request.headers.get("Authorization") if auth_header and auth_header.startswith("Bearer "): api_key = auth_header[7:] auth_service = AuthService(db) user = auth_service.get_user_from_api_key(api_key) if user: return user.username return "anonymous" def get_user_id(request: Request) -> str: """Legacy function for backward compatibility. DEPRECATED: Use get_user_id_from_request with db session for proper auth. """ auth = request.headers.get("Authorization") if auth and auth.startswith("Bearer "): return "authenticated-user" return "anonymous" import logging import time logger = logging.getLogger(__name__) def _increment_ref_count(db: Session, artifact_id: str) -> int: """ Atomically increment ref_count for an artifact using row-level locking. Returns the new ref_count value. Uses SELECT FOR UPDATE to prevent race conditions when multiple requests try to modify the same artifact's ref_count simultaneously. """ # Lock the row to prevent concurrent modifications artifact = ( db.query(Artifact).filter(Artifact.id == artifact_id).with_for_update().first() ) if not artifact: logger.warning( f"Attempted to increment ref_count for non-existent artifact: {artifact_id[:12]}..." ) return 0 artifact.ref_count += 1 db.flush() # Ensure the update is written but don't commit yet return artifact.ref_count def _decrement_ref_count(db: Session, artifact_id: str) -> int: """ Atomically decrement ref_count for an artifact using row-level locking. Returns the new ref_count value. Uses SELECT FOR UPDATE to prevent race conditions when multiple requests try to modify the same artifact's ref_count simultaneously. Will not decrement below 0. """ # Lock the row to prevent concurrent modifications artifact = ( db.query(Artifact).filter(Artifact.id == artifact_id).with_for_update().first() ) if not artifact: logger.warning( f"Attempted to decrement ref_count for non-existent artifact: {artifact_id[:12]}..." ) return 0 # Prevent going below 0 if artifact.ref_count > 0: artifact.ref_count -= 1 else: logger.warning( f"Attempted to decrement ref_count below 0 for artifact: {artifact_id[:12]}... " f"(current: {artifact.ref_count})" ) db.flush() # Ensure the update is written but don't commit yet return artifact.ref_count import re # Regex pattern for detecting version in filename # Matches: name-1.0.0, name_1.0.0, name-v1.0.0, etc. # Supports: X.Y, X.Y.Z, X.Y.Z-alpha, X.Y.Z.beta1, X.Y.Z_rc2 VERSION_FILENAME_PATTERN = re.compile( r"[-_]v?(\d+\.\d+(?:\.\d+)?(?:[-_][a-zA-Z0-9]+)?)(?:\.tar|\.zip|\.tgz|\.gz|\.bz2|\.xz|$)" ) def _detect_version( explicit_version: Optional[str], metadata: dict, filename: str, ) -> tuple[Optional[str], Optional[str]]: """ Detect version from explicit parameter, metadata, or filename. Priority: 1. Explicit version parameter (user provided) 2. Version from package metadata (deb, rpm, whl, jar) 3. Version from filename pattern Returns: tuple of (version, source) where source is one of: - 'explicit': User provided the version - 'metadata': Extracted from package metadata - 'filename': Parsed from filename - None: No version could be determined """ # 1. Explicit version takes priority if explicit_version: return explicit_version, "explicit" # 2. Try metadata extraction (from deb, rpm, whl, jar, etc.) if metadata.get("version"): return metadata["version"], "metadata" # 3. 
Try filename pattern matching match = VERSION_FILENAME_PATTERN.search(filename) if match: return match.group(1), "filename" return None, None def _create_or_update_version( db: Session, package_id: str, artifact_id: str, version: str, version_source: str, user_id: str, ) -> PackageVersion: """ Create a version record for a package-artifact pair. Raises HTTPException 409 if version already exists for this package. """ # Check if version already exists existing = ( db.query(PackageVersion) .filter(PackageVersion.package_id == package_id, PackageVersion.version == version) .first() ) if existing: raise HTTPException( status_code=409, detail=f"Version {version} already exists in this package", ) # Check if artifact already has a version in this package existing_artifact_version = ( db.query(PackageVersion) .filter( PackageVersion.package_id == package_id, PackageVersion.artifact_id == artifact_id, ) .first() ) if existing_artifact_version: # Artifact already has a version, return it return existing_artifact_version # Create new version record pkg_version = PackageVersion( package_id=package_id, artifact_id=artifact_id, version=version, version_source=version_source, created_by=user_id, ) db.add(pkg_version) db.flush() return pkg_version def _create_or_update_tag( db: Session, package_id: str, tag_name: str, new_artifact_id: str, user_id: str, ) -> tuple[Tag, bool, Optional[str]]: """ Create or update a tag, handling ref_count and history. Returns: tuple of (tag, is_new, old_artifact_id) - tag: The created/updated Tag object - is_new: True if tag was created, False if updated - old_artifact_id: Previous artifact_id if tag was updated, None otherwise """ existing_tag = ( db.query(Tag).filter(Tag.package_id == package_id, Tag.name == tag_name).first() ) if existing_tag: old_artifact_id = existing_tag.artifact_id # Only process if artifact actually changed if old_artifact_id != new_artifact_id: # Record history history = TagHistory( tag_id=existing_tag.id, old_artifact_id=old_artifact_id, new_artifact_id=new_artifact_id, change_type="update", changed_by=user_id, ) db.add(history) # Update tag to point to new artifact # NOTE: SQL trigger (tags_ref_count_update_trigger) handles ref_count: # - Decrements old artifact's ref_count # - Increments new artifact's ref_count existing_tag.artifact_id = new_artifact_id existing_tag.created_by = user_id logger.info( f"Tag '{tag_name}' updated: {old_artifact_id[:12]}... -> {new_artifact_id[:12]}..." ) return existing_tag, False, old_artifact_id else: # Same artifact, no change needed return existing_tag, False, None else: # Create new tag new_tag = Tag( package_id=package_id, name=tag_name, artifact_id=new_artifact_id, created_by=user_id, ) db.add(new_tag) db.flush() # Get the tag ID # Record history for creation history = TagHistory( tag_id=new_tag.id, old_artifact_id=None, new_artifact_id=new_artifact_id, change_type="create", changed_by=user_id, ) db.add(history) return new_tag, True, None def _log_audit( db: Session, action: str, resource: str, user_id: str, source_ip: Optional[str] = None, details: Optional[dict] = None, ): """Log an action to the audit_logs table.""" audit_log = AuditLog( action=action, resource=resource, user_id=user_id, source_ip=source_ip, details=details or {}, ) db.add(audit_log) # Health check @router.get("/health", response_model=HealthResponse) def health_check( db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), ): """ Health check endpoint with optional storage and database health verification. 
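    Illustrative response shapes (field names follow HealthResponse; values are
    examples only):

        {"status": "ok", "storage_healthy": true, "database_healthy": true}
        {"status": "degraded", "storage_healthy": false, "database_healthy": true}

    A failed check degrades the status rather than returning an error.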
""" storage_healthy = None database_healthy = None # Check database connectivity try: db.execute(text("SELECT 1")) database_healthy = True except Exception as e: logger.warning(f"Database health check failed: {e}") database_healthy = False # Check storage connectivity by listing bucket (lightweight operation) try: storage.client.head_bucket(Bucket=storage.bucket) storage_healthy = True except Exception as e: logger.warning(f"Storage health check failed: {e}") storage_healthy = False overall_status = "ok" if (storage_healthy and database_healthy) else "degraded" return HealthResponse( status=overall_status, storage_healthy=storage_healthy, database_healthy=database_healthy, ) # --- Authentication Routes --- from .auth import ( AuthService, get_current_user, get_current_user_optional, require_admin, get_auth_service, SESSION_COOKIE_NAME, verify_password, validate_password_strength, PasswordTooShortError, MIN_PASSWORD_LENGTH, check_project_access, AuthorizationService, ) from .rate_limit import limiter, LOGIN_RATE_LIMIT @router.post("/api/v1/auth/login", response_model=LoginResponse) @limiter.limit(LOGIN_RATE_LIMIT) def login( login_request: LoginRequest, request: Request, response: Response, auth_service: AuthService = Depends(get_auth_service), ): """ Login with username and password. Returns user info and sets a session cookie. """ user = auth_service.authenticate_user( login_request.username, login_request.password ) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid username or password", ) # Create session session, token = auth_service.create_session( user, user_agent=request.headers.get("User-Agent"), ip_address=request.client.host if request.client else None, ) # Update last login auth_service.update_last_login(user) # Set session cookie response.set_cookie( key=SESSION_COOKIE_NAME, value=token, httponly=True, secure=request.url.scheme == "https", samesite="lax", max_age=24 * 60 * 60, # 24 hours ) # Log audit _log_audit( auth_service.db, "auth.login", f"user:{user.username}", user.username, request, {"user_id": str(user.id)}, ) return LoginResponse( id=user.id, username=user.username, email=user.email, is_admin=user.is_admin, must_change_password=user.must_change_password, ) @router.post("/api/v1/auth/logout") def logout( request: Request, response: Response, db: Session = Depends(get_db), session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME), ): """ Logout and invalidate the session. """ if session_token: auth_service = AuthService(db) session = auth_service.get_session_by_token(session_token) if session: auth_service.delete_session(session) # Clear the session cookie response.delete_cookie(key=SESSION_COOKIE_NAME) return {"message": "Logged out successfully"} @router.get("/api/v1/auth/me", response_model=UserResponse) def get_current_user_info( current_user: User = Depends(get_current_user), ): """ Get information about the currently authenticated user. """ return UserResponse( id=current_user.id, username=current_user.username, email=current_user.email, is_admin=current_user.is_admin, is_active=current_user.is_active, must_change_password=current_user.must_change_password, created_at=current_user.created_at, last_login=current_user.last_login, ) @router.post("/api/v1/auth/change-password") def change_password( password_request: ChangePasswordRequest, request: Request, current_user: User = Depends(get_current_user), auth_service: AuthService = Depends(get_auth_service), ): """ Change the current user's password. 
Requires the current password for verification. """ # Verify current password if not verify_password( password_request.current_password, current_user.password_hash ): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Current password is incorrect", ) # Validate and change password try: auth_service.change_password(current_user, password_request.new_password) except PasswordTooShortError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters", ) # Log audit _log_audit( auth_service.db, "auth.password_change", f"user:{current_user.username}", current_user.username, request, ) return {"message": "Password changed successfully"} # --- API Key Routes --- @router.post("/api/v1/auth/keys", response_model=APIKeyCreateResponse) def create_api_key( key_request: APIKeyCreate, request: Request, current_user: User = Depends(get_current_user), auth_service: AuthService = Depends(get_auth_service), ): """ Create a new API key for the current user. The key is only returned once - store it securely! """ api_key, key = auth_service.create_api_key( user=current_user, name=key_request.name, description=key_request.description, scopes=key_request.scopes, ) # Log audit _log_audit( auth_service.db, "auth.api_key_create", f"api_key:{api_key.id}", current_user.username, request, {"key_name": key_request.name}, ) return APIKeyCreateResponse( id=api_key.id, name=api_key.name, description=api_key.description, scopes=api_key.scopes, key=key, created_at=api_key.created_at, expires_at=api_key.expires_at, ) @router.get("/api/v1/auth/keys", response_model=List[APIKeyResponse]) def list_api_keys( current_user: User = Depends(get_current_user), auth_service: AuthService = Depends(get_auth_service), ): """ List all API keys for the current user. Does not include the secret key. """ keys = auth_service.list_user_api_keys(current_user) return [ APIKeyResponse( id=k.id, name=k.name, description=k.description, scopes=k.scopes, created_at=k.created_at, expires_at=k.expires_at, last_used=k.last_used, ) for k in keys ] @router.delete("/api/v1/auth/keys/{key_id}") def delete_api_key( key_id: str, request: Request, current_user: User = Depends(get_current_user), auth_service: AuthService = Depends(get_auth_service), ): """ Revoke an API key. Users can only delete their own keys, unless they are an admin. """ api_key = auth_service.get_api_key_by_id(key_id) if not api_key: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="API key not found", ) # Check ownership (admins can delete any key) if api_key.owner_id != current_user.id and not current_user.is_admin: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Cannot delete another user's API key", ) key_name = api_key.name auth_service.delete_api_key(api_key) # Log audit _log_audit( auth_service.db, "auth.api_key_delete", f"api_key:{key_id}", current_user.username, request, {"key_name": key_name}, ) return {"message": "API key deleted successfully"} # --- OIDC Configuration Routes --- @router.get("/api/v1/auth/oidc/status") def get_oidc_status( db: Session = Depends(get_db), ): """ Get OIDC status (public endpoint). Returns whether OIDC is enabled and the issuer URL if so. 
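    Illustrative responses (issuer URL is a placeholder):

        {"enabled": true, "issuer_url": "https://idp.example.com/realms/dev"}
        {"enabled": false}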
""" from .auth import OIDCConfigService from .schemas import OIDCStatusResponse oidc_service = OIDCConfigService(db) config = oidc_service.get_config() if config.enabled and config.issuer_url and config.client_id: return OIDCStatusResponse(enabled=True, issuer_url=config.issuer_url) return OIDCStatusResponse(enabled=False) @router.get("/api/v1/auth/oidc/config") def get_oidc_config( current_user: User = Depends(require_admin), db: Session = Depends(get_db), ): """ Get OIDC configuration (admin only). Client secret is not exposed. """ from .auth import OIDCConfigService from .schemas import OIDCConfigResponse oidc_service = OIDCConfigService(db) config = oidc_service.get_config() return OIDCConfigResponse( enabled=config.enabled, issuer_url=config.issuer_url, client_id=config.client_id, has_client_secret=bool(config.client_secret), scopes=config.scopes, auto_create_users=config.auto_create_users, admin_group=config.admin_group, ) @router.put("/api/v1/auth/oidc/config") def update_oidc_config( config_update: "OIDCConfigUpdate", request: Request, current_user: User = Depends(require_admin), db: Session = Depends(get_db), ): """ Update OIDC configuration (admin only). """ from .auth import OIDCConfigService, OIDCConfig from .schemas import OIDCConfigUpdate, OIDCConfigResponse oidc_service = OIDCConfigService(db) current_config = oidc_service.get_config() # Update only provided fields new_config = OIDCConfig( enabled=config_update.enabled if config_update.enabled is not None else current_config.enabled, issuer_url=config_update.issuer_url if config_update.issuer_url is not None else current_config.issuer_url, client_id=config_update.client_id if config_update.client_id is not None else current_config.client_id, client_secret=config_update.client_secret if config_update.client_secret is not None else current_config.client_secret, scopes=config_update.scopes if config_update.scopes is not None else current_config.scopes, auto_create_users=config_update.auto_create_users if config_update.auto_create_users is not None else current_config.auto_create_users, admin_group=config_update.admin_group if config_update.admin_group is not None else current_config.admin_group, ) oidc_service.save_config(new_config) # Log audit _log_audit( db, "auth.oidc_config_update", "oidc_config", current_user.username, request, {"enabled": new_config.enabled, "issuer_url": new_config.issuer_url}, ) return OIDCConfigResponse( enabled=new_config.enabled, issuer_url=new_config.issuer_url, client_id=new_config.client_id, has_client_secret=bool(new_config.client_secret), scopes=new_config.scopes, auto_create_users=new_config.auto_create_users, admin_group=new_config.admin_group, ) @router.get("/api/v1/auth/oidc/login") def oidc_login( request: Request, redirect_uri: Optional[str] = Query(None, description="Override redirect URI"), db: Session = Depends(get_db), ): """ Initiate OIDC login flow. Redirects to the OIDC provider's authorization endpoint. 
""" from .auth import OIDCConfigService, OIDCService oidc_config_service = OIDCConfigService(db) config = oidc_config_service.get_config() if not config.enabled or not config.issuer_url or not config.client_id: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="OIDC is not configured", ) # Generate state for CSRF protection state = secrets.token_urlsafe(32) # Determine redirect URI if not redirect_uri: # Use the request's base URL base_url = str(request.base_url).rstrip("/") redirect_uri = f"{base_url}/api/v1/auth/oidc/callback" # Store state in session (using a simple cookie for now) oidc_service = OIDCService(db, config) auth_url = oidc_service.get_authorization_url(redirect_uri, state) if not auth_url: raise HTTPException( status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Failed to generate authorization URL", ) # Return redirect response with state cookie response = RedirectResponse(url=auth_url, status_code=status.HTTP_302_FOUND) response.set_cookie( key="oidc_state", value=state, httponly=True, secure=request.url.scheme == "https", samesite="lax", max_age=600, # 10 minutes ) response.set_cookie( key="oidc_redirect_uri", value=redirect_uri, httponly=True, secure=request.url.scheme == "https", samesite="lax", max_age=600, ) return response @router.get("/api/v1/auth/oidc/callback") def oidc_callback( request: Request, code: str = Query(..., description="Authorization code"), state: str = Query(..., description="State parameter"), oidc_state: Optional[str] = Cookie(None), oidc_redirect_uri: Optional[str] = Cookie(None), db: Session = Depends(get_db), ): """ Handle OIDC callback after user authenticates. Exchanges the authorization code for tokens and creates a session. """ from .auth import OIDCConfigService, OIDCService, AuthService # Verify state if not oidc_state or state != oidc_state: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid state parameter", ) oidc_config_service = OIDCConfigService(db) config = oidc_config_service.get_config() if not config.enabled: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="OIDC is not configured", ) # Determine redirect URI (must match what was used in login) if not oidc_redirect_uri: base_url = str(request.base_url).rstrip("/") oidc_redirect_uri = f"{base_url}/api/v1/auth/oidc/callback" oidc_service = OIDCService(db, config) # Exchange code for tokens tokens = oidc_service.exchange_code_for_tokens(code, oidc_redirect_uri) if not tokens: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Failed to exchange authorization code", ) id_token = tokens.get("id_token") if not id_token: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="No ID token in response", ) # Validate ID token claims = oidc_service.validate_id_token(id_token) if not claims: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid ID token", ) # Get or create user user = oidc_service.get_or_create_user(claims) if not user: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User creation not allowed", ) if not user.is_active: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled", ) # Create session auth_service = AuthService(db) session, token = auth_service.create_session( user, user_agent=request.headers.get("User-Agent"), ip_address=request.client.host if request.client else None, ) # Log audit _log_audit( db, "auth.oidc_login", f"user:{user.id}", user.username, request, {"oidc_subject": 
claims.get("sub")}, ) # Redirect to frontend with session cookie response = RedirectResponse(url="/", status_code=status.HTTP_302_FOUND) response.set_cookie( key=SESSION_COOKIE_NAME, value=token, httponly=True, secure=request.url.scheme == "https", samesite="lax", max_age=SESSION_DURATION_HOURS * 3600, ) # Clear OIDC state cookies response.delete_cookie("oidc_state") response.delete_cookie("oidc_redirect_uri") return response # --- Admin User Management Routes --- @router.get("/api/v1/admin/users", response_model=List[UserResponse]) def list_users( include_inactive: bool = Query(default=False), current_user: User = Depends(require_admin), auth_service: AuthService = Depends(get_auth_service), ): """ List all users (admin only). """ users = auth_service.list_users(include_inactive=include_inactive) return [ UserResponse( id=u.id, username=u.username, email=u.email, is_admin=u.is_admin, is_active=u.is_active, must_change_password=u.must_change_password, created_at=u.created_at, last_login=u.last_login, ) for u in users ] @router.post("/api/v1/admin/users", response_model=UserResponse) def create_user( user_create: UserCreate, request: Request, current_user: User = Depends(require_admin), auth_service: AuthService = Depends(get_auth_service), ): """ Create a new user (admin only). """ # Check if username already exists existing = auth_service.get_user_by_username(user_create.username) if existing: raise HTTPException( status_code=status.HTTP_409_CONFLICT, detail="Username already exists", ) # Validate password strength try: validate_password_strength(user_create.password) except PasswordTooShortError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters", ) user = auth_service.create_user( username=user_create.username, password=user_create.password, email=user_create.email, is_admin=user_create.is_admin, ) # Log audit _log_audit( auth_service.db, "admin.user_create", f"user:{user.username}", current_user.username, request, {"new_user": user_create.username, "is_admin": user_create.is_admin}, ) return UserResponse( id=user.id, username=user.username, email=user.email, is_admin=user.is_admin, is_active=user.is_active, must_change_password=user.must_change_password, created_at=user.created_at, last_login=user.last_login, ) @router.get("/api/v1/admin/users/{username}", response_model=UserResponse) def get_user( username: str, current_user: User = Depends(require_admin), auth_service: AuthService = Depends(get_auth_service), ): """ Get a specific user by username (admin only). """ user = auth_service.get_user_by_username(username) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) return UserResponse( id=user.id, username=user.username, email=user.email, is_admin=user.is_admin, is_active=user.is_active, must_change_password=user.must_change_password, created_at=user.created_at, last_login=user.last_login, ) @router.put("/api/v1/admin/users/{username}", response_model=UserResponse) def update_user( username: str, user_update: UserUpdate, request: Request, current_user: User = Depends(require_admin), auth_service: AuthService = Depends(get_auth_service), ): """ Update a user (admin only). 
""" user = auth_service.get_user_by_username(username) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Prevent removing the last admin if user_update.is_admin is False and user.is_admin: admin_count = ( auth_service.db.query(User) .filter(User.is_admin.is_(True), User.is_active.is_(True)) .count() ) if admin_count <= 1: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Cannot remove the last admin", ) # Update fields if user_update.email is not None: user.email = user_update.email if user_update.is_admin is not None: user.is_admin = user_update.is_admin if user_update.is_active is not None: user.is_active = user_update.is_active auth_service.db.commit() # Log audit _log_audit( auth_service.db, "admin.user_update", f"user:{username}", current_user.username, request, {"updates": user_update.model_dump(exclude_none=True)}, ) return UserResponse( id=user.id, username=user.username, email=user.email, is_admin=user.is_admin, is_active=user.is_active, must_change_password=user.must_change_password, created_at=user.created_at, last_login=user.last_login, ) @router.post("/api/v1/admin/users/{username}/reset-password") def reset_user_password( username: str, reset_request: ResetPasswordRequest, request: Request, current_user: User = Depends(require_admin), auth_service: AuthService = Depends(get_auth_service), ): """ Reset a user's password (admin only). Sets must_change_password to True. """ user = auth_service.get_user_by_username(username) if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) try: auth_service.reset_user_password(user, reset_request.new_password) except PasswordTooShortError: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters", ) # Log audit _log_audit( auth_service.db, "admin.password_reset", f"user:{username}", current_user.username, request, ) return {"message": f"Password reset for user {username}"} # Global search @router.get("/api/v1/search", response_model=GlobalSearchResponse) def global_search( request: Request, q: str = Query(..., min_length=1, description="Search query"), limit: int = Query(default=5, ge=1, le=20, description="Results per type"), db: Session = Depends(get_db), ): """ Search across all entity types (projects, packages, artifacts/tags). Returns limited results for each type plus total counts. 
""" user_id = get_user_id(request) search_lower = q.lower() # Search projects (name and description) project_query = db.query(Project).filter( or_(Project.is_public == True, Project.created_by == user_id), or_( func.lower(Project.name).contains(search_lower), func.lower(Project.description).contains(search_lower), ), ) project_count = project_query.count() projects = project_query.order_by(Project.name).limit(limit).all() # Search packages (name and description) with project name package_query = ( db.query(Package, Project.name.label("project_name")) .join(Project, Package.project_id == Project.id) .filter( or_(Project.is_public == True, Project.created_by == user_id), or_( func.lower(Package.name).contains(search_lower), func.lower(Package.description).contains(search_lower), ), ) ) package_count = package_query.count() package_results = package_query.order_by(Package.name).limit(limit).all() # Search tags/artifacts (tag name and original filename) artifact_query = ( db.query( Tag, Artifact, Package.name.label("package_name"), Project.name.label("project_name"), ) .join(Artifact, Tag.artifact_id == Artifact.id) .join(Package, Tag.package_id == Package.id) .join(Project, Package.project_id == Project.id) .filter( or_(Project.is_public == True, Project.created_by == user_id), or_( func.lower(Tag.name).contains(search_lower), func.lower(Artifact.original_name).contains(search_lower), ), ) ) artifact_count = artifact_query.count() artifact_results = artifact_query.order_by(Tag.name).limit(limit).all() return GlobalSearchResponse( query=q, projects=[ SearchResultProject( id=p.id, name=p.name, description=p.description, is_public=p.is_public ) for p in projects ], packages=[ SearchResultPackage( id=pkg.id, project_id=pkg.project_id, project_name=project_name, name=pkg.name, description=pkg.description, format=pkg.format, ) for pkg, project_name in package_results ], artifacts=[ SearchResultArtifact( tag_id=tag.id, tag_name=tag.name, artifact_id=artifact.id, package_id=tag.package_id, package_name=package_name, project_name=project_name, original_name=artifact.original_name, ) for tag, artifact, package_name, project_name in artifact_results ], counts={ "projects": project_count, "packages": package_count, "artifacts": artifact_count, "total": project_count + package_count + artifact_count, }, ) # Project routes @router.get("/api/v1/projects", response_model=PaginatedResponse[ProjectWithAccessResponse]) def list_projects( request: Request, page: int = Query(default=1, ge=1, description="Page number"), limit: int = Query(default=20, ge=1, le=100, description="Items per page"), search: Optional[str] = Query( default=None, description="Search by project name or description" ), visibility: Optional[str] = Query( default=None, description="Filter by visibility (public, private)" ), sort: str = Query( default="name", description="Sort field (name, created_at, updated_at)" ), order: str = Query(default="asc", description="Sort order (asc, desc)"), db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): user_id = current_user.username if current_user else get_user_id(request) # Validate sort field valid_sort_fields = { "name": Project.name, "created_at": Project.created_at, "updated_at": Project.updated_at, } if sort not in valid_sort_fields: raise HTTPException( status_code=400, detail=f"Invalid sort field. 
Must be one of: {', '.join(valid_sort_fields.keys())}", ) # Validate order if order not in ("asc", "desc"): raise HTTPException( status_code=400, detail="Invalid order. Must be 'asc' or 'desc'" ) # Base query - filter by access query = db.query(Project).filter( or_(Project.is_public == True, Project.created_by == user_id) ) # Apply visibility filter if visibility == "public": query = query.filter(Project.is_public == True) elif visibility == "private": query = query.filter(Project.is_public == False, Project.created_by == user_id) # Apply search filter (case-insensitive on name and description) if search: search_lower = search.lower() query = query.filter( or_( func.lower(Project.name).contains(search_lower), func.lower(Project.description).contains(search_lower), ) ) # Get total count before pagination total = query.count() # Apply sorting sort_column = valid_sort_fields[sort] if order == "desc": query = query.order_by(sort_column.desc()) else: query = query.order_by(sort_column.asc()) # Apply pagination offset = (page - 1) * limit projects = query.offset(offset).limit(limit).all() # Calculate total pages total_pages = math.ceil(total / limit) if total > 0 else 1 # Build access level info for each project project_ids = [p.id for p in projects] access_map = {} if current_user and project_ids: # Get access permissions for this user across these projects permissions = ( db.query(AccessPermission) .filter( AccessPermission.project_id.in_(project_ids), AccessPermission.user_id == current_user.username, ) .all() ) access_map = {p.project_id: p.level for p in permissions} # Build response with access levels items = [] for p in projects: is_owner = p.created_by == user_id access_level = None if is_owner: access_level = "admin" elif p.id in access_map: access_level = access_map[p.id] elif p.is_public: access_level = "read" items.append( ProjectWithAccessResponse( id=p.id, name=p.name, description=p.description, is_public=p.is_public, created_at=p.created_at, updated_at=p.updated_at, created_by=p.created_by, access_level=access_level, is_owner=is_owner, ) ) return PaginatedResponse( items=items, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.post("/api/v1/projects", response_model=ProjectResponse) def create_project( project: ProjectCreate, request: Request, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): user_id = get_user_id_from_request(request, db, current_user) existing = db.query(Project).filter(Project.name == project.name).first() if existing: raise HTTPException(status_code=400, detail="Project already exists") db_project = Project( name=project.name, description=project.description, is_public=project.is_public, created_by=user_id, ) db.add(db_project) # Audit log _log_audit( db=db, action="project.create", resource=f"project/{project.name}", user_id=user_id, source_ip=request.client.host if request.client else None, details={"is_public": project.is_public}, ) db.commit() db.refresh(db_project) return db_project @router.get("/api/v1/projects/{project_name}", response_model=ProjectResponse) def get_project( project_name: str, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): """Get a single project by name. 
Requires read access for private projects.""" project = check_project_access(db, project_name, current_user, "read") return project @router.put("/api/v1/projects/{project_name}", response_model=ProjectResponse) def update_project( project_name: str, project_update: ProjectUpdate, request: Request, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): """Update a project's metadata. Requires admin access.""" project = check_project_access(db, project_name, current_user, "admin") user_id = current_user.username if current_user else get_user_id(request) # Track changes for audit log changes = {} if ( project_update.description is not None and project_update.description != project.description ): changes["description"] = { "old": project.description, "new": project_update.description, } project.description = project_update.description if ( project_update.is_public is not None and project_update.is_public != project.is_public ): changes["is_public"] = { "old": project.is_public, "new": project_update.is_public, } project.is_public = project_update.is_public if not changes: # No changes, return current project return project # Audit log _log_audit( db=db, action="project.update", resource=f"project/{project_name}", user_id=user_id, source_ip=request.client.host if request.client else None, details={"changes": changes}, ) db.commit() db.refresh(project) return project @router.delete("/api/v1/projects/{project_name}", status_code=204) def delete_project( project_name: str, request: Request, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): """ Delete a project and all its packages. Requires admin access. Decrements ref_count for all artifacts referenced by tags in all packages within this project. """ check_project_access(db, project_name, current_user, "admin") user_id = current_user.username if current_user else get_user_id(request) project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") # Get counts for logging packages = db.query(Package).filter(Package.project_id == project.id).all() package_count = len(packages) total_tags = 0 artifact_ids = set() for package in packages: tags = db.query(Tag).filter(Tag.package_id == package.id).all() total_tags += len(tags) for tag in tags: artifact_ids.add(tag.artifact_id) logger.info( f"Project '{project_name}' deletion: {package_count} packages, " f"{total_tags} tags affecting {len(artifact_ids)} artifacts" ) # Delete the project (cascade will delete packages, tags, etc.) # NOTE: SQL triggers (tags_ref_count_delete_trigger) handle ref_count automatically db.delete(project) db.commit() # Audit log (after commit) _log_audit( db, action="project.delete", resource=f"project/{project_name}", user_id=user_id, source_ip=request.client.host if request.client else None, details={ "packages_deleted": package_count, "tags_deleted": total_tags, "artifacts_affected": list(artifact_ids), }, ) db.commit() return None # Access Permission routes @router.get( "/api/v1/project/{project_name}/permissions", response_model=List[AccessPermissionResponse], ) def list_project_permissions( project_name: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ List all access permissions for a project. Requires admin access to the project. 
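    Illustrative calls (project name, username, and level are examples only):

        GET  /api/v1/project/demo/permissions
        POST /api/v1/project/demo/permissions
             {"username": "alice", "level": "write"}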
""" project = check_project_access(db, project_name, current_user, "admin") auth_service = AuthorizationService(db) permissions = auth_service.list_project_permissions(str(project.id)) return permissions @router.post( "/api/v1/project/{project_name}/permissions", response_model=AccessPermissionResponse, ) def grant_project_access( project_name: str, permission: AccessPermissionCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Grant access to a user for a project. Requires admin access to the project. """ project = check_project_access(db, project_name, current_user, "admin") auth_service = AuthorizationService(db) new_permission = auth_service.grant_access( str(project.id), permission.username, permission.level, permission.expires_at, ) return new_permission @router.put( "/api/v1/project/{project_name}/permissions/{username}", response_model=AccessPermissionResponse, ) def update_project_access( project_name: str, username: str, permission: AccessPermissionUpdate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Update a user's access level for a project. Requires admin access to the project. """ project = check_project_access(db, project_name, current_user, "admin") auth_service = AuthorizationService(db) # Get existing permission from .models import AccessPermission existing = ( db.query(AccessPermission) .filter( AccessPermission.project_id == project.id, AccessPermission.user_id == username, ) .first() ) if not existing: raise HTTPException( status_code=404, detail=f"No access permission found for user '{username}'", ) # Update fields if permission.level is not None: existing.level = permission.level if permission.expires_at is not None: existing.expires_at = permission.expires_at db.commit() db.refresh(existing) return existing @router.delete( "/api/v1/project/{project_name}/permissions/{username}", status_code=204, ) def revoke_project_access( project_name: str, username: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user), ): """ Revoke a user's access to a project. Requires admin access to the project. """ project = check_project_access(db, project_name, current_user, "admin") auth_service = AuthorizationService(db) deleted = auth_service.revoke_access(str(project.id), username) if not deleted: raise HTTPException( status_code=404, detail=f"No access permission found for user '{username}'", ) return None @router.get( "/api/v1/project/{project_name}/my-access", ) def get_my_project_access( project_name: str, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): """ Get the current user's access level for a project. Returns null for anonymous users on private projects. 
""" from .models import Project project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") auth_service = AuthorizationService(db) access_level = auth_service.get_user_access_level(str(project.id), current_user) return { "project": project_name, "access_level": access_level, "is_owner": current_user and project.created_by == current_user.username, } # Package routes @router.get( "/api/v1/project/{project_name}/packages", response_model=PaginatedResponse[PackageDetailResponse], ) def list_packages( project_name: str, page: int = Query(default=1, ge=1, description="Page number"), limit: int = Query(default=20, ge=1, le=100, description="Items per page"), search: Optional[str] = Query( default=None, description="Search by name or description" ), sort: str = Query( default="name", description="Sort field (name, created_at, updated_at)" ), order: str = Query(default="asc", description="Sort order (asc, desc)"), format: Optional[str] = Query(default=None, description="Filter by package format"), platform: Optional[str] = Query(default=None, description="Filter by platform"), db: Session = Depends(get_db), ): project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") # Validate sort field valid_sort_fields = { "name": Package.name, "created_at": Package.created_at, "updated_at": Package.updated_at, } if sort not in valid_sort_fields: raise HTTPException( status_code=400, detail=f"Invalid sort field. Must be one of: {', '.join(valid_sort_fields.keys())}", ) # Validate order if order not in ("asc", "desc"): raise HTTPException( status_code=400, detail="Invalid order. Must be 'asc' or 'desc'" ) # Validate format filter if format and format not in PACKAGE_FORMATS: raise HTTPException( status_code=400, detail=f"Invalid format. Must be one of: {', '.join(PACKAGE_FORMATS)}", ) # Validate platform filter if platform and platform not in PACKAGE_PLATFORMS: raise HTTPException( status_code=400, detail=f"Invalid platform. 
Must be one of: {', '.join(PACKAGE_PLATFORMS)}", ) # Base query query = db.query(Package).filter(Package.project_id == project.id) # Apply search filter (case-insensitive on name and description) if search: search_lower = search.lower() query = query.filter( or_( func.lower(Package.name).contains(search_lower), func.lower(Package.description).contains(search_lower), ) ) # Apply format filter if format: query = query.filter(Package.format == format) # Apply platform filter if platform: query = query.filter(Package.platform == platform) # Get total count before pagination total = query.count() # Apply sorting sort_column = valid_sort_fields[sort] if order == "desc": query = query.order_by(sort_column.desc()) else: query = query.order_by(sort_column.asc()) # Apply pagination offset = (page - 1) * limit packages = query.offset(offset).limit(limit).all() # Calculate total pages total_pages = math.ceil(total / limit) if total > 0 else 1 # Build detailed responses with aggregated data detailed_packages = [] for pkg in packages: # Get tag count tag_count = ( db.query(func.count(Tag.id)).filter(Tag.package_id == pkg.id).scalar() or 0 ) # Get unique artifact count and total size via uploads artifact_stats = ( db.query( func.count(func.distinct(Upload.artifact_id)), func.coalesce(func.sum(Artifact.size), 0), ) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.package_id == pkg.id) .first() ) artifact_count = artifact_stats[0] if artifact_stats else 0 total_size = artifact_stats[1] if artifact_stats else 0 # Get latest tag latest_tag_obj = ( db.query(Tag) .filter(Tag.package_id == pkg.id) .order_by(Tag.created_at.desc()) .first() ) latest_tag = latest_tag_obj.name if latest_tag_obj else None # Get latest upload timestamp latest_upload = ( db.query(func.max(Upload.uploaded_at)) .filter(Upload.package_id == pkg.id) .scalar() ) # Get recent tags (limit 5) recent_tags_objs = ( db.query(Tag) .filter(Tag.package_id == pkg.id) .order_by(Tag.created_at.desc()) .limit(5) .all() ) recent_tags = [ TagSummary(name=t.name, artifact_id=t.artifact_id, created_at=t.created_at) for t in recent_tags_objs ] detailed_packages.append( PackageDetailResponse( id=pkg.id, project_id=pkg.project_id, name=pkg.name, description=pkg.description, format=pkg.format, platform=pkg.platform, created_at=pkg.created_at, updated_at=pkg.updated_at, tag_count=tag_count, artifact_count=artifact_count, total_size=total_size, latest_tag=latest_tag, latest_upload_at=latest_upload, recent_tags=recent_tags, ) ) return PaginatedResponse( items=detailed_packages, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.get( "/api/v1/project/{project_name}/packages/{package_name}", response_model=PackageDetailResponse, ) def get_package( project_name: str, package_name: str, include_tags: bool = Query( default=False, description="Include all tags (not just recent 5)" ), db: Session = Depends(get_db), ): """Get a single package with full metadata""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") pkg = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not pkg: raise HTTPException(status_code=404, detail="Package not found") # Get tag count tag_count = ( db.query(func.count(Tag.id)).filter(Tag.package_id == pkg.id).scalar() or 0 ) # Get unique artifact count and total size via uploads artifact_stats = ( 
db.query( func.count(func.distinct(Upload.artifact_id)), func.coalesce(func.sum(Artifact.size), 0), ) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.package_id == pkg.id) .first() ) artifact_count = artifact_stats[0] if artifact_stats else 0 total_size = artifact_stats[1] if artifact_stats else 0 # Get latest tag latest_tag_obj = ( db.query(Tag) .filter(Tag.package_id == pkg.id) .order_by(Tag.created_at.desc()) .first() ) latest_tag = latest_tag_obj.name if latest_tag_obj else None # Get latest upload timestamp latest_upload = ( db.query(func.max(Upload.uploaded_at)) .filter(Upload.package_id == pkg.id) .scalar() ) # Get tags (all if include_tags=true, else limit 5) tags_query = ( db.query(Tag).filter(Tag.package_id == pkg.id).order_by(Tag.created_at.desc()) ) if not include_tags: tags_query = tags_query.limit(5) tags_objs = tags_query.all() recent_tags = [ TagSummary(name=t.name, artifact_id=t.artifact_id, created_at=t.created_at) for t in tags_objs ] return PackageDetailResponse( id=pkg.id, project_id=pkg.project_id, name=pkg.name, description=pkg.description, format=pkg.format, platform=pkg.platform, created_at=pkg.created_at, updated_at=pkg.updated_at, tag_count=tag_count, artifact_count=artifact_count, total_size=total_size, latest_tag=latest_tag, latest_upload_at=latest_upload, recent_tags=recent_tags, ) @router.post("/api/v1/project/{project_name}/packages", response_model=PackageResponse) def create_package( project_name: str, package: PackageCreate, request: Request, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): """Create a new package in a project. Requires write access.""" project = check_project_access(db, project_name, current_user, "write") # Validate format if package.format not in PACKAGE_FORMATS: raise HTTPException( status_code=400, detail=f"Invalid format. Must be one of: {', '.join(PACKAGE_FORMATS)}", ) # Validate platform if package.platform not in PACKAGE_PLATFORMS: raise HTTPException( status_code=400, detail=f"Invalid platform. 
Must be one of: {', '.join(PACKAGE_PLATFORMS)}", ) existing = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package.name) .first() ) if existing: raise HTTPException( status_code=400, detail="Package already exists in this project" ) db_package = Package( project_id=project.id, name=package.name, description=package.description, format=package.format, platform=package.platform, ) db.add(db_package) # Audit log _log_audit( db=db, action="package.create", resource=f"project/{project_name}/{package.name}", user_id=get_user_id(request), source_ip=request.client.host if request.client else None, details={"format": package.format, "platform": package.platform}, ) db.commit() db.refresh(db_package) return db_package @router.put( "/api/v1/project/{project_name}/packages/{package_name}", response_model=PackageResponse, ) def update_package( project_name: str, package_name: str, package_update: PackageUpdate, request: Request, db: Session = Depends(get_db), ): """Update a package's metadata.""" user_id = get_user_id(request) project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Validate format and platform if provided if ( package_update.format is not None and package_update.format not in PACKAGE_FORMATS ): raise HTTPException( status_code=400, detail=f"Invalid format. Must be one of: {', '.join(PACKAGE_FORMATS)}", ) if ( package_update.platform is not None and package_update.platform not in PACKAGE_PLATFORMS ): raise HTTPException( status_code=400, detail=f"Invalid platform. Must be one of: {', '.join(PACKAGE_PLATFORMS)}", ) # Track changes for audit log changes = {} if ( package_update.description is not None and package_update.description != package.description ): changes["description"] = { "old": package.description, "new": package_update.description, } package.description = package_update.description if package_update.format is not None and package_update.format != package.format: changes["format"] = {"old": package.format, "new": package_update.format} package.format = package_update.format if ( package_update.platform is not None and package_update.platform != package.platform ): changes["platform"] = {"old": package.platform, "new": package_update.platform} package.platform = package_update.platform if not changes: # No changes, return current package return package # Audit log _log_audit( db=db, action="package.update", resource=f"project/{project_name}/{package_name}", user_id=user_id, source_ip=request.client.host if request.client else None, details={"changes": changes}, ) db.commit() db.refresh(package) return package @router.delete( "/api/v1/project/{project_name}/packages/{package_name}", status_code=204, ) def delete_package( project_name: str, package_name: str, request: Request, db: Session = Depends(get_db), ): """ Delete a package and all its tags. Decrements ref_count for all artifacts referenced by tags in this package. The package's uploads records are preserved for audit purposes but will have null package_id after cascade. 
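    Illustrative consequence (names made up): if the package's only tag pointed at
    artifact sha256 abc123..., the tag-delete trigger drops that artifact's
    ref_count to 0, making it a candidate for later garbage collection; this
    endpoint itself does not remove any bytes from S3.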
""" user_id = get_user_id(request) project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Get tags count and affected artifacts for logging tags = db.query(Tag).filter(Tag.package_id == package.id).all() artifact_ids = list(set(tag.artifact_id for tag in tags)) tag_count = len(tags) logger.info( f"Package '{package_name}' deletion: {tag_count} tags affecting " f"{len(artifact_ids)} artifacts" ) # Delete the package (cascade will delete tags, which triggers ref_count decrements) # NOTE: SQL triggers (tags_ref_count_delete_trigger) handle ref_count automatically db.delete(package) db.commit() # Audit log (after commit) _log_audit( db, action="package.delete", resource=f"project/{project_name}/{package_name}", user_id=user_id, source_ip=request.client.host if request.client else None, details={ "tags_deleted": tag_count, "artifacts_affected": artifact_ids, }, ) db.commit() return None # Upload artifact @router.post( "/api/v1/project/{project_name}/{package_name}/upload", response_model=UploadResponse, ) def upload_artifact( project_name: str, package_name: str, request: Request, file: UploadFile = File(...), tag: Optional[str] = Form(None), version: Optional[str] = Form(None), db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), content_length: Optional[int] = Header(None, alias="Content-Length"), user_agent: Optional[str] = Header(None, alias="User-Agent"), client_checksum: Optional[str] = Header(None, alias="X-Checksum-SHA256"), current_user: Optional[User] = Depends(get_current_user_optional), ): """ Upload an artifact to a package. Headers: - X-Checksum-SHA256: Optional client-provided SHA256 for verification - User-Agent: Captured for audit purposes - Authorization: Bearer for authentication """ start_time = time.time() settings = get_settings() storage_result = None # Check authorization (write access required for uploads) project = check_project_access(db, project_name, current_user, "write") user_id = current_user.username if current_user else get_user_id_from_request(request, db, current_user) package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Validate file size if content_length is not None: if content_length > settings.max_file_size: raise HTTPException( status_code=413, detail=f"File too large. Maximum size is {settings.max_file_size // (1024 * 1024 * 1024)}GB", ) if content_length < settings.min_file_size: raise HTTPException( status_code=422, detail="Empty files are not allowed", ) # Validate client checksum format if provided if client_checksum: client_checksum = client_checksum.lower().strip() if len(client_checksum) != 64 or not all( c in "0123456789abcdef" for c in client_checksum ): raise HTTPException( status_code=400, detail="Invalid X-Checksum-SHA256 header. 
Must be 64 hex characters.", ) # Extract format-specific metadata before storing file_metadata = {} if file.filename: # Read file into memory for metadata extraction file_content = file.file.read() file.file.seek(0) # Extract metadata file_metadata = extract_metadata( io.BytesIO(file_content), file.filename, file.content_type ) # Detect version (explicit > metadata > filename) detected_version, version_source = _detect_version( version, file_metadata, file.filename or "" ) # Store file (uses multipart for large files) with error handling try: storage_result = storage.store(file.file, content_length) except HashComputationError as e: logger.error(f"Hash computation failed during upload: {e}") raise HTTPException( status_code=422, detail=f"Failed to process file: hash computation error - {str(e)}", ) except S3ExistenceCheckError as e: logger.error(f"S3 existence check failed during upload: {e}") raise HTTPException( status_code=503, detail="Storage service temporarily unavailable. Please retry.", ) except S3UploadError as e: logger.error(f"S3 upload failed: {e}") raise HTTPException( status_code=503, detail="Storage service temporarily unavailable. Please retry.", ) except S3StorageUnavailableError as e: logger.error(f"S3 storage unavailable: {e}") raise HTTPException( status_code=503, detail="Storage backend is unavailable. Please retry later.", ) except HashCollisionError as e: # This is extremely rare - log critical alert logger.critical(f"HASH COLLISION DETECTED: {e}") raise HTTPException( status_code=500, detail="Data integrity error detected. Please contact support.", ) except FileSizeExceededError as e: logger.warning(f"File size exceeded during upload: {e}") raise HTTPException( status_code=413, detail=f"File too large. Maximum size is {settings.max_file_size // (1024 * 1024 * 1024)}GB", ) except StorageError as e: logger.error(f"Storage error during upload: {e}") raise HTTPException(status_code=500, detail="Internal storage error") # Verify client-provided checksum if present checksum_verified = True if client_checksum and client_checksum != storage_result.sha256: # Checksum mismatch - clean up S3 object if it was newly uploaded logger.warning( f"Client checksum mismatch: expected {client_checksum}, got {storage_result.sha256}" ) # Attempt cleanup of the uploaded object try: if not storage_result.already_existed: storage.delete(storage_result.s3_key) logger.info( f"Cleaned up S3 object after checksum mismatch: {storage_result.s3_key}" ) except Exception as cleanup_error: logger.error( f"Failed to clean up S3 object after checksum mismatch: {cleanup_error}" ) raise HTTPException( status_code=422, detail=f"Checksum verification failed. 
Expected {client_checksum}, got {storage_result.sha256}", ) # Verify S3 object exists and size matches before proceeding try: s3_info = storage.get_object_info(storage_result.s3_key) if s3_info is None: raise HTTPException( status_code=500, detail="Failed to verify uploaded object in storage", ) if s3_info.get("size") != storage_result.size: logger.error( f"Size mismatch after upload: expected {storage_result.size}, " f"got {s3_info.get('size')}" ) raise HTTPException( status_code=500, detail="Upload verification failed: size mismatch", ) except HTTPException: raise except Exception as e: logger.error(f"Failed to verify S3 object: {e}") raise HTTPException( status_code=500, detail="Failed to verify uploaded object", ) # Check if this is a deduplicated upload deduplicated = False saved_bytes = 0 # Create or update artifact record # Use with_for_update() to lock the row and prevent race conditions artifact = ( db.query(Artifact) .filter(Artifact.id == storage_result.sha256) .with_for_update() .first() ) if artifact: # Artifact exists - this is a deduplicated upload # NOTE: ref_count is managed by SQL triggers on tag INSERT/DELETE # We don't manually increment here - the tag creation will trigger the increment deduplicated = True saved_bytes = storage_result.size # Merge metadata if new metadata was extracted if file_metadata and artifact.artifact_metadata: artifact.artifact_metadata = {**artifact.artifact_metadata, **file_metadata} elif file_metadata: artifact.artifact_metadata = file_metadata # Update checksums if not already set if not artifact.checksum_md5 and storage_result.md5: artifact.checksum_md5 = storage_result.md5 if not artifact.checksum_sha1 and storage_result.sha1: artifact.checksum_sha1 = storage_result.sha1 if not artifact.s3_etag and storage_result.s3_etag: artifact.s3_etag = storage_result.s3_etag # Refresh to get updated ref_count db.refresh(artifact) else: # Create new artifact with ref_count=0 # NOTE: ref_count is managed by SQL triggers on tag INSERT/DELETE # When a tag is created for this artifact, the trigger will increment ref_count artifact = Artifact( id=storage_result.sha256, size=storage_result.size, content_type=file.content_type, original_name=file.filename, checksum_md5=storage_result.md5, checksum_sha1=storage_result.sha1, s3_etag=storage_result.s3_etag, created_by=user_id, s3_key=storage_result.s3_key, artifact_metadata=file_metadata or {}, ref_count=0, # Triggers will manage this ) db.add(artifact) # Calculate upload duration duration_ms = int((time.time() - start_time) * 1000) # Record upload with enhanced metadata upload = Upload( artifact_id=storage_result.sha256, package_id=package.id, original_name=file.filename, tag_name=tag, user_agent=user_agent[:512] if user_agent else None, # Truncate if too long duration_ms=duration_ms, deduplicated=deduplicated, checksum_verified=checksum_verified, client_checksum=client_checksum, status="completed", uploaded_by=user_id, source_ip=request.client.host if request.client else None, ) db.add(upload) db.flush() # Flush to get upload ID # Create or update tag if provided (with ref_count management and history) if tag: _create_or_update_tag(db, package.id, tag, storage_result.sha256, user_id) # Create version record if version was detected pkg_version = None if detected_version: try: pkg_version = _create_or_update_version( db, package.id, storage_result.sha256, detected_version, version_source, user_id ) except HTTPException as e: # Version conflict (409) - log but don't fail the upload if e.status_code == 409: 
logger.warning( f"Version {detected_version} already exists for package {package_name}, " f"upload continues without version assignment" ) detected_version = None version_source = None else: raise # Log deduplication event if deduplicated: logger.info( f"Deduplication: artifact {storage_result.sha256[:12]}... " f"ref_count={artifact.ref_count}, saved_bytes={saved_bytes}" ) # Audit log _log_audit( db, action="artifact.upload", resource=f"project/{project_name}/{package_name}/artifact/{storage_result.sha256[:12]}", user_id=user_id, source_ip=request.client.host if request.client else None, details={ "artifact_id": storage_result.sha256, "size": storage_result.size, "deduplicated": deduplicated, "saved_bytes": saved_bytes, "tag": tag, "duration_ms": duration_ms, "client_checksum_provided": client_checksum is not None, }, ) # Commit with cleanup on failure try: db.commit() except Exception as commit_error: logger.error(f"Database commit failed after upload: {commit_error}") db.rollback() # Attempt to clean up newly uploaded S3 object if storage_result and not storage_result.already_existed: try: storage.delete(storage_result.s3_key) logger.info( f"Cleaned up S3 object after commit failure: {storage_result.s3_key}" ) except Exception as cleanup_error: logger.error( f"Failed to clean up S3 object after commit failure: {cleanup_error}" ) raise HTTPException( status_code=500, detail="Failed to save upload record. Please retry.", ) return UploadResponse( artifact_id=storage_result.sha256, sha256=storage_result.sha256, size=storage_result.size, project=project_name, package=package_name, tag=tag, version=detected_version, version_source=version_source, checksum_md5=storage_result.md5, checksum_sha1=storage_result.sha1, s3_etag=storage_result.s3_etag, format_metadata=artifact.artifact_metadata, deduplicated=deduplicated, ref_count=artifact.ref_count, upload_id=upload.id, content_type=artifact.content_type, original_name=artifact.original_name, created_at=artifact.created_at, ) # Resumable upload endpoints @router.post( "/api/v1/project/{project_name}/{package_name}/upload/init", response_model=ResumableUploadInitResponse, ) def init_resumable_upload( project_name: str, package_name: str, init_request: ResumableUploadInitRequest, request: Request, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), ): """ Initialize a resumable upload session. Client must provide the SHA256 hash of the file in advance. """ user_id = get_user_id(request) # Validate project and package project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Validate file size settings = get_settings() if init_request.size > settings.max_file_size: raise HTTPException( status_code=413, detail=f"File too large. 
Maximum size is {settings.max_file_size // (1024 * 1024 * 1024)}GB", ) if init_request.size < settings.min_file_size: raise HTTPException( status_code=422, detail="Empty files are not allowed", ) # Check if artifact already exists (deduplication) existing_artifact = ( db.query(Artifact).filter(Artifact.id == init_request.expected_hash).first() ) if existing_artifact: # File already exists - deduplicated upload # NOTE: ref_count is managed by SQL triggers on tag INSERT/DELETE/UPDATE # We do NOT manually increment here because: # 1. If a tag is provided, _create_or_update_tag will create/update a tag # and the SQL trigger will handle ref_count # 2. If no tag is provided, ref_count shouldn't change (no new reference) # Record the upload upload = Upload( artifact_id=init_request.expected_hash, package_id=package.id, original_name=init_request.filename, uploaded_by=user_id, source_ip=request.client.host if request.client else None, deduplicated=True, ) db.add(upload) # Create or update tag if provided (with ref_count management and history) if init_request.tag: _create_or_update_tag( db, package.id, init_request.tag, init_request.expected_hash, user_id ) # Log deduplication event logger.info( f"Deduplication (resumable init): artifact {init_request.expected_hash[:12]}... " f"saved_bytes={init_request.size}" ) # Audit log _log_audit( db, action="artifact.upload", resource=f"project/{project_name}/{package_name}/artifact/{init_request.expected_hash[:12]}", user_id=user_id, source_ip=request.client.host if request.client else None, details={ "artifact_id": init_request.expected_hash, "size": init_request.size, "deduplicated": True, "saved_bytes": init_request.size, "tag": init_request.tag, "resumable": True, }, ) db.commit() return ResumableUploadInitResponse( upload_id=None, already_exists=True, artifact_id=init_request.expected_hash, chunk_size=MULTIPART_CHUNK_SIZE, ) # Initialize resumable upload session = storage.initiate_resumable_upload(init_request.expected_hash) return ResumableUploadInitResponse( upload_id=session["upload_id"], already_exists=False, artifact_id=None, chunk_size=MULTIPART_CHUNK_SIZE, ) @router.put( "/api/v1/project/{project_name}/{package_name}/upload/{upload_id}/part/{part_number}" ) def upload_part( project_name: str, package_name: str, upload_id: str, part_number: int, request: Request, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), ): """ Upload a part of a resumable upload. Part numbers start at 1. 
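    Illustrative client loop (a sketch only; assumes the requests package, a
    server at http://localhost:8000, and placeholder project/package/file
    names - upload_id and chunk_size come from the init endpoint above):

        import requests

        base = "http://localhost:8000/api/v1/project/myproj/mypkg/upload"
        init = requests.post(f"{base}/init", json={
            "filename": "build.tar.gz",
            "size": 123456789,
            "expected_hash": "<sha256 of the file>",
        }).json()
        if not init["already_exists"]:
            with open("build.tar.gz", "rb") as f:
                part_number = 1
                while chunk := f.read(init["chunk_size"]):
                    resp = requests.put(
                        f"{base}/{init['upload_id']}/part/{part_number}", data=chunk
                    )
                    resp.raise_for_status()
                    part_number += 1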
""" # Validate project and package exist project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") if part_number < 1: raise HTTPException(status_code=400, detail="Part number must be >= 1") # Read part data from request body import asyncio loop = asyncio.new_event_loop() async def read_body(): return await request.body() try: data = loop.run_until_complete(read_body()) finally: loop.close() if not data: raise HTTPException(status_code=400, detail="No data in request body") try: part_info = storage.upload_part(upload_id, part_number, data) return ResumableUploadPartResponse( part_number=part_info["PartNumber"], etag=part_info["ETag"], ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) @router.post( "/api/v1/project/{project_name}/{package_name}/upload/{upload_id}/complete" ) def complete_resumable_upload( project_name: str, package_name: str, upload_id: str, complete_request: ResumableUploadCompleteRequest, request: Request, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), ): """Complete a resumable upload""" user_id = get_user_id(request) # Validate project and package project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") try: sha256_hash, s3_key = storage.complete_resumable_upload(upload_id) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) # Get file size from S3 obj_info = storage.get_object_info(s3_key) size = obj_info["size"] if obj_info else 0 # Create artifact record artifact = Artifact( id=sha256_hash, size=size, s3_key=s3_key, created_by=user_id, format_metadata={}, ) db.add(artifact) # Record upload upload = Upload( artifact_id=sha256_hash, package_id=package.id, uploaded_by=user_id, source_ip=request.client.host if request.client else None, ) db.add(upload) # Create tag if provided if complete_request.tag: existing_tag = ( db.query(Tag) .filter(Tag.package_id == package.id, Tag.name == complete_request.tag) .first() ) if existing_tag: existing_tag.artifact_id = sha256_hash existing_tag.created_by = user_id else: new_tag = Tag( package_id=package.id, name=complete_request.tag, artifact_id=sha256_hash, created_by=user_id, ) db.add(new_tag) db.commit() return ResumableUploadCompleteResponse( artifact_id=sha256_hash, size=size, project=project_name, package=package_name, tag=complete_request.tag, ) @router.delete("/api/v1/project/{project_name}/{package_name}/upload/{upload_id}") def abort_resumable_upload( project_name: str, package_name: str, upload_id: str, storage: S3Storage = Depends(get_storage), ): """Abort a resumable upload""" try: storage.abort_resumable_upload(upload_id) return {"status": "aborted"} except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) @router.get("/api/v1/project/{project_name}/{package_name}/upload/{upload_id}/status") def get_upload_status( project_name: str, package_name: str, upload_id: str, storage: S3Storage = Depends(get_storage), ): """Get status of a resumable upload""" 
try: parts = storage.list_upload_parts(upload_id) uploaded_parts = [p["PartNumber"] for p in parts] total_bytes = sum(p.get("Size", 0) for p in parts) return ResumableUploadStatusResponse( upload_id=upload_id, uploaded_parts=uploaded_parts, total_uploaded_bytes=total_bytes, ) except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) # Helper function to resolve artifact reference def _resolve_artifact_ref( ref: str, package: Package, db: Session, ) -> Optional[Artifact]: """Resolve a reference (tag name, version, artifact:hash, tag:name, version:X.Y.Z) to an artifact. Resolution order for implicit refs (no prefix): 1. Version (immutable) 2. Tag (mutable) 3. Artifact ID (direct hash) """ artifact = None # Check for explicit prefixes if ref.startswith("artifact:"): artifact_id = ref[9:] artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first() elif ref.startswith("version:"): version_str = ref[8:] pkg_version = ( db.query(PackageVersion) .filter(PackageVersion.package_id == package.id, PackageVersion.version == version_str) .first() ) if pkg_version: artifact = db.query(Artifact).filter(Artifact.id == pkg_version.artifact_id).first() elif ref.startswith("tag:"): tag_name = ref[4:] tag = ( db.query(Tag) .filter(Tag.package_id == package.id, Tag.name == tag_name) .first() ) if tag: artifact = db.query(Artifact).filter(Artifact.id == tag.artifact_id).first() else: # Implicit ref: try version first, then tag, then artifact ID # Try as version first pkg_version = ( db.query(PackageVersion) .filter(PackageVersion.package_id == package.id, PackageVersion.version == ref) .first() ) if pkg_version: artifact = db.query(Artifact).filter(Artifact.id == pkg_version.artifact_id).first() else: # Try as tag name tag = ( db.query(Tag).filter(Tag.package_id == package.id, Tag.name == ref).first() ) if tag: artifact = db.query(Artifact).filter(Artifact.id == tag.artifact_id).first() else: # Try as direct artifact ID artifact = db.query(Artifact).filter(Artifact.id == ref).first() return artifact # Download artifact with range request support, download modes, and verification @router.get("/api/v1/project/{project_name}/{package_name}/+/{ref}") def download_artifact( project_name: str, package_name: str, ref: str, request: Request, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), current_user: Optional[User] = Depends(get_current_user_optional), range: Optional[str] = Header(None), mode: Optional[Literal["proxy", "redirect", "presigned"]] = Query( default=None, description="Download mode: proxy (stream through backend), redirect (302 to presigned URL), presigned (return JSON with URL)", ), verify: bool = Query( default=False, description="Enable checksum verification during download", ), verify_mode: Optional[Literal["stream", "pre"]] = Query( default="stream", description="Verification mode: 'stream' (verify after streaming, logs error if mismatch), 'pre' (verify before streaming, returns 500 if mismatch)", ), ): """ Download an artifact by reference (tag name, artifact:hash, tag:name). Verification modes: - verify=false (default): No verification, maximum performance - verify=true&verify_mode=stream: Compute hash while streaming, verify after completion. If mismatch, logs error but content already sent. - verify=true&verify_mode=pre: Download and verify BEFORE streaming to client. Higher latency but guarantees no corrupt data sent. 
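    Client-side re-check sketch (illustrative only; assumes the requests
    package, a server at http://localhost:8000, and placeholder
    project/package/tag names):

        import hashlib
        import requests

        url = "http://localhost:8000/api/v1/project/myproj/mypkg/+/latest"
        resp = requests.get(url, params={"verify": "true", "verify_mode": "pre"})
        resp.raise_for_status()
        expected = resp.headers["X-Checksum-SHA256"]
        actual = hashlib.sha256(resp.content).hexdigest()
        assert actual == expected, f"checksum mismatch: {actual} != {expected}"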
Response headers always include: - X-Checksum-SHA256: The expected SHA256 hash - X-Content-Length: File size in bytes - ETag: Artifact ID (SHA256) - Digest: RFC 3230 format sha-256 hash When verify=true: - X-Verified: 'true' if verified, 'false' if verification failed """ settings = get_settings() # Check authorization (read access required for downloads) project = check_project_access(db, project_name, current_user, "read") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Resolve reference to artifact artifact = _resolve_artifact_ref(ref, package, db) if not artifact: raise HTTPException(status_code=404, detail="Artifact not found") filename = sanitize_filename(artifact.original_name or f"{artifact.id}") # Audit log download user_id = get_user_id(request) _log_audit( db=db, action="artifact.download", resource=f"project/{project_name}/{package_name}/artifact/{artifact.id[:12]}", user_id=user_id, source_ip=request.client.host if request.client else None, details={ "artifact_id": artifact.id, "ref": ref, "size": artifact.size, "original_name": artifact.original_name, }, ) db.commit() # Build common checksum headers (always included) checksum_headers = { "X-Checksum-SHA256": artifact.id, "X-Content-Length": str(artifact.size), "ETag": f'"{artifact.id}"', } # Add RFC 3230 Digest header try: digest_base64 = sha256_to_base64(artifact.id) checksum_headers["Digest"] = f"sha-256={digest_base64}" except Exception: pass # Skip if conversion fails # Add MD5 checksum if available if artifact.checksum_md5: checksum_headers["X-Checksum-MD5"] = artifact.checksum_md5 # Determine download mode (query param overrides server default) download_mode = mode or settings.download_mode # Handle presigned mode - return JSON with presigned URL if download_mode == "presigned": presigned_url = storage.generate_presigned_url( artifact.s3_key, response_content_type=artifact.content_type, response_content_disposition=f'attachment; filename="{filename}"', ) expires_at = datetime.now(timezone.utc) + timedelta( seconds=settings.presigned_url_expiry ) return PresignedUrlResponse( url=presigned_url, expires_at=expires_at, method="GET", artifact_id=artifact.id, size=artifact.size, content_type=artifact.content_type, original_name=artifact.original_name, checksum_sha256=artifact.id, checksum_md5=artifact.checksum_md5, ) # Handle redirect mode - return 302 redirect to presigned URL if download_mode == "redirect": presigned_url = storage.generate_presigned_url( artifact.s3_key, response_content_type=artifact.content_type, response_content_disposition=f'attachment; filename="{filename}"', ) return RedirectResponse(url=presigned_url, status_code=302) # Proxy mode (default fallback) - stream through backend # Handle range requests (verification not supported for partial downloads) if range: stream, content_length, content_range = storage.get_stream( artifact.s3_key, range ) headers = { "Content-Disposition": f'attachment; filename="{filename}"', "Accept-Ranges": "bytes", "Content-Length": str(content_length), **checksum_headers, } if content_range: headers["Content-Range"] = content_range # Note: X-Verified not set for range requests (cannot verify partial content) return StreamingResponse( stream, status_code=206, # Partial Content media_type=artifact.content_type or "application/octet-stream", headers=headers, ) # Full download with optional verification base_headers = { 
"Content-Disposition": f'attachment; filename="{filename}"', "Accept-Ranges": "bytes", **checksum_headers, } # Pre-verification mode: verify before streaming if verify and verify_mode == "pre": try: content = storage.get_verified(artifact.s3_key, artifact.id) return Response( content=content, media_type=artifact.content_type or "application/octet-stream", headers={ **base_headers, "Content-Length": str(len(content)), "X-Verified": "true", }, ) except ChecksumMismatchError as e: logger.error( f"Pre-verification failed for artifact {artifact.id[:16]}...: {e.to_dict()}" ) raise HTTPException( status_code=500, detail={ "error": "checksum_verification_failed", "message": "Downloaded content does not match expected checksum", "expected": e.expected, "actual": e.actual, "artifact_id": artifact.id, }, ) # Streaming verification mode: verify while/after streaming if verify and verify_mode == "stream": verifying_wrapper, content_length, _ = storage.get_stream_verified( artifact.s3_key, artifact.id ) def verified_stream(): """Generator that yields chunks and verifies after completion.""" try: for chunk in verifying_wrapper: yield chunk # After all chunks yielded, verify try: verifying_wrapper.verify() logger.info( f"Streaming verification passed for artifact {artifact.id[:16]}..." ) except ChecksumMismatchError as e: # Content already sent - log error but cannot reject logger.error( f"Streaming verification FAILED for artifact {artifact.id[:16]}...: " f"expected {e.expected[:16]}..., got {e.actual[:16]}..." ) except Exception as e: logger.error(f"Error during streaming download: {e}") raise return StreamingResponse( verified_stream(), media_type=artifact.content_type or "application/octet-stream", headers={ **base_headers, "Content-Length": str(content_length), "X-Verified": "pending", # Verification happens after streaming }, ) # No verification - direct streaming stream, content_length, _ = storage.get_stream(artifact.s3_key) return StreamingResponse( stream, media_type=artifact.content_type or "application/octet-stream", headers={ **base_headers, "Content-Length": str(content_length), "X-Verified": "false", }, ) # Get presigned URL endpoint (explicit endpoint for getting URL without redirect) @router.get( "/api/v1/project/{project_name}/{package_name}/+/{ref}/url", response_model=PresignedUrlResponse, ) def get_artifact_url( project_name: str, package_name: str, ref: str, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), expiry: Optional[int] = Query( default=None, description="Custom expiry time in seconds (defaults to server setting)", ), ): """ Get a presigned URL for direct S3 download. This endpoint always returns a presigned URL regardless of server download mode. 
""" settings = get_settings() # Check authorization (read access required for downloads) project = check_project_access(db, project_name, current_user, "read") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Resolve reference to artifact artifact = _resolve_artifact_ref(ref, package, db) if not artifact: raise HTTPException(status_code=404, detail="Artifact not found") filename = sanitize_filename(artifact.original_name or f"{artifact.id}") url_expiry = expiry or settings.presigned_url_expiry presigned_url = storage.generate_presigned_url( artifact.s3_key, expiry=url_expiry, response_content_type=artifact.content_type, response_content_disposition=f'attachment; filename="{filename}"', ) expires_at = datetime.now(timezone.utc) + timedelta(seconds=url_expiry) return PresignedUrlResponse( url=presigned_url, expires_at=expires_at, method="GET", artifact_id=artifact.id, size=artifact.size, content_type=artifact.content_type, original_name=artifact.original_name, checksum_sha256=artifact.id, checksum_md5=artifact.checksum_md5, ) # HEAD request for download (to check file info without downloading) @router.head("/api/v1/project/{project_name}/{package_name}/+/{ref}") def head_artifact( project_name: str, package_name: str, ref: str, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), ): """ Get artifact metadata without downloading content. Returns headers with checksum information for client-side verification. """ # Get project and package project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Resolve reference to artifact artifact = _resolve_artifact_ref(ref, package, db) if not artifact: raise HTTPException(status_code=404, detail="Artifact not found") filename = sanitize_filename(artifact.original_name or f"{artifact.id}") # Build headers with checksum information headers = { "Content-Disposition": f'attachment; filename="{filename}"', "Accept-Ranges": "bytes", "Content-Length": str(artifact.size), "X-Artifact-Id": artifact.id, "X-Checksum-SHA256": artifact.id, "X-Content-Length": str(artifact.size), "ETag": f'"{artifact.id}"', } # Add RFC 3230 Digest header try: digest_base64 = sha256_to_base64(artifact.id) headers["Digest"] = f"sha-256={digest_base64}" except Exception: pass # Skip if conversion fails # Add MD5 checksum if available if artifact.checksum_md5: headers["X-Checksum-MD5"] = artifact.checksum_md5 return Response( content=b"", media_type=artifact.content_type or "application/octet-stream", headers=headers, ) # Compatibility route @router.get("/project/{project_name}/{package_name}/+/{ref}") def download_artifact_compat( project_name: str, package_name: str, ref: str, request: Request, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), range: Optional[str] = Header(None), verify: bool = Query(default=False), verify_mode: Optional[Literal["stream", "pre"]] = Query(default="stream"), ): return download_artifact( project_name, package_name, ref, request, db, storage, range, verify=verify, verify_mode=verify_mode, ) # Version routes @router.get( "/api/v1/project/{project_name}/{package_name}/versions", 
response_model=PaginatedResponse[PackageVersionResponse], ) def list_versions( project_name: str, package_name: str, page: int = Query(default=1, ge=1, description="Page number"), limit: int = Query(default=20, ge=1, le=100, description="Items per page"), search: Optional[str] = Query(default=None, description="Search by version string"), sort: str = Query(default="version", description="Sort field (version, created_at)"), order: str = Query(default="desc", description="Sort order (asc, desc)"), db: Session = Depends(get_db), ): """List all versions for a package.""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Validate sort field valid_sort_fields = {"version": PackageVersion.version, "created_at": PackageVersion.created_at} if sort not in valid_sort_fields: raise HTTPException( status_code=400, detail=f"Invalid sort field. Must be one of: {', '.join(valid_sort_fields.keys())}", ) # Validate order if order not in ("asc", "desc"): raise HTTPException(status_code=400, detail="Invalid order. Must be 'asc' or 'desc'") # Base query with JOIN to artifact for metadata query = ( db.query(PackageVersion, Artifact) .join(Artifact, PackageVersion.artifact_id == Artifact.id) .filter(PackageVersion.package_id == package.id) ) # Apply search filter if search: query = query.filter(PackageVersion.version.ilike(f"%{search}%")) # Get total count before pagination total = query.count() # Apply sorting sort_column = valid_sort_fields[sort] if order == "desc": query = query.order_by(sort_column.desc()) else: query = query.order_by(sort_column.asc()) # Apply pagination offset = (page - 1) * limit results = query.offset(offset).limit(limit).all() # Get tags for each version's artifact version_responses = [] for pkg_version, artifact in results: # Get tags pointing to this artifact in this package tags = ( db.query(Tag.name) .filter(Tag.package_id == package.id, Tag.artifact_id == artifact.id) .all() ) tag_names = [t[0] for t in tags] version_responses.append( PackageVersionResponse( id=pkg_version.id, package_id=pkg_version.package_id, artifact_id=pkg_version.artifact_id, version=pkg_version.version, version_source=pkg_version.version_source, created_at=pkg_version.created_at, created_by=pkg_version.created_by, size=artifact.size, content_type=artifact.content_type, original_name=artifact.original_name, tags=tag_names, ) ) total_pages = math.ceil(total / limit) if total > 0 else 1 has_more = page < total_pages return PaginatedResponse( items=version_responses, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=has_more, ), ) @router.get( "/api/v1/project/{project_name}/{package_name}/versions/{version}", response_model=PackageVersionDetailResponse, ) def get_version( project_name: str, package_name: str, version: str, db: Session = Depends(get_db), ): """Get details of a specific version.""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") pkg_version = ( 
db.query(PackageVersion) .filter(PackageVersion.package_id == package.id, PackageVersion.version == version) .first() ) if not pkg_version: raise HTTPException(status_code=404, detail="Version not found") artifact = db.query(Artifact).filter(Artifact.id == pkg_version.artifact_id).first() # Get tags pointing to this artifact tags = ( db.query(Tag.name) .filter(Tag.package_id == package.id, Tag.artifact_id == artifact.id) .all() ) tag_names = [t[0] for t in tags] return PackageVersionDetailResponse( id=pkg_version.id, package_id=pkg_version.package_id, artifact_id=pkg_version.artifact_id, version=pkg_version.version, version_source=pkg_version.version_source, created_at=pkg_version.created_at, created_by=pkg_version.created_by, size=artifact.size, content_type=artifact.content_type, original_name=artifact.original_name, tags=tag_names, format_metadata=artifact.artifact_metadata, checksum_md5=artifact.checksum_md5, checksum_sha1=artifact.checksum_sha1, ) @router.delete( "/api/v1/project/{project_name}/{package_name}/versions/{version}", status_code=204, ) def delete_version( project_name: str, package_name: str, version: str, request: Request, db: Session = Depends(get_db), current_user: User = Depends(require_admin), ): """Delete a version (admin only). Does not delete the underlying artifact.""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") pkg_version = ( db.query(PackageVersion) .filter(PackageVersion.package_id == package.id, PackageVersion.version == version) .first() ) if not pkg_version: raise HTTPException(status_code=404, detail="Version not found") artifact_id = pkg_version.artifact_id # Delete version (triggers will decrement ref_count) db.delete(pkg_version) # Audit log _log_audit( db, action="version.delete", resource=f"project/{project_name}/{package_name}/version/{version}", user_id=current_user.username, source_ip=request.client.host if request.client else None, details={"artifact_id": artifact_id}, ) db.commit() return Response(status_code=204) # Tag routes @router.get( "/api/v1/project/{project_name}/{package_name}/tags", response_model=PaginatedResponse[TagDetailResponse], ) def list_tags( project_name: str, package_name: str, page: int = Query(default=1, ge=1, description="Page number"), limit: int = Query(default=20, ge=1, le=100, description="Items per page"), search: Optional[str] = Query(default=None, description="Search by tag name"), sort: str = Query(default="name", description="Sort field (name, created_at)"), order: str = Query(default="asc", description="Sort order (asc, desc)"), from_date: Optional[datetime] = Query( default=None, alias="from", description="Filter tags created after this date" ), to_date: Optional[datetime] = Query( default=None, alias="to", description="Filter tags created before this date" ), db: Session = Depends(get_db), ): project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Validate sort field valid_sort_fields = {"name": Tag.name, "created_at": 
Tag.created_at} if sort not in valid_sort_fields: raise HTTPException( status_code=400, detail=f"Invalid sort field. Must be one of: {', '.join(valid_sort_fields.keys())}", ) # Validate order if order not in ("asc", "desc"): raise HTTPException( status_code=400, detail="Invalid order. Must be 'asc' or 'desc'" ) # Base query with JOIN to artifact for metadata and LEFT JOIN to version query = ( db.query(Tag, Artifact, PackageVersion.version) .join(Artifact, Tag.artifact_id == Artifact.id) .outerjoin( PackageVersion, and_( PackageVersion.package_id == Tag.package_id, PackageVersion.artifact_id == Tag.artifact_id, ), ) .filter(Tag.package_id == package.id) ) # Apply search filter (case-insensitive on tag name OR artifact original filename) if search: search_lower = search.lower() query = query.filter( or_( func.lower(Tag.name).contains(search_lower), func.lower(Artifact.original_name).contains(search_lower), ) ) # Apply date range filters if from_date: query = query.filter(Tag.created_at >= from_date) if to_date: query = query.filter(Tag.created_at <= to_date) # Get total count before pagination total = query.count() # Apply sorting sort_column = valid_sort_fields[sort] if order == "desc": query = query.order_by(sort_column.desc()) else: query = query.order_by(sort_column.asc()) # Apply pagination offset = (page - 1) * limit results = query.offset(offset).limit(limit).all() # Calculate total pages total_pages = math.ceil(total / limit) if total > 0 else 1 # Build detailed responses with artifact metadata and version detailed_tags = [] for tag, artifact, version in results: detailed_tags.append( TagDetailResponse( id=tag.id, package_id=tag.package_id, name=tag.name, artifact_id=tag.artifact_id, created_at=tag.created_at, created_by=tag.created_by, artifact_size=artifact.size, artifact_content_type=artifact.content_type, artifact_original_name=artifact.original_name, artifact_created_at=artifact.created_at, artifact_format_metadata=artifact.format_metadata, version=version, ) ) return PaginatedResponse( items=detailed_tags, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.post( "/api/v1/project/{project_name}/{package_name}/tags", response_model=TagResponse ) def create_tag( project_name: str, package_name: str, tag: TagCreate, request: Request, db: Session = Depends(get_db), current_user: Optional[User] = Depends(get_current_user_optional), ): """Create or update a tag. 
Requires write access.""" project = check_project_access(db, project_name, current_user, "write") user_id = current_user.username if current_user else get_user_id(request) package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Verify artifact exists artifact = db.query(Artifact).filter(Artifact.id == tag.artifact_id).first() if not artifact: raise HTTPException(status_code=404, detail="Artifact not found") # Create or update tag existing = ( db.query(Tag).filter(Tag.package_id == package.id, Tag.name == tag.name).first() ) if existing: old_artifact_id = existing.artifact_id existing.artifact_id = tag.artifact_id existing.created_by = user_id # Audit log for tag update _log_audit( db=db, action="tag.update", resource=f"project/{project_name}/{package_name}/tag/{tag.name}", user_id=user_id, source_ip=request.client.host if request.client else None, details={ "old_artifact_id": old_artifact_id, "new_artifact_id": tag.artifact_id, }, ) db.commit() db.refresh(existing) return existing db_tag = Tag( package_id=package.id, name=tag.name, artifact_id=tag.artifact_id, created_by=user_id, ) db.add(db_tag) # Audit log for tag create _log_audit( db=db, action="tag.create", resource=f"project/{project_name}/{package_name}/tag/{tag.name}", user_id=user_id, source_ip=request.client.host if request.client else None, details={"artifact_id": tag.artifact_id}, ) db.commit() db.refresh(db_tag) return db_tag @router.get( "/api/v1/project/{project_name}/{package_name}/tags/{tag_name}", response_model=TagDetailResponse, ) def get_tag( project_name: str, package_name: str, tag_name: str, db: Session = Depends(get_db), ): """Get a single tag with full artifact metadata""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") result = ( db.query(Tag, Artifact, PackageVersion.version) .join(Artifact, Tag.artifact_id == Artifact.id) .outerjoin( PackageVersion, and_( PackageVersion.package_id == Tag.package_id, PackageVersion.artifact_id == Tag.artifact_id, ), ) .filter(Tag.package_id == package.id, Tag.name == tag_name) .first() ) if not result: raise HTTPException(status_code=404, detail="Tag not found") tag, artifact, version = result return TagDetailResponse( id=tag.id, package_id=tag.package_id, name=tag.name, artifact_id=tag.artifact_id, created_at=tag.created_at, created_by=tag.created_by, artifact_size=artifact.size, artifact_content_type=artifact.content_type, artifact_original_name=artifact.original_name, artifact_created_at=artifact.created_at, artifact_format_metadata=artifact.format_metadata, version=version, ) @router.get( "/api/v1/project/{project_name}/{package_name}/tags/{tag_name}/history", response_model=PaginatedResponse[TagHistoryDetailResponse], ) def get_tag_history( project_name: str, package_name: str, tag_name: str, page: int = Query(default=1, ge=1), limit: int = Query(default=20, ge=1, le=100), db: Session = Depends(get_db), ): """Get the history of artifact assignments for a tag with artifact metadata""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package 
= ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") tag = ( db.query(Tag).filter(Tag.package_id == package.id, Tag.name == tag_name).first() ) if not tag: raise HTTPException(status_code=404, detail="Tag not found") # Get total count total = ( db.query(func.count(TagHistory.id)).filter(TagHistory.tag_id == tag.id).scalar() or 0 ) # Get paginated history with artifact metadata offset = (page - 1) * limit history_items = ( db.query(TagHistory, Artifact) .outerjoin(Artifact, TagHistory.new_artifact_id == Artifact.id) .filter(TagHistory.tag_id == tag.id) .order_by(TagHistory.changed_at.desc()) .offset(offset) .limit(limit) .all() ) # Build response with artifact metadata items = [] for history, artifact in history_items: items.append( TagHistoryDetailResponse( id=history.id, tag_id=history.tag_id, tag_name=tag.name, old_artifact_id=history.old_artifact_id, new_artifact_id=history.new_artifact_id, changed_at=history.changed_at, changed_by=history.changed_by, artifact_size=artifact.size if artifact else 0, artifact_original_name=artifact.original_name if artifact else None, artifact_content_type=artifact.content_type if artifact else None, ) ) total_pages = math.ceil(total / limit) if limit > 0 else 0 return PaginatedResponse( items=items, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.delete( "/api/v1/project/{project_name}/{package_name}/tags/{tag_name}", status_code=204, ) def delete_tag( project_name: str, package_name: str, tag_name: str, request: Request, db: Session = Depends(get_db), ): """ Delete a tag and decrement the artifact's ref_count. Records the deletion in tag history before removing the tag. 
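    Illustrative usage (a sketch only; assumes the requests package and
    placeholder host/project/package/tag names; a 204 response means the tag
    row was removed and the trigger decremented the artifact's ref_count):

        import requests

        resp = requests.delete(
            "http://localhost:8000/api/v1/project/myproj/mypkg/tags/old-rc"
        )
        assert resp.status_code == 204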
""" user_id = get_user_id(request) project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") tag = ( db.query(Tag).filter(Tag.package_id == package.id, Tag.name == tag_name).first() ) if not tag: raise HTTPException(status_code=404, detail="Tag not found") artifact_id = tag.artifact_id # Record deletion in history history = TagHistory( tag_id=tag.id, old_artifact_id=artifact_id, new_artifact_id=artifact_id, # Same artifact for delete record change_type="delete", changed_by=user_id, ) db.add(history) db.flush() # Flush history before deleting tag (cascade will delete history) # NOTE: ref_count decrement is handled by SQL trigger (tags_ref_count_delete_trigger) # when the tag is deleted below logger.info(f"Tag '{tag_name}' deleted for artifact {artifact_id[:12]}...") # Delete the tag (SQL trigger will decrement ref_count) db.delete(tag) db.commit() # Audit log (after commit so we can query the updated ref_count) artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first() _log_audit( db, action="tag.delete", resource=f"project/{project_name}/{package_name}/tag/{tag_name}", user_id=user_id, source_ip=request.client.host if request.client else None, details={ "artifact_id": artifact_id, "ref_count_after": artifact.ref_count if artifact else 0, }, ) db.commit() # Commit the audit log return None # Consumer routes @router.get( "/api/v1/project/{project_name}/{package_name}/consumers", response_model=List[ConsumerResponse], ) def get_consumers(project_name: str, package_name: str, db: Session = Depends(get_db)): project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") consumers = ( db.query(Consumer) .filter(Consumer.package_id == package.id) .order_by(Consumer.last_access.desc()) .all() ) return consumers # Package artifacts @router.get( "/api/v1/project/{project_name}/{package_name}/artifacts", response_model=PaginatedResponse[PackageArtifactResponse], ) def list_package_artifacts( project_name: str, package_name: str, page: int = Query(default=1, ge=1, description="Page number"), limit: int = Query(default=20, ge=1, le=100, description="Items per page"), content_type: Optional[str] = Query( default=None, description="Filter by content type" ), created_after: Optional[datetime] = Query( default=None, description="Filter artifacts created after this date" ), created_before: Optional[datetime] = Query( default=None, description="Filter artifacts created before this date" ), min_size: Optional[int] = Query( default=None, ge=0, description="Minimum artifact size in bytes" ), max_size: Optional[int] = Query( default=None, ge=0, description="Maximum artifact size in bytes" ), sort: Optional[str] = Query( default=None, description="Sort field: created_at, size, original_name" ), order: Optional[str] = Query(default="desc", description="Sort order: asc or desc"), db: Session = Depends(get_db), ): """List all unique artifacts uploaded to a package with filtering and sorting.""" project = 
db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Get distinct artifacts uploaded to this package via uploads table artifact_ids_subquery = ( db.query(func.distinct(Upload.artifact_id)) .filter(Upload.package_id == package.id) .subquery() ) query = db.query(Artifact).filter(Artifact.id.in_(artifact_ids_subquery)) # Apply content_type filter if content_type: query = query.filter(Artifact.content_type == content_type) # Apply date range filters if created_after: query = query.filter(Artifact.created_at >= created_after) if created_before: query = query.filter(Artifact.created_at <= created_before) # Apply size range filters if min_size is not None: query = query.filter(Artifact.size >= min_size) if max_size is not None: query = query.filter(Artifact.size <= max_size) # Validate and apply sorting valid_sort_fields = { "created_at": Artifact.created_at, "size": Artifact.size, "original_name": Artifact.original_name, } if sort and sort not in valid_sort_fields: raise HTTPException( status_code=400, detail=f"Invalid sort field. Valid options: {', '.join(valid_sort_fields.keys())}", ) sort_column = valid_sort_fields.get(sort, Artifact.created_at) if order and order.lower() not in ("asc", "desc"): raise HTTPException( status_code=400, detail="Invalid order. Valid options: asc, desc" ) sort_order = ( sort_column.asc() if order and order.lower() == "asc" else sort_column.desc() ) # Get total count before pagination total = query.count() # Apply pagination offset = (page - 1) * limit artifacts = query.order_by(sort_order).offset(offset).limit(limit).all() # Calculate total pages total_pages = math.ceil(total / limit) if total > 0 else 1 # Build responses with tag info artifact_responses = [] for artifact in artifacts: # Get tags pointing to this artifact in this package tags = ( db.query(Tag.name) .filter(Tag.package_id == package.id, Tag.artifact_id == artifact.id) .all() ) tag_names = [t.name for t in tags] artifact_responses.append( PackageArtifactResponse( id=artifact.id, size=artifact.size, content_type=artifact.content_type, original_name=artifact.original_name, created_at=artifact.created_at, created_by=artifact.created_by, format_metadata=artifact.format_metadata, tags=tag_names, ) ) return PaginatedResponse( items=artifact_responses, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) # Global artifacts listing @router.get( "/api/v1/artifacts", response_model=PaginatedResponse[GlobalArtifactResponse], ) def list_all_artifacts( project: Optional[str] = Query(None, description="Filter by project name"), package: Optional[str] = Query(None, description="Filter by package name"), tag: Optional[str] = Query( None, description="Filter by tag name. 
Supports wildcards (*) and comma-separated values", ), content_type: Optional[str] = Query(None, description="Filter by content type"), min_size: Optional[int] = Query(None, ge=0, description="Minimum size in bytes"), max_size: Optional[int] = Query(None, ge=0, description="Maximum size in bytes"), from_date: Optional[datetime] = Query( None, alias="from", description="Created after" ), to_date: Optional[datetime] = Query(None, alias="to", description="Created before"), sort: Optional[str] = Query(None, description="Sort field: created_at, size"), order: Optional[str] = Query("desc", description="Sort order: asc or desc"), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """ List all artifacts globally with filtering by project, package, tag, etc. Returns artifacts with context about which projects/packages/tags reference them. """ # Start with base query query = db.query(Artifact) # If filtering by project/package/tag, need to join through tags if project or package or tag: # Subquery to get artifact IDs that match the filters tag_query = ( db.query(Tag.artifact_id) .join(Package, Tag.package_id == Package.id) .join(Project, Package.project_id == Project.id) ) if project: tag_query = tag_query.filter(Project.name == project) if package: tag_query = tag_query.filter(Package.name == package) if tag: # Support multiple values (comma-separated) and wildcards (*) tag_values = [t.strip() for t in tag.split(",") if t.strip()] if len(tag_values) == 1: tag_val = tag_values[0] if "*" in tag_val: # Wildcard: convert * to SQL LIKE % tag_query = tag_query.filter( Tag.name.ilike(tag_val.replace("*", "%")) ) else: tag_query = tag_query.filter(Tag.name == tag_val) else: # Multiple values: check if any match (with wildcard support) tag_conditions = [] for tag_val in tag_values: if "*" in tag_val: tag_conditions.append(Tag.name.ilike(tag_val.replace("*", "%"))) else: tag_conditions.append(Tag.name == tag_val) tag_query = tag_query.filter(or_(*tag_conditions)) artifact_ids = tag_query.distinct().subquery() query = query.filter(Artifact.id.in_(artifact_ids)) # Apply content type filter if content_type: query = query.filter(Artifact.content_type == content_type) # Apply size filters if min_size is not None: query = query.filter(Artifact.size >= min_size) if max_size is not None: query = query.filter(Artifact.size <= max_size) # Apply date filters if from_date: query = query.filter(Artifact.created_at >= from_date) if to_date: query = query.filter(Artifact.created_at <= to_date) # Validate and apply sorting valid_sort_fields = {"created_at": Artifact.created_at, "size": Artifact.size} if sort and sort not in valid_sort_fields: raise HTTPException( status_code=400, detail=f"Invalid sort field. Valid options: {', '.join(valid_sort_fields.keys())}", ) sort_column = valid_sort_fields.get(sort, Artifact.created_at) if order and order.lower() not in ("asc", "desc"): raise HTTPException( status_code=400, detail="Invalid order. 
Valid options: asc, desc" ) sort_order = ( sort_column.asc() if order and order.lower() == "asc" else sort_column.desc() ) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 artifacts = query.order_by(sort_order).offset((page - 1) * limit).limit(limit).all() # Build responses with context items = [] for artifact in artifacts: # Get all tags referencing this artifact with project/package info tags_info = ( db.query(Tag, Package, Project) .join(Package, Tag.package_id == Package.id) .join(Project, Package.project_id == Project.id) .filter(Tag.artifact_id == artifact.id) .all() ) projects = list(set(proj.name for _, _, proj in tags_info)) packages = list(set(f"{proj.name}/{pkg.name}" for _, pkg, proj in tags_info)) tags = [f"{proj.name}/{pkg.name}:{t.name}" for t, pkg, proj in tags_info] items.append( GlobalArtifactResponse( id=artifact.id, sha256=artifact.id, size=artifact.size, content_type=artifact.content_type, original_name=artifact.original_name, created_at=artifact.created_at, created_by=artifact.created_by, format_metadata=artifact.artifact_metadata, ref_count=artifact.ref_count, projects=projects, packages=packages, tags=tags, ) ) return PaginatedResponse( items=items, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) # Global tags listing @router.get( "/api/v1/tags", response_model=PaginatedResponse[GlobalTagResponse], ) def list_all_tags( project: Optional[str] = Query(None, description="Filter by project name"), package: Optional[str] = Query(None, description="Filter by package name"), search: Optional[str] = Query( None, description="Search by tag name. Supports wildcards (*) and comma-separated values", ), from_date: Optional[datetime] = Query( None, alias="from", description="Created after" ), to_date: Optional[datetime] = Query(None, alias="to", description="Created before"), sort: Optional[str] = Query(None, description="Sort field: name, created_at"), order: Optional[str] = Query("desc", description="Sort order: asc or desc"), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """ List all tags globally with filtering by project, package, name, etc. 
""" query = ( db.query(Tag, Package, Project, Artifact, PackageVersion.version) .join(Package, Tag.package_id == Package.id) .join(Project, Package.project_id == Project.id) .join(Artifact, Tag.artifact_id == Artifact.id) .outerjoin( PackageVersion, and_( PackageVersion.package_id == Tag.package_id, PackageVersion.artifact_id == Tag.artifact_id, ), ) ) # Apply filters if project: query = query.filter(Project.name == project) if package: query = query.filter(Package.name == package) if search: # Support multiple values (comma-separated) and wildcards (*) search_values = [s.strip() for s in search.split(",") if s.strip()] if len(search_values) == 1: search_val = search_values[0] if "*" in search_val: query = query.filter(Tag.name.ilike(search_val.replace("*", "%"))) else: query = query.filter(Tag.name.ilike(f"%{search_val}%")) else: search_conditions = [] for search_val in search_values: if "*" in search_val: search_conditions.append( Tag.name.ilike(search_val.replace("*", "%")) ) else: search_conditions.append(Tag.name.ilike(f"%{search_val}%")) query = query.filter(or_(*search_conditions)) if from_date: query = query.filter(Tag.created_at >= from_date) if to_date: query = query.filter(Tag.created_at <= to_date) # Validate and apply sorting valid_sort_fields = {"name": Tag.name, "created_at": Tag.created_at} if sort and sort not in valid_sort_fields: raise HTTPException( status_code=400, detail=f"Invalid sort field. Valid options: {', '.join(valid_sort_fields.keys())}", ) sort_column = valid_sort_fields.get(sort, Tag.created_at) if order and order.lower() not in ("asc", "desc"): raise HTTPException( status_code=400, detail="Invalid order. Valid options: asc, desc" ) sort_order = ( sort_column.asc() if order and order.lower() == "asc" else sort_column.desc() ) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 results = query.order_by(sort_order).offset((page - 1) * limit).limit(limit).all() items = [ GlobalTagResponse( id=tag.id, name=tag.name, artifact_id=tag.artifact_id, created_at=tag.created_at, created_by=tag.created_by, project_name=proj.name, package_name=pkg.name, artifact_size=artifact.size, artifact_content_type=artifact.content_type, version=version, ) for tag, pkg, proj, artifact, version in results ] return PaginatedResponse( items=items, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) # Artifact by ID @router.get("/api/v1/artifact/{artifact_id}", response_model=ArtifactDetailResponse) def get_artifact(artifact_id: str, db: Session = Depends(get_db)): """Get artifact metadata including list of packages/tags referencing it""" artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first() if not artifact: raise HTTPException(status_code=404, detail="Artifact not found") # Get all tags referencing this artifact with package and project info tags_with_context = ( db.query(Tag, Package, Project) .join(Package, Tag.package_id == Package.id) .join(Project, Package.project_id == Project.id) .filter(Tag.artifact_id == artifact_id) .all() ) tag_infos = [ ArtifactTagInfo( id=tag.id, name=tag.name, package_id=package.id, package_name=package.name, project_name=project.name, ) for tag, package, project in tags_with_context ] return ArtifactDetailResponse( id=artifact.id, sha256=artifact.id, # SHA256 hash is the artifact ID size=artifact.size, content_type=artifact.content_type, original_name=artifact.original_name, checksum_md5=artifact.checksum_md5, 
checksum_sha1=artifact.checksum_sha1, s3_etag=artifact.s3_etag, created_at=artifact.created_at, created_by=artifact.created_by, ref_count=artifact.ref_count, format_metadata=artifact.format_metadata, tags=tag_infos, ) # ============================================================================= # Garbage Collection Endpoints (ISSUE 36) # ============================================================================= @router.get( "/api/v1/admin/orphaned-artifacts", response_model=List[OrphanedArtifactResponse], ) def list_orphaned_artifacts( request: Request, limit: int = Query( default=100, ge=1, le=1000, description="Max artifacts to return" ), db: Session = Depends(get_db), ): """ List artifacts with ref_count=0 (orphaned artifacts not referenced by any tag). These artifacts can be safely cleaned up as they are not referenced by any tag. """ orphaned = ( db.query(Artifact) .filter(Artifact.ref_count == 0) .order_by(Artifact.created_at.asc()) .limit(limit) .all() ) return [ OrphanedArtifactResponse( id=a.id, size=a.size, created_at=a.created_at, created_by=a.created_by, original_name=a.original_name, ) for a in orphaned ] @router.post( "/api/v1/admin/garbage-collect", response_model=GarbageCollectionResponse, ) def garbage_collect( request: Request, dry_run: bool = Query( default=True, description="If true, only report what would be deleted" ), limit: int = Query( default=100, ge=1, le=1000, description="Max artifacts to delete per run" ), db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), ): """ Clean up orphaned artifacts (ref_count=0) from storage and database. By default runs in dry-run mode (only reports what would be deleted). Set dry_run=false to actually delete artifacts. Returns list of deleted artifact IDs and total bytes freed. """ user_id = get_user_id(request) # Find orphaned artifacts orphaned = ( db.query(Artifact) .filter(Artifact.ref_count == 0) .order_by(Artifact.created_at.asc()) .limit(limit) .all() ) deleted_ids = [] bytes_freed = 0 for artifact in orphaned: if not dry_run: # Delete from S3 try: storage.delete(artifact.s3_key) except Exception as e: logger.error(f"Failed to delete S3 object {artifact.s3_key}: {e}") continue # Delete from database db.delete(artifact) logger.info( f"Garbage collected artifact {artifact.id[:12]}... ({artifact.size} bytes)" ) deleted_ids.append(artifact.id) bytes_freed += artifact.size if not dry_run: # Audit log _log_audit( db, action="garbage_collect", resource="artifacts", user_id=user_id, source_ip=request.client.host if request.client else None, details={ "artifacts_deleted": len(deleted_ids), "bytes_freed": bytes_freed, "artifact_ids": deleted_ids[:10], # Log first 10 for brevity }, ) db.commit() return GarbageCollectionResponse( artifacts_deleted=len(deleted_ids), bytes_freed=bytes_freed, artifact_ids=deleted_ids, dry_run=dry_run, ) @router.get( "/api/v1/admin/consistency-check", response_model=ConsistencyCheckResponse, ) def check_consistency( limit: int = Query( default=100, ge=1, le=1000, description="Max items to report per category" ), db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), ): """ Check consistency between database records and S3 storage. Reports: - Orphaned S3 objects (in S3 but not in database) - Missing S3 objects (in database but not in S3) - Size mismatches (database size != S3 size) This is a read-only operation. Use garbage-collect to clean up issues. 
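    Illustrative operator workflow (a sketch only; assumes the requests
    package and a placeholder host; pairs this read-only check with a
    dry-run garbage collection before deleting anything):

        import requests

        report = requests.get(
            "http://localhost:8000/api/v1/admin/consistency-check"
        ).json()
        if not report["healthy"]:
            preview = requests.post(
                "http://localhost:8000/api/v1/admin/garbage-collect",
                params={"dry_run": "true"},
            ).json()
            print(preview["artifacts_deleted"], "orphans,",
                  preview["bytes_freed"], "bytes reclaimable")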
""" orphaned_s3_keys = [] missing_s3_keys = [] size_mismatches = [] # Get all artifacts from database artifacts = db.query(Artifact).all() total_checked = len(artifacts) # Check each artifact exists in S3 and sizes match for artifact in artifacts: try: s3_info = storage.get_object_info(artifact.s3_key) if s3_info is None: if len(missing_s3_keys) < limit: missing_s3_keys.append(artifact.s3_key) else: s3_size = s3_info.get("size", 0) if s3_size != artifact.size: if len(size_mismatches) < limit: size_mismatches.append( { "artifact_id": artifact.id, "s3_key": artifact.s3_key, "db_size": artifact.size, "s3_size": s3_size, } ) except Exception as e: logger.error(f"Error checking S3 object {artifact.s3_key}: {e}") if len(missing_s3_keys) < limit: missing_s3_keys.append(artifact.s3_key) # Check for orphaned S3 objects (objects in S3 bucket but not in database) # Note: This is expensive for large buckets, so we limit the scan try: # List objects in the fruits/ prefix (where artifacts are stored) paginator = storage.client.get_paginator("list_objects_v2") artifact_ids_in_db = {a.id for a in artifacts} objects_checked = 0 for page in paginator.paginate( Bucket=storage.bucket, Prefix="fruits/", MaxKeys=1000 ): if "Contents" not in page: break for obj in page["Contents"]: objects_checked += 1 # Extract hash from key: fruits/ab/cd/abcdef... key = obj["Key"] parts = key.split("/") if len(parts) == 4 and parts[0] == "fruits": sha256_hash = parts[3] if sha256_hash not in artifact_ids_in_db: if len(orphaned_s3_keys) < limit: orphaned_s3_keys.append(key) # Limit total objects checked if objects_checked >= 10000: break if objects_checked >= 10000: break except Exception as e: logger.error(f"Error listing S3 objects for consistency check: {e}") healthy = ( len(orphaned_s3_keys) == 0 and len(missing_s3_keys) == 0 and len(size_mismatches) == 0 ) return ConsistencyCheckResponse( total_artifacts_checked=total_checked, orphaned_s3_objects=len(orphaned_s3_keys), missing_s3_objects=len(missing_s3_keys), size_mismatches=len(size_mismatches), healthy=healthy, orphaned_s3_keys=orphaned_s3_keys, missing_s3_keys=missing_s3_keys, size_mismatch_artifacts=size_mismatches, ) # ============================================================================= # Statistics Endpoints (ISSUE 34) # ============================================================================= @router.get("/api/v1/stats", response_model=StorageStatsResponse) def get_storage_stats(db: Session = Depends(get_db)): """ Get global storage statistics including deduplication metrics. 
""" # Total artifacts and size total_stats = db.query( func.count(Artifact.id), func.coalesce(func.sum(Artifact.size), 0), ).first() total_artifacts = total_stats[0] or 0 total_size_bytes = total_stats[1] or 0 # Unique artifacts (ref_count > 0) and their size unique_stats = ( db.query( func.count(Artifact.id), ) .filter(Artifact.ref_count > 0) .first() ) unique_artifacts = unique_stats[0] or 0 # Orphaned artifacts (ref_count = 0) orphaned_stats = ( db.query( func.count(Artifact.id), func.coalesce(func.sum(Artifact.size), 0), ) .filter(Artifact.ref_count == 0) .first() ) orphaned_artifacts = orphaned_stats[0] or 0 orphaned_size_bytes = orphaned_stats[1] or 0 # Total uploads and deduplicated uploads upload_stats = db.query( func.count(Upload.id), func.count(Upload.id).filter(Upload.deduplicated == True), ).first() total_uploads = upload_stats[0] or 0 deduplicated_uploads = upload_stats[1] or 0 # Calculate deduplication ratio deduplication_ratio = ( total_uploads / unique_artifacts if unique_artifacts > 0 else 0.0 ) # Calculate storage saved (sum of size * (ref_count - 1) for artifacts with ref_count > 1) # This represents bytes that would have been stored without deduplication saved_query = ( db.query(func.coalesce(func.sum(Artifact.size * (Artifact.ref_count - 1)), 0)) .filter(Artifact.ref_count > 1) .first() ) storage_saved_bytes = saved_query[0] or 0 return StorageStatsResponse( total_artifacts=total_artifacts, total_size_bytes=total_size_bytes, unique_artifacts=unique_artifacts, orphaned_artifacts=orphaned_artifacts, orphaned_size_bytes=orphaned_size_bytes, total_uploads=total_uploads, deduplicated_uploads=deduplicated_uploads, deduplication_ratio=deduplication_ratio, storage_saved_bytes=storage_saved_bytes, ) @router.get("/api/v1/stats/storage", response_model=StorageStatsResponse) def get_storage_stats_alias(db: Session = Depends(get_db)): """Alias for /api/v1/stats - get global storage statistics.""" return get_storage_stats(db) @router.get("/api/v1/stats/deduplication", response_model=DeduplicationStatsResponse) def get_deduplication_stats( top_n: int = Query( default=10, ge=1, le=100, description="Number of top referenced artifacts to return", ), db: Session = Depends(get_db), ): """ Get detailed deduplication effectiveness statistics. 
""" # Total logical bytes (sum of all upload sizes - what would be stored without dedup) # We calculate this as: sum(artifact.size * artifact.ref_count) for all artifacts logical_query = db.query( func.coalesce(func.sum(Artifact.size * Artifact.ref_count), 0) ).first() total_logical_bytes = logical_query[0] or 0 # Total physical bytes (actual storage used) physical_query = ( db.query(func.coalesce(func.sum(Artifact.size), 0)) .filter(Artifact.ref_count > 0) .first() ) total_physical_bytes = physical_query[0] or 0 # Bytes saved bytes_saved = total_logical_bytes - total_physical_bytes # Savings percentage savings_percentage = ( (bytes_saved / total_logical_bytes * 100) if total_logical_bytes > 0 else 0.0 ) # Upload counts total_uploads = db.query(func.count(Upload.id)).scalar() or 0 unique_artifacts = ( db.query(func.count(Artifact.id)).filter(Artifact.ref_count > 0).scalar() or 0 ) duplicate_uploads = ( total_uploads - unique_artifacts if total_uploads > unique_artifacts else 0 ) # Average and max ref_count ref_stats = ( db.query( func.coalesce(func.avg(Artifact.ref_count), 0), func.coalesce(func.max(Artifact.ref_count), 0), ) .filter(Artifact.ref_count > 0) .first() ) average_ref_count = float(ref_stats[0] or 0) max_ref_count = ref_stats[1] or 0 # Top N most referenced artifacts top_artifacts = ( db.query(Artifact) .filter(Artifact.ref_count > 1) .order_by(Artifact.ref_count.desc()) .limit(top_n) .all() ) most_referenced = [ { "artifact_id": a.id, "ref_count": a.ref_count, "size": a.size, "storage_saved": a.size * (a.ref_count - 1), "original_name": a.original_name, "content_type": a.content_type, } for a in top_artifacts ] return DeduplicationStatsResponse( total_logical_bytes=total_logical_bytes, total_physical_bytes=total_physical_bytes, bytes_saved=bytes_saved, savings_percentage=savings_percentage, total_uploads=total_uploads, unique_artifacts=unique_artifacts, duplicate_uploads=duplicate_uploads, average_ref_count=average_ref_count, max_ref_count=max_ref_count, most_referenced_artifacts=most_referenced, ) @router.get( "/api/v1/projects/{project_name}/stats", response_model=ProjectStatsResponse ) def get_project_stats( project_name: str, db: Session = Depends(get_db), ): """ Get statistics for a specific project. 
""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") # Package count package_count = ( db.query(func.count(Package.id)) .filter(Package.project_id == project.id) .scalar() or 0 ) # Get all package IDs for this project package_ids = ( db.query(Package.id).filter(Package.project_id == project.id).subquery() ) # Tag count tag_count = ( db.query(func.count(Tag.id)).filter(Tag.package_id.in_(package_ids)).scalar() or 0 ) # Unique artifact count and total size (via uploads) artifact_stats = ( db.query( func.count(func.distinct(Upload.artifact_id)), func.coalesce(func.sum(Artifact.size), 0), ) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.package_id.in_(package_ids)) .first() ) artifact_count = artifact_stats[0] if artifact_stats else 0 total_size_bytes = artifact_stats[1] if artifact_stats else 0 # Upload counts and storage saved upload_stats = ( db.query( func.count(Upload.id), func.count(Upload.id).filter(Upload.deduplicated == True), func.coalesce( func.sum(Artifact.size).filter(Upload.deduplicated == True), 0 ), ) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.package_id.in_(package_ids)) .first() ) upload_count = upload_stats[0] if upload_stats else 0 deduplicated_uploads = upload_stats[1] if upload_stats else 0 storage_saved_bytes = upload_stats[2] if upload_stats else 0 # Calculate deduplication ratio deduplication_ratio = upload_count / artifact_count if artifact_count > 0 else 1.0 return ProjectStatsResponse( project_id=str(project.id), project_name=project.name, package_count=package_count, tag_count=tag_count, artifact_count=artifact_count, total_size_bytes=total_size_bytes, upload_count=upload_count, deduplicated_uploads=deduplicated_uploads, storage_saved_bytes=storage_saved_bytes, deduplication_ratio=deduplication_ratio, ) # ============================================================================= # Package Statistics Endpoint # ============================================================================= @router.get( "/api/v1/project/{project_name}/packages/{package_name}/stats", response_model=PackageStatsResponse, ) def get_package_stats( project_name: str, package_name: str, db: Session = Depends(get_db), ): """Get statistics for a specific package.""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Tag count tag_count = ( db.query(func.count(Tag.id)).filter(Tag.package_id == package.id).scalar() or 0 ) # Artifact stats via uploads artifact_stats = ( db.query( func.count(func.distinct(Upload.artifact_id)), func.coalesce(func.sum(Artifact.size), 0), ) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.package_id == package.id) .first() ) artifact_count = artifact_stats[0] if artifact_stats else 0 total_size_bytes = artifact_stats[1] if artifact_stats else 0 # Upload stats upload_stats = ( db.query( func.count(Upload.id), func.count(Upload.id).filter(Upload.deduplicated == True), func.coalesce( func.sum(Artifact.size).filter(Upload.deduplicated == True), 0 ), ) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.package_id == package.id) .first() ) upload_count = upload_stats[0] if upload_stats else 0 
deduplicated_uploads = upload_stats[1] if upload_stats else 0 storage_saved_bytes = upload_stats[2] if upload_stats else 0 deduplication_ratio = upload_count / artifact_count if artifact_count > 0 else 1.0 return PackageStatsResponse( package_id=str(package.id), package_name=package.name, project_name=project.name, tag_count=tag_count, artifact_count=artifact_count, total_size_bytes=total_size_bytes, upload_count=upload_count, deduplicated_uploads=deduplicated_uploads, storage_saved_bytes=storage_saved_bytes, deduplication_ratio=deduplication_ratio, ) # ============================================================================= # Artifact Statistics Endpoint # ============================================================================= @router.get( "/api/v1/artifact/{artifact_id}/stats", response_model=ArtifactStatsResponse ) def get_artifact_stats( artifact_id: str, db: Session = Depends(get_db), ): """Get detailed statistics for a specific artifact.""" artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first() if not artifact: raise HTTPException(status_code=404, detail="Artifact not found") # Get all tags referencing this artifact tags = ( db.query(Tag, Package, Project) .join(Package, Tag.package_id == Package.id) .join(Project, Package.project_id == Project.id) .filter(Tag.artifact_id == artifact_id) .all() ) tag_list = [ { "tag_name": tag.name, "package_name": pkg.name, "project_name": proj.name, "created_at": tag.created_at.isoformat() if tag.created_at else None, } for tag, pkg, proj in tags ] # Get unique projects and packages projects = list(set(proj.name for _, _, proj in tags)) packages = list(set(f"{proj.name}/{pkg.name}" for _, pkg, proj in tags)) # Get first and last upload times upload_times = ( db.query(func.min(Upload.uploaded_at), func.max(Upload.uploaded_at)) .filter(Upload.artifact_id == artifact_id) .first() ) return ArtifactStatsResponse( artifact_id=artifact.id, sha256=artifact.id, size=artifact.size, ref_count=artifact.ref_count, storage_savings=(artifact.ref_count - 1) * artifact.size if artifact.ref_count > 1 else 0, tags=tag_list, projects=projects, packages=packages, first_uploaded=upload_times[0] if upload_times else None, last_referenced=upload_times[1] if upload_times else None, ) # ============================================================================= # Cross-Project Deduplication Endpoint # ============================================================================= @router.get( "/api/v1/stats/cross-project", response_model=CrossProjectDeduplicationResponse ) def get_cross_project_deduplication( limit: int = Query(default=20, ge=1, le=100), db: Session = Depends(get_db), ): """Get statistics about artifacts shared across multiple projects.""" # Find artifacts that appear in multiple projects # Subquery to count distinct projects per artifact project_counts = ( db.query( Upload.artifact_id, func.count(func.distinct(Package.project_id)).label("project_count"), ) .join(Package, Upload.package_id == Package.id) .group_by(Upload.artifact_id) .subquery() ) # Get artifacts with more than one project shared_artifacts_query = ( db.query(Artifact, project_counts.c.project_count) .join(project_counts, Artifact.id == project_counts.c.artifact_id) .filter(project_counts.c.project_count > 1) .order_by(project_counts.c.project_count.desc(), Artifact.size.desc()) .limit(limit) ) shared_artifacts = [] total_savings = 0 for artifact, project_count in shared_artifacts_query: # Calculate savings: (project_count - 1) * size savings = (project_count - 
1) * artifact.size total_savings += savings # Get project names project_names = ( db.query(func.distinct(Project.name)) .join(Package, Package.project_id == Project.id) .join(Upload, Upload.package_id == Package.id) .filter(Upload.artifact_id == artifact.id) .all() ) shared_artifacts.append( { "artifact_id": artifact.id, "size": artifact.size, "project_count": project_count, "projects": [p[0] for p in project_names], "storage_savings": savings, } ) # Total count of shared artifacts shared_count = ( db.query(func.count()) .select_from(project_counts) .filter(project_counts.c.project_count > 1) .scalar() or 0 ) return CrossProjectDeduplicationResponse( shared_artifacts_count=shared_count, total_cross_project_savings=total_savings, shared_artifacts=shared_artifacts, ) # ============================================================================= # Time-Based Statistics Endpoint # ============================================================================= @router.get("/api/v1/stats/timeline", response_model=TimeBasedStatsResponse) def get_time_based_stats( period: str = Query(default="daily", regex="^(daily|weekly|monthly)$"), from_date: Optional[datetime] = Query(default=None), to_date: Optional[datetime] = Query(default=None), db: Session = Depends(get_db), ): """Get deduplication statistics over time.""" from datetime import timedelta # Default date range: last 30 days if to_date is None: to_date = datetime.utcnow() if from_date is None: from_date = to_date - timedelta(days=30) # Determine date truncation based on period if period == "daily": date_trunc = func.date_trunc("day", Upload.uploaded_at) elif period == "weekly": date_trunc = func.date_trunc("week", Upload.uploaded_at) else: # monthly date_trunc = func.date_trunc("month", Upload.uploaded_at) # Query uploads grouped by period stats = ( db.query( date_trunc.label("period_start"), func.count(Upload.id).label("total_uploads"), func.count(func.distinct(Upload.artifact_id)).label("unique_artifacts"), func.count(Upload.id) .filter(Upload.deduplicated == True) .label("duplicated"), func.coalesce( func.sum(Artifact.size).filter(Upload.deduplicated == True), 0 ).label("bytes_saved"), ) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.uploaded_at >= from_date, Upload.uploaded_at <= to_date) .group_by(date_trunc) .order_by(date_trunc) .all() ) data_points = [ { "date": row.period_start.isoformat() if row.period_start else None, "total_uploads": row.total_uploads, "unique_artifacts": row.unique_artifacts, "duplicated_uploads": row.duplicated, "bytes_saved": row.bytes_saved, } for row in stats ] return TimeBasedStatsResponse( period=period, start_date=from_date, end_date=to_date, data_points=data_points, ) # ============================================================================= # CSV Export Endpoint # ============================================================================= @router.get("/api/v1/stats/export") def export_stats( format: str = Query(default="json", regex="^(json|csv)$"), db: Session = Depends(get_db), ): """Export global statistics in JSON or CSV format.""" from fastapi.responses import Response # Gather all stats total_artifacts = db.query(func.count(Artifact.id)).scalar() or 0 total_size = db.query(func.coalesce(func.sum(Artifact.size), 0)).scalar() or 0 total_uploads = db.query(func.count(Upload.id)).scalar() or 0 deduplicated_uploads = ( db.query(func.count(Upload.id)).filter(Upload.deduplicated == True).scalar() or 0 ) unique_artifacts = ( 
db.query(func.count(Artifact.id)).filter(Artifact.ref_count > 0).scalar() or 0 ) storage_saved = ( db.query(func.coalesce(func.sum(Artifact.size), 0)) .join(Upload, Upload.artifact_id == Artifact.id) .filter(Upload.deduplicated == True) .scalar() or 0 ) stats = { "generated_at": datetime.utcnow().isoformat(), "total_artifacts": total_artifacts, "total_size_bytes": total_size, "total_uploads": total_uploads, "unique_artifacts": unique_artifacts, "deduplicated_uploads": deduplicated_uploads, "storage_saved_bytes": storage_saved, "deduplication_ratio": total_uploads / unique_artifacts if unique_artifacts > 0 else 1.0, } if format == "csv": import csv import io output = io.StringIO() writer = csv.writer(output) writer.writerow(["Metric", "Value"]) for key, value in stats.items(): writer.writerow([key, value]) return Response( content=output.getvalue(), media_type="text/csv", headers={"Content-Disposition": "attachment; filename=orchard_stats.csv"}, ) return stats # ============================================================================= # Summary Report Endpoint # ============================================================================= @router.get("/api/v1/stats/report", response_model=StatsReportResponse) def generate_stats_report( format: str = Query(default="markdown", regex="^(markdown|json)$"), db: Session = Depends(get_db), ): """Generate a summary report of storage and deduplication statistics.""" # Gather stats total_artifacts = db.query(func.count(Artifact.id)).scalar() or 0 total_size = int(db.query(func.coalesce(func.sum(Artifact.size), 0)).scalar() or 0) total_uploads = db.query(func.count(Upload.id)).scalar() or 0 deduplicated_uploads = ( db.query(func.count(Upload.id)).filter(Upload.deduplicated == True).scalar() or 0 ) unique_artifacts = ( db.query(func.count(Artifact.id)).filter(Artifact.ref_count > 0).scalar() or 0 ) orphaned_artifacts = ( db.query(func.count(Artifact.id)).filter(Artifact.ref_count == 0).scalar() or 0 ) storage_saved = int( db.query(func.coalesce(func.sum(Artifact.size), 0)) .join(Upload, Upload.artifact_id == Artifact.id) .filter(Upload.deduplicated == True) .scalar() or 0 ) project_count = db.query(func.count(Project.id)).scalar() or 0 package_count = db.query(func.count(Package.id)).scalar() or 0 # Top 5 most referenced artifacts top_artifacts = ( db.query(Artifact) .filter(Artifact.ref_count > 1) .order_by(Artifact.ref_count.desc()) .limit(5) .all() ) def format_bytes(b): for unit in ["B", "KB", "MB", "GB", "TB"]: if b < 1024: return f"{b:.2f} {unit}" b /= 1024 return f"{b:.2f} PB" generated_at = datetime.utcnow() if format == "markdown": report = f"""# Orchard Storage Report Generated: {generated_at.strftime("%Y-%m-%d %H:%M:%S UTC")} ## Overview | Metric | Value | |--------|-------| | Projects | {project_count} | | Packages | {package_count} | | Total Artifacts | {total_artifacts} | | Unique Artifacts | {unique_artifacts} | | Orphaned Artifacts | {orphaned_artifacts} | ## Storage | Metric | Value | |--------|-------| | Total Storage Used | {format_bytes(total_size)} | | Storage Saved | {format_bytes(storage_saved)} | | Savings Percentage | {(storage_saved / (total_size + storage_saved) * 100) if (total_size + storage_saved) > 0 else 0:.1f}% | ## Uploads | Metric | Value | |--------|-------| | Total Uploads | {total_uploads} | | Deduplicated Uploads | {deduplicated_uploads} | | Deduplication Ratio | {total_uploads / unique_artifacts if unique_artifacts > 0 else 1:.2f}x | ## Top Referenced Artifacts | Artifact ID | Size | References | Savings | 
|-------------|------|------------|---------| """ for art in top_artifacts: savings = (art.ref_count - 1) * art.size report += f"| `{art.id[:12]}...` | {format_bytes(art.size)} | {art.ref_count} | {format_bytes(savings)} |\n" return StatsReportResponse( format="markdown", generated_at=generated_at, content=report, ) # JSON format return StatsReportResponse( format="json", generated_at=generated_at, content=json.dumps( { "overview": { "projects": project_count, "packages": package_count, "total_artifacts": total_artifacts, "unique_artifacts": unique_artifacts, "orphaned_artifacts": orphaned_artifacts, }, "storage": { "total_bytes": total_size, "saved_bytes": storage_saved, "savings_percentage": ( storage_saved / (total_size + storage_saved) * 100 ) if (total_size + storage_saved) > 0 else 0, }, "uploads": { "total": total_uploads, "deduplicated": deduplicated_uploads, "ratio": total_uploads / unique_artifacts if unique_artifacts > 0 else 1, }, "top_artifacts": [ { "id": art.id, "size": art.size, "ref_count": art.ref_count, "savings": (art.ref_count - 1) * art.size, } for art in top_artifacts ], }, indent=2, ), ) # ============================================================================= # Audit Log Endpoints # ============================================================================= @router.get("/api/v1/audit-logs", response_model=PaginatedResponse[AuditLogResponse]) def list_audit_logs( action: Optional[str] = Query(None, description="Filter by action type"), resource: Optional[str] = Query(None, description="Filter by resource pattern"), user_id: Optional[str] = Query(None, description="Filter by user"), from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), to_date: Optional[datetime] = Query(None, alias="to", description="End date"), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """ List audit logs with filtering and pagination. 
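Example request (illustrative; the individual filters are described below):

    GET /api/v1/audit-logs?action=garbage_collect&from=2024-01-01T00:00:00Z&page=1&limit=20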
Filters: - action: Filter by action type (e.g., 'project.create', 'artifact.upload') - resource: Filter by resource pattern (partial match) - user_id: Filter by user ID - from/to: Filter by timestamp range """ query = db.query(AuditLog) if action: query = query.filter(AuditLog.action == action) if resource: query = query.filter(AuditLog.resource.ilike(f"%{resource}%")) if user_id: query = query.filter(AuditLog.user_id == user_id) if from_date: query = query.filter(AuditLog.timestamp >= from_date) if to_date: query = query.filter(AuditLog.timestamp <= to_date) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 logs = ( query.order_by(AuditLog.timestamp.desc()) .offset((page - 1) * limit) .limit(limit) .all() ) return PaginatedResponse( items=logs, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.get( "/api/v1/projects/{project_name}/audit-logs", response_model=PaginatedResponse[AuditLogResponse], ) def list_project_audit_logs( project_name: str, action: Optional[str] = Query(None, description="Filter by action type"), from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), to_date: Optional[datetime] = Query(None, alias="to", description="End date"), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """List audit logs for a specific project.""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") # Match resources that start with project name resource_pattern = f"{project_name}%" query = db.query(AuditLog).filter(AuditLog.resource.like(resource_pattern)) if action: query = query.filter(AuditLog.action == action) if from_date: query = query.filter(AuditLog.timestamp >= from_date) if to_date: query = query.filter(AuditLog.timestamp <= to_date) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 logs = ( query.order_by(AuditLog.timestamp.desc()) .offset((page - 1) * limit) .limit(limit) .all() ) return PaginatedResponse( items=logs, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.get( "/api/v1/project/{project_name}/{package_name}/audit-logs", response_model=PaginatedResponse[AuditLogResponse], ) def list_package_audit_logs( project_name: str, package_name: str, action: Optional[str] = Query(None, description="Filter by action type"), from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), to_date: Optional[datetime] = Query(None, alias="to", description="End date"), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """List audit logs for a specific package.""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") # Match resources that contain project/package resource_pattern = f"{project_name}/{package_name}%" query = db.query(AuditLog).filter(AuditLog.resource.like(resource_pattern)) if action: query = query.filter(AuditLog.action == action) if from_date: query = query.filter(AuditLog.timestamp >= from_date) if to_date: query = 
query.filter(AuditLog.timestamp <= to_date) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 logs = ( query.order_by(AuditLog.timestamp.desc()) .offset((page - 1) * limit) .limit(limit) .all() ) return PaginatedResponse( items=logs, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) # ============================================================================= # Upload History Endpoints # ============================================================================= @router.get( "/api/v1/uploads", response_model=PaginatedResponse[UploadHistoryResponse], ) def list_all_uploads( request: Request, project: Optional[str] = Query(None, description="Filter by project name"), package: Optional[str] = Query(None, description="Filter by package name"), uploaded_by: Optional[str] = Query(None, description="Filter by uploader"), from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), to_date: Optional[datetime] = Query(None, alias="to", description="End date"), deduplicated: Optional[bool] = Query( None, description="Filter by deduplication status" ), search: Optional[str] = Query(None, description="Search by original filename"), tag: Optional[str] = Query( None, description="Filter by tag name. Supports wildcards (*) and comma-separated values", ), sort: Optional[str] = Query( None, description="Sort field: uploaded_at, original_name, size" ), order: Optional[str] = Query("desc", description="Sort order: asc or desc"), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """ List all upload events globally (admin endpoint). Supports filtering by: - project: Filter by project name - package: Filter by package name (requires project) - uploaded_by: Filter by user ID - from/to: Filter by timestamp range - deduplicated: Filter by deduplication status - search: Search by original filename (case-insensitive) - tag: Filter by tag name """ query = ( db.query(Upload, Package, Project, Artifact) .join(Package, Upload.package_id == Package.id) .join(Project, Package.project_id == Project.id) .join(Artifact, Upload.artifact_id == Artifact.id) ) # Apply filters if project: query = query.filter(Project.name == project) if package: query = query.filter(Package.name == package) if uploaded_by: query = query.filter(Upload.uploaded_by == uploaded_by) if from_date: query = query.filter(Upload.uploaded_at >= from_date) if to_date: query = query.filter(Upload.uploaded_at <= to_date) if deduplicated is not None: query = query.filter(Upload.deduplicated == deduplicated) if search: query = query.filter(Upload.original_name.ilike(f"%{search}%")) if tag: # Support multiple values (comma-separated) and wildcards (*) tag_values = [t.strip() for t in tag.split(",") if t.strip()] if len(tag_values) == 1: tag_val = tag_values[0] if "*" in tag_val: query = query.filter(Upload.tag_name.ilike(tag_val.replace("*", "%"))) else: query = query.filter(Upload.tag_name == tag_val) else: tag_conditions = [] for tag_val in tag_values: if "*" in tag_val: tag_conditions.append( Upload.tag_name.ilike(tag_val.replace("*", "%")) ) else: tag_conditions.append(Upload.tag_name == tag_val) query = query.filter(or_(*tag_conditions)) # Validate and apply sorting valid_sort_fields = { "uploaded_at": Upload.uploaded_at, "original_name": Upload.original_name, "size": Artifact.size, } if sort and sort not in valid_sort_fields: raise HTTPException( status_code=400, 
detail=f"Invalid sort field. Valid options: {', '.join(valid_sort_fields.keys())}", ) sort_column = valid_sort_fields.get(sort, Upload.uploaded_at) if order and order.lower() not in ("asc", "desc"): raise HTTPException( status_code=400, detail="Invalid order. Valid options: asc, desc" ) sort_order = ( sort_column.asc() if order and order.lower() == "asc" else sort_column.desc() ) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 results = query.order_by(sort_order).offset((page - 1) * limit).limit(limit).all() items = [ UploadHistoryResponse( id=upload.id, artifact_id=upload.artifact_id, package_id=upload.package_id, package_name=pkg.name, project_name=proj.name, original_name=upload.original_name, tag_name=upload.tag_name, uploaded_at=upload.uploaded_at, uploaded_by=upload.uploaded_by, source_ip=upload.source_ip, deduplicated=upload.deduplicated or False, artifact_size=artifact.size, artifact_content_type=artifact.content_type, ) for upload, pkg, proj, artifact in results ] return PaginatedResponse( items=items, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.get( "/api/v1/project/{project_name}/uploads", response_model=PaginatedResponse[UploadHistoryResponse], ) def list_project_uploads( project_name: str, package: Optional[str] = Query(None, description="Filter by package name"), uploaded_by: Optional[str] = Query(None, description="Filter by uploader"), from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), to_date: Optional[datetime] = Query(None, alias="to", description="End date"), deduplicated: Optional[bool] = Query( None, description="Filter by deduplication status" ), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """ List upload events for a specific project. 
Supports filtering by: - package: Filter by package name within the project - uploaded_by: Filter by user ID - from/to: Filter by timestamp range - deduplicated: Filter by deduplication status """ project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") # Get all package IDs for this project package_ids_query = db.query(Package.id).filter(Package.project_id == project.id) if package: package_ids_query = package_ids_query.filter(Package.name == package) package_ids = package_ids_query.subquery() query = ( db.query(Upload, Package, Artifact) .join(Package, Upload.package_id == Package.id) .join(Artifact, Upload.artifact_id == Artifact.id) .filter(Upload.package_id.in_(package_ids)) ) if uploaded_by: query = query.filter(Upload.uploaded_by == uploaded_by) if from_date: query = query.filter(Upload.uploaded_at >= from_date) if to_date: query = query.filter(Upload.uploaded_at <= to_date) if deduplicated is not None: query = query.filter(Upload.deduplicated == deduplicated) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 results = ( query.order_by(Upload.uploaded_at.desc()) .offset((page - 1) * limit) .limit(limit) .all() ) items = [ UploadHistoryResponse( id=upload.id, artifact_id=upload.artifact_id, package_id=upload.package_id, package_name=pkg.name, project_name=project_name, original_name=upload.original_name, tag_name=upload.tag_name, uploaded_at=upload.uploaded_at, uploaded_by=upload.uploaded_by, source_ip=upload.source_ip, deduplicated=upload.deduplicated or False, artifact_size=artifact.size, artifact_content_type=artifact.content_type, ) for upload, pkg, artifact in results ] return PaginatedResponse( items=items, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.get( "/api/v1/project/{project_name}/{package_name}/uploads", response_model=PaginatedResponse[UploadHistoryResponse], ) def list_package_uploads( project_name: str, package_name: str, from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), to_date: Optional[datetime] = Query(None, alias="to", description="End date"), page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """List upload events for a specific package.""" project = db.query(Project).filter(Project.name == project_name).first() if not project: raise HTTPException(status_code=404, detail="Project not found") package = ( db.query(Package) .filter(Package.project_id == project.id, Package.name == package_name) .first() ) if not package: raise HTTPException(status_code=404, detail="Package not found") query = db.query(Upload).filter(Upload.package_id == package.id) if from_date: query = query.filter(Upload.uploaded_at >= from_date) if to_date: query = query.filter(Upload.uploaded_at <= to_date) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 uploads = ( query.order_by(Upload.uploaded_at.desc()) .offset((page - 1) * limit) .limit(limit) .all() ) # Build response with artifact metadata items = [] for upload in uploads: artifact = db.query(Artifact).filter(Artifact.id == upload.artifact_id).first() items.append( UploadHistoryResponse( id=upload.id, artifact_id=upload.artifact_id, package_id=upload.package_id, package_name=package_name, project_name=project_name, original_name=upload.original_name, tag_name=upload.tag_name, uploaded_at=upload.uploaded_at, 
uploaded_by=upload.uploaded_by, source_ip=upload.source_ip, deduplicated=upload.deduplicated or False, artifact_size=artifact.size if artifact else 0, artifact_content_type=artifact.content_type if artifact else None, ) ) return PaginatedResponse( items=items, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) @router.get( "/api/v1/artifact/{artifact_id}/uploads", response_model=PaginatedResponse[UploadHistoryResponse], ) def list_artifact_uploads( artifact_id: str, page: int = Query(1, ge=1), limit: int = Query(20, ge=1, le=100), db: Session = Depends(get_db), ): """List all upload events for a specific artifact.""" artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first() if not artifact: raise HTTPException(status_code=404, detail="Artifact not found") query = db.query(Upload).filter(Upload.artifact_id == artifact_id) total = query.count() total_pages = math.ceil(total / limit) if total > 0 else 1 uploads = ( query.order_by(Upload.uploaded_at.desc()) .offset((page - 1) * limit) .limit(limit) .all() ) # Build response with package/project metadata items = [] for upload in uploads: package = db.query(Package).filter(Package.id == upload.package_id).first() project = ( db.query(Project).filter(Project.id == package.project_id).first() if package else None ) items.append( UploadHistoryResponse( id=upload.id, artifact_id=upload.artifact_id, package_id=upload.package_id, package_name=package.name if package else "unknown", project_name=project.name if project else "unknown", original_name=upload.original_name, tag_name=upload.tag_name, uploaded_at=upload.uploaded_at, uploaded_by=upload.uploaded_by, source_ip=upload.source_ip, deduplicated=upload.deduplicated or False, artifact_size=artifact.size, artifact_content_type=artifact.content_type, ) ) return PaginatedResponse( items=items, pagination=PaginationMeta( page=page, limit=limit, total=total, total_pages=total_pages, has_more=page < total_pages, ), ) # ============================================================================= # Artifact Provenance/History Endpoint # ============================================================================= @router.get( "/api/v1/artifact/{artifact_id}/history", response_model=ArtifactProvenanceResponse ) def get_artifact_provenance( artifact_id: str, db: Session = Depends(get_db), ): """ Get full provenance/history of an artifact. 
Returns: - Artifact metadata - First upload information - All packages/tags referencing the artifact - Complete upload history """ artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first() if not artifact: raise HTTPException(status_code=404, detail="Artifact not found") # Get all uploads for this artifact uploads = ( db.query(Upload) .filter(Upload.artifact_id == artifact_id) .order_by(Upload.uploaded_at.asc()) .all() ) # Get first upload info first_upload = uploads[0] if uploads else None # Get all tags referencing this artifact tags = db.query(Tag).filter(Tag.artifact_id == artifact_id).all() # Build package list with tags package_map = {} # package_id -> {project_name, package_name, tag_names} tag_list = [] for tag in tags: package = db.query(Package).filter(Package.id == tag.package_id).first() if package: project = db.query(Project).filter(Project.id == package.project_id).first() project_name = project.name if project else "unknown" # Add to package map pkg_key = str(package.id) if pkg_key not in package_map: package_map[pkg_key] = { "project_name": project_name, "package_name": package.name, "tag_names": [], } package_map[pkg_key]["tag_names"].append(tag.name) # Add to tag list tag_list.append( { "project_name": project_name, "package_name": package.name, "tag_name": tag.name, "created_at": tag.created_at.isoformat() if tag.created_at else None, } ) # Build upload history upload_history = [] for upload in uploads: package = db.query(Package).filter(Package.id == upload.package_id).first() project = ( db.query(Project).filter(Project.id == package.project_id).first() if package else None ) upload_history.append( { "upload_id": str(upload.id), "project_name": project.name if project else "unknown", "package_name": package.name if package else "unknown", "original_name": upload.original_name, "tag_name": upload.tag_name, "uploaded_at": upload.uploaded_at.isoformat() if upload.uploaded_at else None, "uploaded_by": upload.uploaded_by, "deduplicated": upload.deduplicated or False, } ) return ArtifactProvenanceResponse( artifact_id=artifact.id, sha256=artifact.id, size=artifact.size, content_type=artifact.content_type, original_name=artifact.original_name, created_at=artifact.created_at, created_by=artifact.created_by, ref_count=artifact.ref_count, first_uploaded_at=first_upload.uploaded_at if first_upload else artifact.created_at, first_uploaded_by=first_upload.uploaded_by if first_upload else artifact.created_by, upload_count=len(uploads), packages=list(package_map.values()), tags=tag_list, uploads=upload_history, )