Files
orchard/backend/app/schemas.py
Mondo Diaz c4c9c20763 Remove tag system, use versions only for artifact references
Tags were mutable aliases that caused confusion alongside the immutable
version system. This removes tags entirely, keeping only PackageVersion
for artifact references.

Changes:
- Remove tags and tag_history tables (migration 012)
- Remove Tag model, TagRepository, and 6 tag API endpoints
- Update cache system to create versions instead of tags
- Update frontend to display versions instead of tags
- Remove tag-related schemas and types
- Update artifact cleanup service for version-based ref_count
2026-02-03 12:18:19 -06:00

1329 lines
36 KiB
Python

from datetime import datetime
from typing import Optional, List, Dict, Any, Generic, TypeVar
from pydantic import BaseModel, field_validator
from uuid import UUID
# Type variable used to parameterize the generic PaginatedResponse item type.
T = TypeVar("T")
# Pagination schemas
class PaginationMeta(BaseModel):
    """Paging metadata carried alongside a page of results."""

    page: int
    limit: int
    total: int
    total_pages: int
    # True if there are more pages after the current page.
    has_more: bool = False
class PaginatedResponse(BaseModel, Generic[T]):
    """Generic page envelope: a list of ``T`` items plus paging metadata."""

    items: List[T]
    pagination: PaginationMeta
# Project schemas
class ProjectCreate(BaseModel):
    """Payload for creating a project."""

    name: str
    description: Optional[str] = None
    # Projects are public unless the caller says otherwise.
    is_public: bool = True
    team_id: Optional[UUID] = None
class ProjectResponse(BaseModel):
    """Project as returned by the API, including optional team details."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    name: str
    description: Optional[str]
    is_public: bool
    is_system: bool = False
    created_at: datetime
    updated_at: datetime
    created_by: str
    team_id: Optional[UUID] = None
    team_slug: Optional[str] = None
    team_name: Optional[str] = None
class ProjectUpdate(BaseModel):
    """Partial update for a project; every field is optional."""

    description: Optional[str] = None
    is_public: Optional[bool] = None
class ProjectWithAccessResponse(ProjectResponse):
    """Project response extended with the requesting user's access level.

    NOTE(review): a second class named ``ProjectWithAccessResponse`` is
    defined further down in this module and rebinds the name, so importers
    get that later definition rather than this one.
    """

    # One of 'read', 'write', 'admin', or None.
    access_level: Optional[str] = None
    is_owner: bool = False
# Package format and platform enums
# Known package format identifiers; "generic" is the PackageCreate default.
PACKAGE_FORMATS = [
    "generic",
    "npm", "pypi", "docker",
    "deb", "rpm",
    "maven", "nuget", "helm",
]
# Known platform identifiers; "any" is the PackageCreate default.
PACKAGE_PLATFORMS = [
    "any",
    "linux", "darwin", "windows",
    "linux-amd64", "linux-arm64",
    "darwin-amd64", "darwin-arm64",
    "windows-amd64",
]
# Package schemas
class PackageCreate(BaseModel):
    """Payload for creating a package."""

    name: str
    description: Optional[str] = None
    format: str = "generic"
    platform: str = "any"
class PackageResponse(BaseModel):
    """Package as returned by the API."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    project_id: UUID
    name: str
    description: Optional[str]
    format: str
    platform: str
    created_at: datetime
    updated_at: datetime
class PackageUpdate(BaseModel):
    """Partial update for a package; every field is optional."""

    description: Optional[str] = None
    format: Optional[str] = None
    platform: Optional[str] = None
class PackageDetailResponse(BaseModel):
    """Package together with aggregated artifact metadata."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    project_id: UUID
    name: str
    description: Optional[str]
    format: str
    platform: str
    created_at: datetime
    updated_at: datetime

    # Aggregated fields
    artifact_count: int = 0
    total_size: int = 0
    latest_upload_at: Optional[datetime] = None
# Artifact schemas
class ArtifactResponse(BaseModel):
    """Artifact record with checksums and reference count."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: str
    # Explicit SHA256 field (same as id).
    sha256: str
    size: int
    content_type: Optional[str]
    original_name: Optional[str]
    checksum_md5: Optional[str] = None
    checksum_sha1: Optional[str] = None
    s3_etag: Optional[str] = None
    created_at: datetime
    created_by: str
    ref_count: int
    format_metadata: Optional[Dict[str, Any]] = None
# Audit log schemas
class AuditLogResponse(BaseModel):
    """Audit log entry response."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    action: str
    resource: str
    user_id: str
    details: Optional[Dict[str, Any]]
    timestamp: datetime
    source_ip: Optional[str]
# Upload history schemas
class UploadHistoryResponse(BaseModel):
    """Upload event together with details of the uploaded artifact."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    artifact_id: str
    package_id: UUID
    package_name: str
    project_name: str
    original_name: Optional[str]
    version: Optional[str]
    uploaded_at: datetime
    uploaded_by: str
    source_ip: Optional[str]
    deduplicated: bool

    # Artifact metadata
    artifact_size: int
    artifact_content_type: Optional[str]
