Add audit log and history API endpoints (#20)

- Add audit log endpoints: GET /api/v1/audit-logs with filters, project-scoped, package-scoped
- Add upload history endpoints: package uploads, artifact uploads
- Add artifact provenance endpoint: GET /api/v1/artifact/{id}/history
- Add audit logging for project.create, package.create, tag.create, tag.update
- Add AuditLogResponse, UploadHistoryResponse, ArtifactProvenanceResponse schemas
- Add 18 integration tests for new endpoints
This commit is contained in:
Mondo Diaz
2026-01-06 14:12:56 -06:00
parent 81458b3bcb
commit 3d0f502867
4 changed files with 896 additions and 1 deletion

View File

@@ -58,6 +58,10 @@ from .schemas import (
TagResponse,
TagDetailResponse,
TagHistoryResponse,
TagHistoryDetailResponse,
AuditLogResponse,
UploadHistoryResponse,
ArtifactProvenanceResponse,
UploadResponse,
ConsumerResponse,
HealthResponse,
@@ -499,6 +503,17 @@ def create_project(
created_by=user_id,
)
db.add(db_project)
# Audit log
_log_audit(
db=db,
action="project.create",
resource=f"project/{project.name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
details={"is_public": project.is_public},
)
db.commit()
db.refresh(db_project)
return db_project
@@ -835,7 +850,10 @@ def get_package(
@router.post("/api/v1/project/{project_name}/packages", response_model=PackageResponse)
def create_package(
project_name: str, package: PackageCreate, db: Session = Depends(get_db)
project_name: str,
package: PackageCreate,
request: Request,
db: Session = Depends(get_db),
):
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
@@ -873,6 +891,17 @@ def create_package(
platform=package.platform,
)
db.add(db_package)
# Audit log
_log_audit(
db=db,
action="package.create",
resource=f"project/{project_name}/{package.name}",
user_id=get_user_id(request),
source_ip=request.client.host if request.client else None,
details={"format": package.format, "platform": package.platform},
)
db.commit()
db.refresh(db_package)
return db_package
@@ -1850,8 +1879,23 @@ def create_tag(
db.query(Tag).filter(Tag.package_id == package.id, Tag.name == tag.name).first()
)
if existing:
old_artifact_id = existing.artifact_id
existing.artifact_id = tag.artifact_id
existing.created_by = user_id
# Audit log for tag update
_log_audit(
db=db,
action="tag.update",
resource=f"project/{project_name}/{package_name}/tag/{tag.name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
details={
"old_artifact_id": old_artifact_id,
"new_artifact_id": tag.artifact_id,
},
)
db.commit()
db.refresh(existing)
return existing
@@ -1863,6 +1907,17 @@ def create_tag(
created_by=user_id,
)
db.add(db_tag)
# Audit log for tag create
_log_audit(
db=db,
action="tag.create",
resource=f"project/{project_name}/{package_name}/tag/{tag.name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
details={"artifact_id": tag.artifact_id},
)
db.commit()
db.refresh(db_tag)
return db_tag
@@ -3069,3 +3124,434 @@ Generated: {generated_at.strftime("%Y-%m-%d %H:%M:%S UTC")}
indent=2,
),
)
# =============================================================================
# Audit Log Endpoints
# =============================================================================
@router.get("/api/v1/audit-logs", response_model=PaginatedResponse[AuditLogResponse])
def list_audit_logs(
    action: Optional[str] = Query(None, description="Filter by action type"),
    resource: Optional[str] = Query(None, description="Filter by resource pattern"),
    user_id: Optional[str] = Query(None, description="Filter by user"),
    from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
    to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """
    List audit logs with filtering and pagination, newest first.

    Filters:
    - action: exact match on action type (e.g. 'project.create', 'artifact.upload')
    - resource: case-insensitive partial match on the resource string
    - user_id: exact match on user ID
    - from/to: inclusive timestamp range
    """
    # Collect the optional filter criteria first, then apply them in one pass.
    criteria = []
    if action:
        criteria.append(AuditLog.action == action)
    if resource:
        criteria.append(AuditLog.resource.ilike(f"%{resource}%"))
    if user_id:
        criteria.append(AuditLog.user_id == user_id)
    if from_date:
        criteria.append(AuditLog.timestamp >= from_date)
    if to_date:
        criteria.append(AuditLog.timestamp <= to_date)

    query = db.query(AuditLog)
    for criterion in criteria:
        query = query.filter(criterion)

    total = query.count()
    # Report at least one page even when there are no matching rows.
    total_pages = max(1, math.ceil(total / limit))

    offset = (page - 1) * limit
    logs = query.order_by(AuditLog.timestamp.desc()).offset(offset).limit(limit).all()

    meta = PaginationMeta(page=page, limit=limit, total=total, total_pages=total_pages)
    return PaginatedResponse(items=logs, pagination=meta)
@router.get(
    "/api/v1/projects/{project_name}/audit-logs",
    response_model=PaginatedResponse[AuditLogResponse],
)
def list_project_audit_logs(
    project_name: str,
    action: Optional[str] = Query(None, description="Filter by action type"),
    from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
    to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """
    List audit logs for a specific project, newest first.

    Raises:
        HTTPException: 404 if the project does not exist.
    """
    project = db.query(Project).filter(Project.name == project_name).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    # Audit resources are written as "project/<name>" or
    # "project/<name>/<package>/..." (see _log_audit call sites), so the
    # match must be anchored on the full "project/" prefix.  Matching on
    # "<name>%" alone never hits the stored strings, and a bare prefix match
    # would also pick up projects whose names share this one as a prefix
    # (e.g. "foo" vs "foobar").
    exact_resource = f"project/{project_name}"
    query = db.query(AuditLog).filter(
        (AuditLog.resource == exact_resource)
        | (AuditLog.resource.like(f"{exact_resource}/%"))
    )
    if action:
        query = query.filter(AuditLog.action == action)
    if from_date:
        query = query.filter(AuditLog.timestamp >= from_date)
    if to_date:
        query = query.filter(AuditLog.timestamp <= to_date)
    total = query.count()
    total_pages = math.ceil(total / limit) if total > 0 else 1
    logs = (
        query.order_by(AuditLog.timestamp.desc())
        .offset((page - 1) * limit)
        .limit(limit)
        .all()
    )
    return PaginatedResponse(
        items=logs,
        pagination=PaginationMeta(
            page=page,
            limit=limit,
            total=total,
            total_pages=total_pages,
        ),
    )
@router.get(
    "/api/v1/project/{project_name}/{package_name}/audit-logs",
    response_model=PaginatedResponse[AuditLogResponse],
)
def list_package_audit_logs(
    project_name: str,
    package_name: str,
    action: Optional[str] = Query(None, description="Filter by action type"),
    from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
    to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """
    List audit logs for a specific package, newest first.

    Raises:
        HTTPException: 404 if the project or package does not exist.
    """
    project = db.query(Project).filter(Project.name == project_name).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    package = (
        db.query(Package)
        .filter(Package.project_id == project.id, Package.name == package_name)
        .first()
    )
    if not package:
        raise HTTPException(status_code=404, detail="Package not found")
    # Audit resources are written as "project/<proj>/<pkg>" or
    # "project/<proj>/<pkg>/tag/<tag>" (see _log_audit call sites), so anchor
    # on the full "project/" prefix; a bare "<proj>/<pkg>%" pattern never
    # matches the stored strings, and an unanchored prefix would also match
    # sibling packages whose names share this one as a prefix.
    exact_resource = f"project/{project_name}/{package_name}"
    query = db.query(AuditLog).filter(
        (AuditLog.resource == exact_resource)
        | (AuditLog.resource.like(f"{exact_resource}/%"))
    )
    if action:
        query = query.filter(AuditLog.action == action)
    if from_date:
        query = query.filter(AuditLog.timestamp >= from_date)
    if to_date:
        query = query.filter(AuditLog.timestamp <= to_date)
    total = query.count()
    total_pages = math.ceil(total / limit) if total > 0 else 1
    logs = (
        query.order_by(AuditLog.timestamp.desc())
        .offset((page - 1) * limit)
        .limit(limit)
        .all()
    )
    return PaginatedResponse(
        items=logs,
        pagination=PaginationMeta(
            page=page,
            limit=limit,
            total=total,
            total_pages=total_pages,
        ),
    )
# =============================================================================
# Upload History Endpoints
# =============================================================================
@router.get(
    "/api/v1/project/{project_name}/{package_name}/uploads",
    response_model=PaginatedResponse[UploadHistoryResponse],
)
def list_package_uploads(
    project_name: str,
    package_name: str,
    from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
    to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """
    List upload events for a specific package, newest first.

    Each item is enriched with the uploaded artifact's size and content type.

    Raises:
        HTTPException: 404 if the project or package does not exist.
    """
    project = db.query(Project).filter(Project.name == project_name).first()
    if not project:
        raise HTTPException(status_code=404, detail="Project not found")
    package = (
        db.query(Package)
        .filter(Package.project_id == project.id, Package.name == package_name)
        .first()
    )
    if not package:
        raise HTTPException(status_code=404, detail="Package not found")
    query = db.query(Upload).filter(Upload.package_id == package.id)
    if from_date:
        query = query.filter(Upload.uploaded_at >= from_date)
    if to_date:
        query = query.filter(Upload.uploaded_at <= to_date)
    total = query.count()
    total_pages = math.ceil(total / limit) if total > 0 else 1
    uploads = (
        query.order_by(Upload.uploaded_at.desc())
        .offset((page - 1) * limit)
        .limit(limit)
        .all()
    )
    # Batch-load the artifacts for this page in a single query instead of
    # one query per upload row (N+1).
    artifact_ids = {u.artifact_id for u in uploads}
    artifacts = {}
    if artifact_ids:
        artifacts = {
            a.id: a
            for a in db.query(Artifact).filter(Artifact.id.in_(artifact_ids)).all()
        }
    # Build response with artifact metadata
    items = []
    for upload in uploads:
        artifact = artifacts.get(upload.artifact_id)
        items.append(
            UploadHistoryResponse(
                id=upload.id,
                artifact_id=upload.artifact_id,
                package_id=upload.package_id,
                package_name=package_name,
                project_name=project_name,
                original_name=upload.original_name,
                tag_name=upload.tag_name,
                uploaded_at=upload.uploaded_at,
                uploaded_by=upload.uploaded_by,
                source_ip=upload.source_ip,
                deduplicated=upload.deduplicated or False,  # normalize NULL -> False
                artifact_size=artifact.size if artifact else 0,
                artifact_content_type=artifact.content_type if artifact else None,
            )
        )
    return PaginatedResponse(
        items=items,
        pagination=PaginationMeta(
            page=page,
            limit=limit,
            total=total,
            total_pages=total_pages,
        ),
    )
@router.get(
    "/api/v1/artifact/{artifact_id}/uploads",
    response_model=PaginatedResponse[UploadHistoryResponse],
)
def list_artifact_uploads(
    artifact_id: str,
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """
    List all upload events for a specific artifact, newest first.

    Each item is enriched with the owning package/project names; "unknown"
    is reported when the package or project row is missing.

    Raises:
        HTTPException: 404 if the artifact does not exist.
    """
    artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
    if not artifact:
        raise HTTPException(status_code=404, detail="Artifact not found")
    query = db.query(Upload).filter(Upload.artifact_id == artifact_id)
    total = query.count()
    total_pages = math.ceil(total / limit) if total > 0 else 1
    uploads = (
        query.order_by(Upload.uploaded_at.desc())
        .offset((page - 1) * limit)
        .limit(limit)
        .all()
    )
    # Batch-load the packages and their projects for this page in two
    # queries, instead of two queries per upload row (N+1).
    package_ids = {u.package_id for u in uploads}
    packages = {}
    if package_ids:
        packages = {
            p.id: p
            for p in db.query(Package).filter(Package.id.in_(package_ids)).all()
        }
    project_ids = {p.project_id for p in packages.values()}
    projects = {}
    if project_ids:
        projects = {
            p.id: p
            for p in db.query(Project).filter(Project.id.in_(project_ids)).all()
        }
    # Build response with package/project metadata
    items = []
    for upload in uploads:
        package = packages.get(upload.package_id)
        project = projects.get(package.project_id) if package else None
        items.append(
            UploadHistoryResponse(
                id=upload.id,
                artifact_id=upload.artifact_id,
                package_id=upload.package_id,
                package_name=package.name if package else "unknown",
                project_name=project.name if project else "unknown",
                original_name=upload.original_name,
                tag_name=upload.tag_name,
                uploaded_at=upload.uploaded_at,
                uploaded_by=upload.uploaded_by,
                source_ip=upload.source_ip,
                deduplicated=upload.deduplicated or False,  # normalize NULL -> False
                artifact_size=artifact.size,
                artifact_content_type=artifact.content_type,
            )
        )
    return PaginatedResponse(
        items=items,
        pagination=PaginationMeta(
            page=page,
            limit=limit,
            total=total,
            total_pages=total_pages,
        ),
    )
# =============================================================================
# Artifact Provenance/History Endpoint
# =============================================================================
@router.get(
    "/api/v1/artifact/{artifact_id}/history", response_model=ArtifactProvenanceResponse
)
def get_artifact_provenance(
    artifact_id: str,
    db: Session = Depends(get_db),
):
    """
    Get full provenance/history of an artifact.

    Returns:
        - Artifact metadata
        - First upload information (falls back to the artifact's own
          created_at/created_by when no upload rows exist)
        - All packages/tags referencing the artifact
        - Complete upload history, oldest first

    Raises:
        HTTPException: 404 if the artifact does not exist.
    """
    artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
    if not artifact:
        raise HTTPException(status_code=404, detail="Artifact not found")
    # All uploads of this artifact, oldest first, so uploads[0] is the
    # original upload.
    uploads = (
        db.query(Upload)
        .filter(Upload.artifact_id == artifact_id)
        .order_by(Upload.uploaded_at.asc())
        .all()
    )
    first_upload = uploads[0] if uploads else None
    # All tags currently pointing at this artifact.
    tags = db.query(Tag).filter(Tag.artifact_id == artifact_id).all()
    # Batch-load every package and project referenced by the tags and
    # uploads in two queries, instead of one query per row (N+1).
    package_ids = {t.package_id for t in tags} | {u.package_id for u in uploads}
    packages = {}
    if package_ids:
        packages = {
            p.id: p
            for p in db.query(Package).filter(Package.id.in_(package_ids)).all()
        }
    project_ids = {p.project_id for p in packages.values()}
    projects = {}
    if project_ids:
        projects = {
            p.id: p
            for p in db.query(Project).filter(Project.id.in_(project_ids)).all()
        }
    # Build package list with tags
    package_map = {}  # package_id -> {project_name, package_name, tag_names}
    tag_list = []
    for tag in tags:
        package = packages.get(tag.package_id)
        if not package:
            # Orphaned tag (package row missing) — skipped, matching the
            # original per-row-lookup behavior.
            continue
        project = projects.get(package.project_id)
        project_name = project.name if project else "unknown"
        # Add to package map
        pkg_key = str(package.id)
        if pkg_key not in package_map:
            package_map[pkg_key] = {
                "project_name": project_name,
                "package_name": package.name,
                "tag_names": [],
            }
        package_map[pkg_key]["tag_names"].append(tag.name)
        # Add to tag list
        tag_list.append(
            {
                "project_name": project_name,
                "package_name": package.name,
                "tag_name": tag.name,
                "created_at": tag.created_at.isoformat()
                if tag.created_at
                else None,
            }
        )
    # Build upload history (chronological, matching the query order above).
    upload_history = []
    for upload in uploads:
        package = packages.get(upload.package_id)
        project = projects.get(package.project_id) if package else None
        upload_history.append(
            {
                "upload_id": str(upload.id),
                "project_name": project.name if project else "unknown",
                "package_name": package.name if package else "unknown",
                "original_name": upload.original_name,
                "tag_name": upload.tag_name,
                "uploaded_at": upload.uploaded_at.isoformat()
                if upload.uploaded_at
                else None,
                "uploaded_by": upload.uploaded_by,
                "deduplicated": upload.deduplicated or False,
            }
        )
    return ArtifactProvenanceResponse(
        artifact_id=artifact.id,
        # The artifact id doubles as the content hash here — presumably a
        # sha256-addressed store; confirm against the Artifact model.
        sha256=artifact.id,
        size=artifact.size,
        content_type=artifact.content_type,
        original_name=artifact.original_name,
        created_at=artifact.created_at,
        created_by=artifact.created_by,
        ref_count=artifact.ref_count,
        first_uploaded_at=first_upload.uploaded_at
        if first_upload
        else artifact.created_at,
        first_uploaded_by=first_upload.uploaded_by
        if first_upload
        else artifact.created_by,
        upload_count=len(uploads),
        packages=list(package_map.values()),
        tags=tag_list,
        uploads=upload_history,
    )

View File

@@ -189,6 +189,93 @@ class TagHistoryResponse(BaseModel):
from_attributes = True
class TagHistoryDetailResponse(BaseModel):
    """Tag history with artifact metadata for each version"""

    id: UUID
    tag_id: UUID
    tag_name: str
    # Previous artifact the tag pointed at; presumably None for the tag's
    # initial assignment — confirm against the tag-history writer.
    old_artifact_id: Optional[str]
    new_artifact_id: str
    changed_at: datetime
    changed_by: str
    # Artifact metadata for new artifact
    artifact_size: int
    artifact_original_name: Optional[str]
    artifact_content_type: Optional[str]

    class Config:
        # Allow construction directly from ORM rows.
        from_attributes = True
# Audit log schemas
class AuditLogResponse(BaseModel):
    """Audit log entry response"""

    id: UUID
    # Dotted action name, e.g. "project.create", "tag.update".
    action: str
    # Resource path, e.g. "project/<name>" or "project/<name>/<pkg>/tag/<tag>".
    resource: str
    user_id: str
    # Free-form, action-specific context (e.g. {"artifact_id": ...}).
    details: Optional[Dict[str, Any]]
    timestamp: datetime
    source_ip: Optional[str]

    class Config:
        # Allow construction directly from ORM rows.
        from_attributes = True
# Upload history schemas
class UploadHistoryResponse(BaseModel):
    """Upload event with artifact details"""

    id: UUID
    artifact_id: str
    package_id: UUID
    package_name: str
    project_name: str
    original_name: Optional[str]
    tag_name: Optional[str]
    uploaded_at: datetime
    uploaded_by: str
    source_ip: Optional[str]
    # True when the upload was matched to an already-stored artifact;
    # endpoints coerce a NULL column value to False.
    deduplicated: bool
    # Artifact metadata
    artifact_size: int
    artifact_content_type: Optional[str]

    class Config:
        # Allow construction directly from ORM rows.
        from_attributes = True
# Artifact provenance schemas
class ArtifactProvenanceResponse(BaseModel):
    """Full provenance/history of an artifact"""

    artifact_id: str
    # Populated from the artifact id — presumably a content-addressed
    # sha256 digest; confirm against the Artifact model.
    sha256: str
    size: int
    content_type: Optional[str]
    original_name: Optional[str]
    created_at: datetime
    created_by: str
    ref_count: int
    # First upload info
    first_uploaded_at: datetime
    first_uploaded_by: str
    # Usage statistics
    upload_count: int
    # References
    packages: List[Dict[str, Any]]  # List of {project_name, package_name, tag_names}
    tags: List[
        Dict[str, Any]
    ]  # List of {project_name, package_name, tag_name, created_at}
    # Upload history
    uploads: List[Dict[str, Any]]  # List of upload events

    class Config:
        # Allow construction directly from ORM rows.
        from_attributes = True
class ArtifactTagInfo(BaseModel):
"""Tag info for embedding in artifact responses"""