# Artifact provenance schemas
class ArtifactProvenanceResponse(BaseModel):
    """Full provenance/history of an artifact."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    artifact_id: str
    sha256: str
    size: int
    content_type: Optional[str]
    original_name: Optional[str]
    created_at: datetime
    created_by: str
    ref_count: int

    # First upload info
    first_uploaded_at: datetime
    first_uploaded_by: str

    # Usage statistics
    upload_count: int

    # References
    # List of {project_name, package_name, tag_names}
    packages: List[Dict[str, Any]]
    # List of {project_name, package_name, tag_name, created_at}
    # NOTE(review): this field is still required and still tag-shaped;
    # confirm whether producers of this response continue to populate it.
    tags: List[Dict[str, Any]]

    # Upload history: list of upload events
    uploads: List[Dict[str, Any]]
class ArtifactDetailResponse(BaseModel):
    """Artifact with metadata."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: str
    # Explicit SHA256 field (same as id).
    sha256: str
    size: int
    content_type: Optional[str]
    original_name: Optional[str]
    checksum_md5: Optional[str] = None
    checksum_sha1: Optional[str] = None
    s3_etag: Optional[str] = None
    created_at: datetime
    created_by: str
    ref_count: int
    format_metadata: Optional[Dict[str, Any]] = None
class PackageArtifactResponse(BaseModel):
    """Artifact entry used when listing a package's artifacts."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: str
    # Explicit SHA256 field (same as id).
    sha256: str
    size: int
    content_type: Optional[str]
    original_name: Optional[str]
    checksum_md5: Optional[str] = None
    checksum_sha1: Optional[str] = None
    s3_etag: Optional[str] = None
    created_at: datetime
    created_by: str
    format_metadata: Optional[Dict[str, Any]] = None
class GlobalArtifactResponse(BaseModel):
    """Artifact with project/package context for global listing."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: str
    sha256: str
    size: int
    content_type: Optional[str]
    original_name: Optional[str]
    created_at: datetime
    created_by: str
    format_metadata: Optional[Dict[str, Any]] = None
    ref_count: int = 0

    # Context from versions/packages
    # List of project names containing this artifact.
    projects: List[str] = []
    # List of "project/package" paths.
    packages: List[str] = []
# Upload response
class UploadResponse(BaseModel):
    """Result of an artifact upload."""

    artifact_id: str
    # Explicit SHA256 field (same as artifact_id).
    sha256: str
    size: int
    project: str
    package: str
    # Version assigned to this artifact.
    version: Optional[str] = None
    # How the version was determined: 'explicit', 'filename', 'metadata'.
    version_source: Optional[str] = None
    checksum_md5: Optional[str] = None
    checksum_sha1: Optional[str] = None
    s3_etag: Optional[str] = None
    format_metadata: Optional[Dict[str, Any]] = None
    deduplicated: bool = False
    # Current reference count after this upload.
    ref_count: int = 1

    # Enhanced metadata (Issue #19)
    # UUID of the upload record.
    upload_id: Optional[UUID] = None
    content_type: Optional[str] = None
    original_name: Optional[str] = None
    created_at: Optional[datetime] = None

    # Upload metrics (Issue #43)
    # Upload duration in milliseconds.
    duration_ms: Optional[int] = None
    # Upload throughput in MB/s.
    throughput_mbps: Optional[float] = None
# Resumable upload schemas
class ResumableUploadInitRequest(BaseModel):
    """Request to initiate a resumable upload."""

    # SHA256 hash of the file (client must compute).
    expected_hash: str
    filename: str
    content_type: Optional[str] = None
    size: int
    # Explicit version (auto-detected if not provided).
    version: Optional[str] = None

    @field_validator("expected_hash")
    @classmethod
    def validate_sha256_hash(cls, v: str) -> str:
        """Validate ``expected_hash`` and return it lowercased.

        Upper-case hex digits are accepted and normalized to lowercase.

        Raises:
            ValueError: if the value is not exactly 64 hexadecimal characters.
        """
        import re

        normalized = v.lower()
        # fullmatch instead of match + "$": "$" also matches just before a
        # trailing newline, which let "<64 hex chars>\n" through and returned
        # a 65-character "hash".
        if not re.fullmatch(r"[a-f0-9]{64}", normalized):
            raise ValueError(
                "expected_hash must be a valid 64-character lowercase hexadecimal SHA256 hash"
            )
        return normalized
class ResumableUploadInitResponse(BaseModel):
    """Response from initiating a resumable upload."""

    # None if the file already exists.
    upload_id: Optional[str]
    already_exists: bool
    # Set if already_exists is True.
    artifact_id: Optional[str] = None
    # Recommended chunk size for parts.
    chunk_size: int
class ResumableUploadPartResponse(BaseModel):
    """Response returned after a single part has been uploaded."""

    part_number: int
    etag: str
class ResumableUploadCompleteRequest(BaseModel):
    """Request to complete a resumable upload; declares no fields."""
class ResumableUploadCompleteResponse(BaseModel):
    """Response returned once a resumable upload has been completed."""

    artifact_id: str
    size: int
    project: str
    package: str
class ResumableUploadStatusResponse(BaseModel):
    """Current status of a resumable upload."""

    upload_id: str
    uploaded_parts: List[int]
    total_uploaded_bytes: int
class UploadProgressResponse(BaseModel):
    """Progress information for an in-flight upload."""

    upload_id: str
    # One of 'in_progress', 'completed', 'failed', 'not_found'.
    status: str
    bytes_uploaded: int = 0
    bytes_total: Optional[int] = None
    percent_complete: Optional[float] = None
    parts_uploaded: int = 0
    parts_total: Optional[int] = None
    started_at: Optional[datetime] = None
    elapsed_seconds: Optional[float] = None
    throughput_mbps: Optional[float] = None
# Consumer schemas
class ConsumerResponse(BaseModel):
    """Consumer record associated with a package."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    package_id: UUID
    project_url: str
    last_access: datetime
    created_at: datetime
# Package version schemas
class PackageVersionResponse(BaseModel):
    """Immutable version record for an artifact in a package."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    package_id: UUID
    artifact_id: str
    version: str
    # 'explicit', 'filename', 'metadata', 'migrated_from_tag'
    version_source: Optional[str] = None
    created_at: datetime
    created_by: str

    # Enriched fields from joins
    size: Optional[int] = None
    content_type: Optional[str] = None
    original_name: Optional[str] = None
class PackageVersionDetailResponse(PackageVersionResponse):
    """Version record extended with the artifact's metadata and checksums."""

    format_metadata: Optional[Dict[str, Any]] = None
    checksum_md5: Optional[str] = None
    checksum_sha1: Optional[str] = None
# Global search schemas
class SearchResultProject(BaseModel):
    """Project result for global search."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    name: str
    description: Optional[str]
    is_public: bool
class SearchResultPackage(BaseModel):
    """Package result for global search."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    project_id: UUID
    project_name: str
    name: str
    description: Optional[str]
    format: str
class SearchResultArtifact(BaseModel):
    """Artifact hit returned by global search."""

    artifact_id: str
    version: Optional[str]
    package_id: UUID
    package_name: str
    project_name: str
    original_name: Optional[str]
class GlobalSearchResponse(BaseModel):
    """Search results combined across projects, packages and artifacts."""

    query: str
    projects: List[SearchResultProject]
    packages: List[SearchResultPackage]
    artifacts: List[SearchResultArtifact]
    # Total counts for each type.
    counts: Dict[str, int]
# Presigned URL response
class PresignedUrlResponse(BaseModel):
    """Presigned URL for direct S3 download, plus artifact details."""

    url: str
    expires_at: datetime
    method: str = "GET"
    artifact_id: str
    size: int
    content_type: Optional[str] = None
    original_name: Optional[str] = None
    checksum_sha256: Optional[str] = None
    checksum_md5: Optional[str] = None
# Health check
class HealthResponse(BaseModel):
    """Health check payload with optional storage/database flags."""

    status: str
    version: str = "1.0.0"
    storage_healthy: Optional[bool] = None
    database_healthy: Optional[bool] = None
# Garbage collection schemas
class GarbageCollectionResponse(BaseModel):
    """Outcome of a garbage collection operation."""

    artifacts_deleted: int
    bytes_freed: int
    artifact_ids: List[str]
    dry_run: bool
class OrphanedArtifactResponse(BaseModel):
    """Details of a single orphaned artifact."""

    id: str
    size: int
    created_at: datetime
    created_by: str
    original_name: Optional[str]
# Storage statistics schemas
class StorageStatsResponse(BaseModel):
    """Global storage statistics."""

    total_artifacts: int
    total_size_bytes: int
    # Artifacts with ref_count > 0.
    unique_artifacts: int
    # Artifacts with ref_count = 0.
    orphaned_artifacts: int
    orphaned_size_bytes: int
    total_uploads: int
    deduplicated_uploads: int
    # total_uploads / unique_artifacts (if > 1, deduplication is working).
    deduplication_ratio: float
    # Bytes saved through deduplication.
    storage_saved_bytes: int
class ConsistencyCheckResponse(BaseModel):
    """Result of an S3/database consistency check."""

    total_artifacts_checked: int
    # Objects in S3 but not in DB.
    orphaned_s3_objects: int
    # Records in DB but not in S3.
    missing_s3_objects: int
    # Records where DB size != S3 size.
    size_mismatches: int
    healthy: bool
    # Limited list of orphaned S3 keys.
    orphaned_s3_keys: List[str] = []
    # Limited list of missing S3 keys.
    missing_s3_keys: List[str] = []
    # Limited list of mismatches.
    size_mismatch_artifacts: List[Dict[str, Any]] = []
class DeduplicationStatsResponse(BaseModel):
    """Statistics on how effective deduplication has been."""

    # Sum of all upload sizes (what would be stored without dedup).
    total_logical_bytes: int
    # Actual storage used.
    total_physical_bytes: int
    bytes_saved: int
    savings_percentage: float
    total_uploads: int
    unique_artifacts: int
    duplicate_uploads: int
    average_ref_count: float
    max_ref_count: int
    # Top N most referenced.
    most_referenced_artifacts: List[Dict[str, Any]]
class ProjectStatsResponse(BaseModel):
    """Statistics for a single project."""

    project_id: str
    project_name: str
    package_count: int
    artifact_count: int
    total_size_bytes: int
    upload_count: int
    deduplicated_uploads: int
    # Bytes saved through deduplication.
    storage_saved_bytes: int = 0
    # upload_count / artifact_count
    deduplication_ratio: float = 1.0
class PackageStatsResponse(BaseModel):
    """Statistics for a single package."""

    package_id: str
    package_name: str
    project_name: str
    artifact_count: int
    total_size_bytes: int
    upload_count: int
    deduplicated_uploads: int
    storage_saved_bytes: int = 0
    deduplication_ratio: float = 1.0
class ArtifactStatsResponse(BaseModel):
    """Reference statistics for a single artifact."""

    artifact_id: str
    sha256: str
    size: int
    ref_count: int
    # (ref_count - 1) * size
    storage_savings: int
    # Projects using this artifact.
    projects: List[str]
    # Packages using this artifact.
    packages: List[str]
    first_uploaded: Optional[datetime] = None
    last_referenced: Optional[datetime] = None
class CrossProjectDeduplicationResponse(BaseModel):
    """Deduplication statistics across projects."""

    # Artifacts used in multiple projects.
    shared_artifacts_count: int
    # Bytes saved by cross-project sharing.
    total_cross_project_savings: int
    # Details of shared artifacts.
    shared_artifacts: List[Dict[str, Any]]
class TimeBasedStatsResponse(BaseModel):
    """Deduplication statistics bucketed over a time period."""

    # "daily", "weekly", "monthly"
    period: str
    start_date: datetime
    end_date: datetime
    # List of {date, uploads, unique, duplicated, bytes_saved}
    data_points: List[Dict[str, Any]]
class StatsReportResponse(BaseModel):
    """Summary report rendered in one of several formats."""

    # "json", "csv", "markdown"
    format: str
    generated_at: datetime
    # The report content.
    content: str
# Authentication schemas
class LoginRequest(BaseModel):
    """Credentials submitted to log in."""

    username: str
    password: str
class LoginResponse(BaseModel):
    """User information returned by a login."""

    id: UUID
    username: str
    email: Optional[str]
    is_admin: bool
    must_change_password: bool
class ChangePasswordRequest(BaseModel):
    """Current and new password for a password change."""

    current_password: str
    new_password: str
class UserResponse(BaseModel):
    """User information response."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    username: str
    email: Optional[str]
    is_admin: bool
    is_active: bool
    must_change_password: bool
    created_at: datetime
    last_login: Optional[datetime]
class UserCreate(BaseModel):
    """Payload for creating a user (admin only)."""

    username: str
    password: str
    email: Optional[str] = None
    is_admin: bool = False
class UserUpdate(BaseModel):
    """Partial update for a user (admin only); every field is optional."""

    email: Optional[str] = None
    is_admin: Optional[bool] = None
    is_active: Optional[bool] = None
class ResetPasswordRequest(BaseModel):
    """New password supplied for a password reset (admin only)."""

    new_password: str
class APIKeyCreate(BaseModel):
    """Payload for creating an API key."""

    name: str
    description: Optional[str] = None
    scopes: Optional[List[str]] = None
class APIKeyResponse(BaseModel):
    """API key response (without the secret key)."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    name: str
    description: Optional[str]
    scopes: Optional[List[str]]
    created_at: datetime
    expires_at: Optional[datetime]
    last_used: Optional[datetime]
class APIKeyCreateResponse(BaseModel):
    """Response to API key creation; includes the secret key, shown only once."""

    id: UUID
    name: str
    description: Optional[str]
    scopes: Optional[List[str]]
    # The actual API key - only returned on creation.
    key: str
    created_at: datetime
    expires_at: Optional[datetime]
# OIDC Configuration schemas
class OIDCConfigResponse(BaseModel):
    """OIDC configuration as exposed by the API; the client secret is hidden."""

    enabled: bool
    issuer_url: str
    client_id: str
    # True if a secret is configured; the secret itself is not exposed.
    has_client_secret: bool
    scopes: List[str]
    auto_create_users: bool
    admin_group: str
class OIDCConfigUpdate(BaseModel):
    """Partial update of the OIDC configuration; every field is optional."""

    enabled: Optional[bool] = None
    issuer_url: Optional[str] = None
    client_id: Optional[str] = None
    # Only set if changing.
    client_secret: Optional[str] = None
    scopes: Optional[List[str]] = None
    auto_create_users: Optional[bool] = None
    admin_group: Optional[str] = None
class OIDCStatusResponse(BaseModel):
    """Public view of whether OIDC is enabled."""

    enabled: bool
    # Only included if enabled.
    issuer_url: Optional[str] = None
class OIDCLoginResponse(BaseModel):
    """Response returned when an OIDC login is initiated."""

    authorization_url: str
# Access Permission schemas
class AccessPermissionCreate(BaseModel):
    """Grant a user access to a project."""

    username: str
    # 'read', 'write', or 'admin'
    level: str
    expires_at: Optional[datetime] = None

    @field_validator('level')
    @classmethod
    def validate_level(cls, v):
        """Return ``v`` if it is a known access level, else raise ValueError."""
        allowed_levels = ('read', 'write', 'admin')
        if v in allowed_levels:
            return v
        raise ValueError("level must be 'read', 'write', or 'admin'")
class AccessPermissionUpdate(BaseModel):
    """Partial update of an access permission."""

    level: Optional[str] = None
    expires_at: Optional[datetime] = None

    @field_validator('level')
    @classmethod
    def validate_level(cls, v):
        """Accept None or a known access level; raise ValueError otherwise."""
        if v is None:
            return v
        if v in ('read', 'write', 'admin'):
            return v
        raise ValueError("level must be 'read', 'write', or 'admin'")
class AccessPermissionResponse(BaseModel):
    """Access permission response."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    project_id: UUID
    user_id: str
    level: str
    created_at: datetime
    expires_at: Optional[datetime]
    # "explicit" or "team"
    source: Optional[str] = "explicit"
    # Team slug if source is "team".
    team_slug: Optional[str] = None
    # Team role if source is "team".
    team_role: Optional[str] = None
class ProjectWithAccessResponse(ProjectResponse):
    """Project response with the user's access level.

    This module defines ``ProjectWithAccessResponse`` twice; because this
    definition comes last it is the one bound to the name. It therefore also
    declares the fields of the earlier definition (``access_level`` and
    ``is_owner``) so that values passed for them are kept rather than being
    silently dropped.
    """

    # Fields carried over from the earlier definition of this class.
    # One of 'read', 'write', 'admin', or None.
    access_level: Optional[str] = None
    is_owner: bool = False

    user_access_level: Optional[str] = None
# Artifact Dependency schemas
class DependencyCreate(BaseModel):
    """Payload for creating a dependency."""

    project: str
    package: str
    version: str
class DependencyResponse(BaseModel):
    """Schema for dependency response."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    artifact_id: str
    project: str
    package: str
    version: str
    created_at: datetime

    @classmethod
    def from_orm_model(cls, dep) -> "DependencyResponse":
        """Build a response from an ORM dependency object.

        The source object uses different attribute names, so they are mapped
        explicitly: ``dependency_project`` -> ``project``,
        ``dependency_package`` -> ``package`` and ``version_constraint`` ->
        ``version``.
        """
        return cls(
            id=dep.id,
            artifact_id=dep.artifact_id,
            project=dep.dependency_project,
            package=dep.dependency_package,
            version=dep.version_constraint,
            created_at=dep.created_at,
        )
class ArtifactDependenciesResponse(BaseModel):
    """All dependencies recorded for one artifact."""

    artifact_id: str
    dependencies: List[DependencyResponse]
class DependentInfo(BaseModel):
    """An artifact that depends on a package."""

    artifact_id: str
    project: str
    package: str
    version: Optional[str] = None
    constraint_value: str
class ReverseDependenciesResponse(BaseModel):
    """Paginated list of dependents of a given package."""

    project: str
    package: str
    dependents: List[DependentInfo]
    pagination: PaginationMeta
class EnsureFileDependency(BaseModel):
    """One dependency entry from an orchard.ensure file."""

    project: str
    package: str
    version: str
class EnsureFileContent(BaseModel):
    """Parsed orchard.ensure file; the dependency list defaults to empty."""

    dependencies: List[EnsureFileDependency] = []
class ResolvedArtifact(BaseModel):
    """One resolved artifact in the dependency tree."""

    artifact_id: str
    project: str
    package: str
    version: Optional[str] = None
    size: int
    download_url: str
class MissingDependency(BaseModel):
    """Dependency that could not be resolved (not cached on server)."""

    project: str
    package: str
    constraint: Optional[str] = None
    required_by: Optional[str] = None
class DependencyResolutionResponse(BaseModel):
    """Result returned by the dependency resolution endpoint."""

    # project, package, ref
    requested: Dict[str, str]
    resolved: List[ResolvedArtifact]
    missing: List[MissingDependency] = []
    total_size: int
    artifact_count: int
class DependencyConflict(BaseModel):
    """Description of one dependency conflict."""

    project: str
    package: str
    # version and required_by info
    requirements: List[Dict[str, Any]]
class DependencyConflictError(BaseModel):
    """Error payload reporting dependency conflicts."""

    error: str = "dependency_conflict"
    message: str
    conflicts: List[DependencyConflict]
class CircularDependencyError(BaseModel):
    """Error payload reporting a circular dependency."""

    error: str = "circular_dependency"
    message: str
    # List of "project/package" strings showing the cycle.
    cycle: List[str]
# Team schemas
# Roles accepted by the team member validators.
TEAM_ROLES = [
    "owner",
    "admin",
    "member",
]

# Slugs that TeamCreate refuses to accept.
RESERVED_TEAM_SLUGS = {
    "new",
    "api",
    "admin",
    "settings",
    "members",
    "projects",
    "search",
}
class TeamCreate(BaseModel):
    """Create a new team."""

    name: str
    slug: str
    description: Optional[str] = None

    @field_validator('name')
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Reject blank or over-long names and return the name stripped.

        Raises:
            ValueError: if the name is empty/whitespace or longer than 255.
        """
        if not v or not v.strip():
            raise ValueError("Name cannot be empty")
        if len(v) > 255:
            raise ValueError("Name must be 255 characters or less")
        return v.strip()

    @field_validator('slug')
    @classmethod
    def validate_slug(cls, v: str) -> str:
        """Validate team slug format (lowercase alphanumeric with hyphens).

        Raises:
            ValueError: if the slug is empty, shorter than 2 or longer than
                255 characters, has an invalid format, contains consecutive
                hyphens, or is one of RESERVED_TEAM_SLUGS.
        """
        import re

        if not v:
            raise ValueError("Slug cannot be empty")
        if len(v) < 2:
            raise ValueError("Slug must be at least 2 characters")
        if len(v) > 255:
            raise ValueError("Slug must be 255 characters or less")
        # fullmatch instead of match + "$": "$" also matches just before a
        # trailing newline, so slugs such as "a\n" or "new\n" used to pass
        # (the latter also slipping past the reserved-slug check). The old
        # single-character alternative is dropped: one-character slugs are
        # already rejected by the length check above.
        if not re.fullmatch(r'[a-z0-9][a-z0-9-]*[a-z0-9]', v):
            raise ValueError(
                "Slug must be lowercase alphanumeric with hyphens, "
                "starting and ending with alphanumeric characters"
            )
        if '--' in v:
            raise ValueError("Slug cannot contain consecutive hyphens")
        if v in RESERVED_TEAM_SLUGS:
            raise ValueError(f"Slug '{v}' is reserved and cannot be used")
        return v

    @field_validator('description')
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Reject descriptions longer than 2000 characters; None is allowed."""
        if v is not None and len(v) > 2000:
            raise ValueError("Description must be 2000 characters or less")
        return v
class TeamUpdate(BaseModel):
    """Partial update of a team's name and description."""

    name: Optional[str] = None
    description: Optional[str] = None

    @field_validator('name')
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Pass None through; otherwise reject blank/over-long names and strip."""
        if v is None:
            return v
        trimmed = v.strip()
        if not trimmed:
            raise ValueError("Name cannot be empty")
        # Length is measured on the value as supplied, before stripping.
        if len(v) > 255:
            raise ValueError("Name must be 255 characters or less")
        return trimmed

    @field_validator('description')
    @classmethod
    def validate_description(cls, v: Optional[str]) -> Optional[str]:
        """Pass None through; reject descriptions longer than 2000 characters."""
        if v is None:
            return v
        if len(v) > 2000:
            raise ValueError("Description must be 2000 characters or less")
        return v
class TeamResponse(BaseModel):
    """Team response with basic info."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    name: str
    slug: str
    description: Optional[str]
    created_at: datetime
    updated_at: datetime
    member_count: int = 0
    project_count: int = 0
class TeamDetailResponse(TeamResponse):
    """Team response extended with the requesting user's role."""

    # 'owner', 'admin', 'member', or None
    user_role: Optional[str] = None
class TeamMemberCreate(BaseModel):
    """Payload for adding a member to a team; role defaults to "member"."""

    username: str
    role: str = "member"

    @field_validator('role')
    @classmethod
    def validate_role(cls, v: str) -> str:
        """Return ``v`` if it is listed in TEAM_ROLES, else raise ValueError."""
        if v in TEAM_ROLES:
            return v
        raise ValueError(f"Role must be one of: {', '.join(TEAM_ROLES)}")
class TeamMemberUpdate(BaseModel):
    """Payload for changing a team member's role."""

    role: str

    @field_validator('role')
    @classmethod
    def validate_role(cls, v: str) -> str:
        """Return ``v`` if it is listed in TEAM_ROLES, else raise ValueError."""
        if v in TEAM_ROLES:
            return v
        raise ValueError(f"Role must be one of: {', '.join(TEAM_ROLES)}")
class TeamMemberResponse(BaseModel):
    """Team member response."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    user_id: UUID
    username: str
    email: Optional[str]
    role: str
    created_at: datetime
# =============================================================================
# Upstream Caching Schemas
# =============================================================================
# Source types accepted by the ``source_type`` validators in this module.
SOURCE_TYPES = [
    "npm",
    "pypi",
    "maven",
    "docker",
    "helm",
    "nuget",
    "deb",
    "rpm",
    "generic",
]

# Auth types accepted by the ``auth_type`` validators in this module.
AUTH_TYPES = [
    "none",
    "basic",
    "bearer",
    "api_key",
]
class UpstreamSourceCreate(BaseModel):
    """Payload for creating an upstream source."""

    name: str
    source_type: str = "generic"
    url: str
    enabled: bool = False
    auth_type: str = "none"
    username: Optional[str] = None
    # Write-only
    password: Optional[str] = None
    # Write-only, custom headers
    headers: Optional[dict] = None
    priority: int = 100

    @field_validator('name')
    @classmethod
    def validate_name(cls, v: str) -> str:
        """Strip the name; reject it if empty or longer than 255 characters."""
        cleaned = v.strip()
        if not cleaned:
            raise ValueError("name cannot be empty")
        if len(cleaned) > 255:
            raise ValueError("name must be 255 characters or less")
        return cleaned

    @field_validator('source_type')
    @classmethod
    def validate_source_type(cls, v: str) -> str:
        """Return ``v`` if it is listed in SOURCE_TYPES, else raise ValueError."""
        if v in SOURCE_TYPES:
            return v
        raise ValueError(f"source_type must be one of: {', '.join(SOURCE_TYPES)}")

    @field_validator('url')
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Strip the URL; require an http(s) scheme and at most 2048 characters."""
        cleaned = v.strip()
        if not cleaned:
            raise ValueError("url cannot be empty")
        if not cleaned.startswith(('http://', 'https://')):
            raise ValueError("url must start with http:// or https://")
        if len(cleaned) > 2048:
            raise ValueError("url must be 2048 characters or less")
        return cleaned

    @field_validator('auth_type')
    @classmethod
    def validate_auth_type(cls, v: str) -> str:
        """Return ``v`` if it is listed in AUTH_TYPES, else raise ValueError."""
        if v in AUTH_TYPES:
            return v
        raise ValueError(f"auth_type must be one of: {', '.join(AUTH_TYPES)}")

    @field_validator('priority')
    @classmethod
    def validate_priority(cls, v: int) -> int:
        """Require a priority strictly greater than zero."""
        if v > 0:
            return v
        raise ValueError("priority must be greater than 0")
class UpstreamSourceUpdate(BaseModel):
    """Partial update of an upstream source; every field is optional."""

    name: Optional[str] = None
    source_type: Optional[str] = None
    url: Optional[str] = None
    enabled: Optional[bool] = None
    auth_type: Optional[str] = None
    username: Optional[str] = None
    # Write-only, None = keep existing, empty string = clear
    password: Optional[str] = None
    # Write-only
    headers: Optional[dict] = None
    priority: Optional[int] = None

    @field_validator('name')
    @classmethod
    def validate_name(cls, v: Optional[str]) -> Optional[str]:
        """Pass None through; otherwise strip and reject empty/over-long names."""
        if v is None:
            return v
        cleaned = v.strip()
        if not cleaned:
            raise ValueError("name cannot be empty")
        if len(cleaned) > 255:
            raise ValueError("name must be 255 characters or less")
        return cleaned

    @field_validator('source_type')
    @classmethod
    def validate_source_type(cls, v: Optional[str]) -> Optional[str]:
        """Accept None or a value listed in SOURCE_TYPES."""
        if v is None or v in SOURCE_TYPES:
            return v
        raise ValueError(f"source_type must be one of: {', '.join(SOURCE_TYPES)}")

    @field_validator('url')
    @classmethod
    def validate_url(cls, v: Optional[str]) -> Optional[str]:
        """Pass None through; otherwise strip and check scheme and length."""
        if v is None:
            return v
        cleaned = v.strip()
        if not cleaned:
            raise ValueError("url cannot be empty")
        if not cleaned.startswith(('http://', 'https://')):
            raise ValueError("url must start with http:// or https://")
        if len(cleaned) > 2048:
            raise ValueError("url must be 2048 characters or less")
        return cleaned

    @field_validator('auth_type')
    @classmethod
    def validate_auth_type(cls, v: Optional[str]) -> Optional[str]:
        """Accept None or a value listed in AUTH_TYPES."""
        if v is None or v in AUTH_TYPES:
            return v
        raise ValueError(f"auth_type must be one of: {', '.join(AUTH_TYPES)}")

    @field_validator('priority')
    @classmethod
    def validate_priority(cls, v: Optional[int]) -> Optional[int]:
        """Accept None or a priority strictly greater than zero."""
        if v is None or v > 0:
            return v
        raise ValueError("priority must be greater than 0")
class UpstreamSourceResponse(BaseModel):
    """Upstream source response (credentials never included)."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    name: str
    source_type: str
    url: str
    enabled: bool
    auth_type: str
    username: Optional[str]
    # True if password is set.
    has_password: bool
    # True if custom headers are set.
    has_headers: bool
    priority: int
    # "database" or "env" (env = defined via environment variables)
    source: str = "database"
    # May be None for legacy/env data.
    created_at: Optional[datetime] = None
    # May be None for legacy/env data.
    updated_at: Optional[datetime] = None
class CacheSettingsResponse(BaseModel):
    """Global cache settings response."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    auto_create_system_projects: bool
    # Set if overridden by env var.
    auto_create_system_projects_env_override: Optional[bool] = None
    # May be None for legacy data.
    created_at: Optional[datetime] = None
    # May be None for legacy data.
    updated_at: Optional[datetime] = None
class CacheSettingsUpdate(BaseModel):
    """Partial update of the cache settings."""

    auto_create_system_projects: Optional[bool] = None
class CachedUrlResponse(BaseModel):
    """Cached URL response."""

    # Pydantic v2 configuration (replaces the deprecated nested ``Config``
    # class): allow building the model from an object's attributes.
    model_config = {"from_attributes": True}

    id: UUID
    url: str
    url_hash: str
    artifact_id: str
    source_id: Optional[UUID]
    # Populated from join.
    source_name: Optional[str] = None
    fetched_at: datetime
    created_at: datetime
class CacheRequest(BaseModel):
    """Request to cache an artifact fetched from an upstream URL."""

    url: str
    source_type: str
    # Auto-derived from URL if not provided.
    package_name: Optional[str] = None
    # Auto-derived from URL if not provided.
    version: Optional[str] = None
    # Cross-reference to user project.
    user_project: Optional[str] = None
    user_package: Optional[str] = None
    user_version: Optional[str] = None
    # Verify downloaded content.
    expected_hash: Optional[str] = None

    @field_validator('url')
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Strip the URL; require an http(s) scheme and at most 4096 characters."""
        cleaned = v.strip()
        if not cleaned:
            raise ValueError("url cannot be empty")
        if not cleaned.startswith(('http://', 'https://')):
            raise ValueError("url must start with http:// or https://")
        if len(cleaned) > 4096:
            raise ValueError("url must be 4096 characters or less")
        return cleaned

    @field_validator('source_type')
    @classmethod
    def validate_source_type(cls, v: str) -> str:
        """Return ``v`` if it is listed in SOURCE_TYPES, else raise ValueError."""
        if v in SOURCE_TYPES:
            return v
        raise ValueError(f"source_type must be one of: {', '.join(SOURCE_TYPES)}")

    @field_validator('expected_hash')
    @classmethod
    def validate_expected_hash(cls, v: Optional[str]) -> Optional[str]:
        """Normalize an optional SHA256 hash to 64 lowercase hex characters.

        None passes through. Otherwise the value is stripped and lowercased,
        an optional ``sha256:`` prefix is removed, and the remainder must be
        exactly 64 hex digits.
        """
        if v is None:
            return v
        digest = v.strip().lower()
        prefix = 'sha256:'
        if digest.startswith(prefix):
            digest = digest[len(prefix):]
        is_hex = all(ch in '0123456789abcdef' for ch in digest)
        if len(digest) != 64 or not is_hex:
            raise ValueError("expected_hash must be a 64-character hex string (SHA256)")
        return digest
class CacheResponse(BaseModel):
    """Result of caching an artifact."""

    artifact_id: str
    sha256: str
    size: int
    content_type: Optional[str]
    already_cached: bool
    source_url: str
    source_name: Optional[str]
    system_project: str
    system_package: str
    system_version: Optional[str]
    # e.g., "my-app/npm-deps/+/4.17.21"
    user_reference: Optional[str] = None
class CacheResolveRequest(BaseModel):
    """Request to cache an artifact by package coordinates (no URL required).

    The server will construct the appropriate URL based on source_type and
    configured upstream sources.
    """

    source_type: str
    package: str
    version: str
    user_project: Optional[str] = None
    user_package: Optional[str] = None
    user_version: Optional[str] = None

    @field_validator('source_type')
    @classmethod
    def validate_source_type(cls, v: str) -> str:
        """Return ``v`` if it is listed in SOURCE_TYPES, else raise ValueError."""
        if v in SOURCE_TYPES:
            return v
        raise ValueError(f"source_type must be one of: {', '.join(SOURCE_TYPES)}")

    @field_validator('package')
    @classmethod
    def validate_package(cls, v: str) -> str:
        """Strip the package name and reject it if nothing remains."""
        cleaned = v.strip()
        if cleaned:
            return cleaned
        raise ValueError("package cannot be empty")

    @field_validator('version')
    @classmethod
    def validate_version(cls, v: str) -> str:
        """Strip the version and reject it if nothing remains."""
        cleaned = v.strip()
        if cleaned:
            return cleaned
        raise ValueError("version cannot be empty")