2 Commits

Author SHA1 Message Date
Mondo Diaz
a293432d2e Complete metadata query EPIC: add upload endpoints, enhance response fields, standardize audit actions (#18, #19, #20, #22)
- Add GET /api/v1/uploads global endpoint with project/package/user/date filters
- Add GET /api/v1/project/{project}/uploads project-level uploads endpoint
- Add has_more field to PaginationMeta for pagination UI
- Add upload_id, content_type, original_name, created_at to UploadResponse
- Standardize audit action names: project.delete, package.delete, tag.delete, artifact.upload
- Add 13 new integration tests for upload query endpoints and response fields
- 130 tests passing
2026-01-06 14:23:52 -06:00
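For reviewer orientation: a minimal client sketch (not part of this commit) that pages through the new global uploads endpoint using the `has_more` flag. The base URL and use of `requests` are assumptions; the route, query parameters, and pagination fields come from the diff below.

```python
# Hypothetical client for the new GET /api/v1/uploads endpoint; BASE_URL is
# an assumption, the route/params/pagination fields are from this commit.
import requests

BASE_URL = "http://localhost:8080"  # assumed local dev server

def iter_uploads(project=None, uploaded_by=None):
    """Yield upload records page by page until has_more is False."""
    page = 1
    while True:
        params = {"page": page, "limit": 50}
        if project:
            params["project"] = project
        if uploaded_by:
            params["uploaded_by"] = uploaded_by
        resp = requests.get(f"{BASE_URL}/api/v1/uploads", params=params)
        resp.raise_for_status()
        data = resp.json()
        yield from data["items"]
        if not data["pagination"]["has_more"]:
            break
        page += 1

for upload in iter_uploads(project="demo"):
    print(upload["project_name"], upload["original_name"], upload["uploaded_at"])
```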
Mondo Diaz
3d0f502867 Add audit log and history API endpoints (#20)
- Add audit log endpoints: GET /api/v1/audit-logs with filters, project-scoped, package-scoped
- Add upload history endpoints: package uploads, artifact uploads
- Add artifact provenance endpoint: GET /api/v1/artifact/{id}/history
- Add audit logging for project.create, package.create, tag.create, tag.update
- Add AuditLogResponse, UploadHistoryResponse, ArtifactProvenanceResponse schemas
- Add 18 integration tests for new endpoints
2026-01-06 14:12:56 -06:00
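A similar sketch for the audit-log API added here. Note that the date filters are exposed under the `from`/`to` query aliases, not the Python parameter names `from_date`/`to_date`. Endpoint and field names follow the diff below; the server address is an assumption.

```python
# Hypothetical query against GET /api/v1/audit-logs using the standardized
# {entity}.{action} names; "from"/"to" are the query aliases for the filters.
import requests

BASE_URL = "http://localhost:8080"  # assumed local dev server

resp = requests.get(
    f"{BASE_URL}/api/v1/audit-logs",
    params={
        "action": "tag.update",
        "from": "2026-01-01T00:00:00",
        "to": "2026-01-07T00:00:00",
        "limit": 20,
    },
)
resp.raise_for_status()
for entry in resp.json()["items"]:
    print(entry["timestamp"], entry["action"], entry["resource"], entry["user_id"])
```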
4 changed files with 1325 additions and 6 deletions

View File

@@ -7,6 +7,25 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- Added global uploads query endpoint `GET /api/v1/uploads` with project/package/user/date filters (#18)
- Added project-level uploads endpoint `GET /api/v1/project/{project}/uploads` (#18)
- Added `has_more` field to pagination metadata for easier pagination UI (#18)
- Added `upload_id`, `content_type`, `original_name`, `created_at` fields to upload response (#19)
- Added audit log API endpoints with filtering and pagination (#20)
- `GET /api/v1/audit-logs` - list all audit logs with action/resource/user/date filters
- `GET /api/v1/projects/{project}/audit-logs` - project-scoped audit logs
- `GET /api/v1/project/{project}/{package}/audit-logs` - package-scoped audit logs
- Added upload history API endpoints (#20)
- `GET /api/v1/project/{project}/{package}/uploads` - list upload events for a package
- `GET /api/v1/artifact/{id}/uploads` - list all uploads of a specific artifact
- Added artifact provenance endpoint `GET /api/v1/artifact/{id}/history` (#20)
- Returns full artifact history including packages, tags, and upload events
- Added audit logging for project.create, package.create, tag.create, tag.update, artifact.upload actions (#20)
- Added `AuditLogResponse`, `UploadHistoryResponse`, `ArtifactProvenanceResponse` schemas (#20)
- Added `TagHistoryDetailResponse` schema with artifact metadata (#20)
- Added 31 integration tests for audit log, history, and upload query endpoints (#22)
### Changed
- Standardized audit action naming to `{entity}.{action}` pattern (project.delete, package.delete, tag.delete) (#20)
- Added `StorageBackend` protocol/interface for backend-agnostic storage (#33)
- Added `health_check()` method to storage backend with `/health` endpoint integration (#33)
- Added `verify_integrity()` method for post-upload hash validation (#33)
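The provenance entry above (`GET /api/v1/artifact/{id}/history`) is the one endpoint that aggregates packages, tags, and upload events in a single response. A hedged consumption sketch, assuming a sha256 digest as the artifact id (as the integration tests below do) and a local server:

```python
# Sketch of reading the new provenance endpoint; artifact_id is a placeholder.
import requests

BASE_URL = "http://localhost:8080"  # assumed local dev server
artifact_id = "a" * 64              # placeholder sha256 digest

resp = requests.get(f"{BASE_URL}/api/v1/artifact/{artifact_id}/history")
if resp.status_code == 404:
    print("artifact not found")
else:
    history = resp.json()
    print("uploads:", history["upload_count"], "ref_count:", history["ref_count"])
    for tag in history["tags"]:
        print(f'{tag["project_name"]}/{tag["package_name"]}:{tag["tag_name"]}')
```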

View File

@@ -58,6 +58,10 @@ from .schemas import (
TagResponse,
TagDetailResponse,
TagHistoryResponse,
TagHistoryDetailResponse,
AuditLogResponse,
UploadHistoryResponse,
ArtifactProvenanceResponse,
UploadResponse,
ConsumerResponse,
HealthResponse,
@@ -478,6 +482,7 @@ def list_projects(
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@@ -499,6 +504,17 @@ def create_project(
created_by=user_id,
)
db.add(db_project)
# Audit log
_log_audit(
db=db,
action="project.create",
resource=f"project/{project.name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
details={"is_public": project.is_public},
)
db.commit()
db.refresh(db_project)
return db_project
@@ -555,7 +571,7 @@ def delete_project(
# Audit log (after commit)
_log_audit(
db,
-action="delete_project",
+action="project.delete",
resource=f"project/{project_name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
@@ -740,6 +756,7 @@ def list_packages(
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@@ -835,7 +852,10 @@ def get_package(
@router.post("/api/v1/project/{project_name}/packages", response_model=PackageResponse) @router.post("/api/v1/project/{project_name}/packages", response_model=PackageResponse)
def create_package( def create_package(
project_name: str, package: PackageCreate, db: Session = Depends(get_db) project_name: str,
package: PackageCreate,
request: Request,
db: Session = Depends(get_db),
): ):
project = db.query(Project).filter(Project.name == project_name).first() project = db.query(Project).filter(Project.name == project_name).first()
if not project: if not project:
@@ -873,6 +893,17 @@ def create_package(
platform=package.platform,
)
db.add(db_package)
# Audit log
_log_audit(
db=db,
action="package.create",
resource=f"project/{project_name}/{package.name}",
user_id=get_user_id(request),
source_ip=request.client.host if request.client else None,
details={"format": package.format, "platform": package.platform},
)
db.commit()
db.refresh(db_package)
return db_package
@@ -927,7 +958,7 @@ def delete_package(
# Audit log (after commit)
_log_audit(
db,
-action="delete_package",
+action="package.delete",
resource=f"project/{project_name}/{package_name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
@@ -1102,6 +1133,7 @@ def upload_artifact(
deduplicated=deduplicated,
)
db.add(upload)
db.flush() # Flush to get upload ID
# Create or update tag if provided (with ref_count management and history)
if tag:
@@ -1117,7 +1149,7 @@ def upload_artifact(
# Audit log
_log_audit(
db,
-action="upload",
+action="artifact.upload",
resource=f"project/{project_name}/{package_name}/artifact/{storage_result.sha256[:12]}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
@@ -1145,6 +1177,10 @@ def upload_artifact(
format_metadata=artifact.artifact_metadata,
deduplicated=deduplicated,
ref_count=artifact.ref_count,
upload_id=upload.id,
content_type=artifact.content_type,
original_name=artifact.original_name,
created_at=artifact.created_at,
)
@@ -1231,7 +1267,7 @@ def init_resumable_upload(
# Audit log
_log_audit(
db,
-action="upload",
+action="artifact.upload",
resource=f"project/{project_name}/{package_name}/artifact/{init_request.expected_hash[:12]}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
@@ -1812,6 +1848,7 @@ def list_tags(
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@@ -1850,8 +1887,23 @@ def create_tag(
db.query(Tag).filter(Tag.package_id == package.id, Tag.name == tag.name).first()
)
if existing:
old_artifact_id = existing.artifact_id
existing.artifact_id = tag.artifact_id
existing.created_by = user_id
# Audit log for tag update
_log_audit(
db=db,
action="tag.update",
resource=f"project/{project_name}/{package_name}/tag/{tag.name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
details={
"old_artifact_id": old_artifact_id,
"new_artifact_id": tag.artifact_id,
},
)
db.commit()
db.refresh(existing)
return existing
@@ -1863,6 +1915,17 @@ def create_tag(
created_by=user_id,
)
db.add(db_tag)
# Audit log for tag create
_log_audit(
db=db,
action="tag.create",
resource=f"project/{project_name}/{package_name}/tag/{tag.name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
details={"artifact_id": tag.artifact_id},
)
db.commit()
db.refresh(db_tag)
return db_tag
@@ -2016,7 +2079,7 @@ def delete_tag(
artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
_log_audit(
db,
-action="delete_tag",
+action="tag.delete",
resource=f"project/{project_name}/{package_name}/tag/{tag_name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
@@ -2153,6 +2216,7 @@ def list_package_artifacts(
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@@ -3069,3 +3133,624 @@ Generated: {generated_at.strftime("%Y-%m-%d %H:%M:%S UTC")}
indent=2,
),
)
# =============================================================================
# Audit Log Endpoints
# =============================================================================
@router.get("/api/v1/audit-logs", response_model=PaginatedResponse[AuditLogResponse])
def list_audit_logs(
action: Optional[str] = Query(None, description="Filter by action type"),
resource: Optional[str] = Query(None, description="Filter by resource pattern"),
user_id: Optional[str] = Query(None, description="Filter by user"),
from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
):
"""
List audit logs with filtering and pagination.
Filters:
- action: Filter by action type (e.g., 'project.create', 'artifact.upload')
- resource: Filter by resource pattern (partial match)
- user_id: Filter by user ID
- from/to: Filter by timestamp range
"""
query = db.query(AuditLog)
if action:
query = query.filter(AuditLog.action == action)
if resource:
query = query.filter(AuditLog.resource.ilike(f"%{resource}%"))
if user_id:
query = query.filter(AuditLog.user_id == user_id)
if from_date:
query = query.filter(AuditLog.timestamp >= from_date)
if to_date:
query = query.filter(AuditLog.timestamp <= to_date)
total = query.count()
total_pages = math.ceil(total / limit) if total > 0 else 1
logs = (
query.order_by(AuditLog.timestamp.desc())
.offset((page - 1) * limit)
.limit(limit)
.all()
)
return PaginatedResponse(
items=logs,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@router.get(
"/api/v1/projects/{project_name}/audit-logs",
response_model=PaginatedResponse[AuditLogResponse],
)
def list_project_audit_logs(
project_name: str,
action: Optional[str] = Query(None, description="Filter by action type"),
from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
):
"""List audit logs for a specific project."""
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Match resources recorded under this project; audit resources are stored
# as "project/<name>" or "project/<name>/...", so anchor the pattern there
resource_pattern = f"project/{project_name}%"
query = db.query(AuditLog).filter(AuditLog.resource.like(resource_pattern))
if action:
query = query.filter(AuditLog.action == action)
if from_date:
query = query.filter(AuditLog.timestamp >= from_date)
if to_date:
query = query.filter(AuditLog.timestamp <= to_date)
total = query.count()
total_pages = math.ceil(total / limit) if total > 0 else 1
logs = (
query.order_by(AuditLog.timestamp.desc())
.offset((page - 1) * limit)
.limit(limit)
.all()
)
return PaginatedResponse(
items=logs,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@router.get(
"/api/v1/project/{project_name}/{package_name}/audit-logs",
response_model=PaginatedResponse[AuditLogResponse],
)
def list_package_audit_logs(
project_name: str,
package_name: str,
action: Optional[str] = Query(None, description="Filter by action type"),
from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
):
"""List audit logs for a specific package."""
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
package = (
db.query(Package)
.filter(Package.project_id == project.id, Package.name == package_name)
.first()
)
if not package:
raise HTTPException(status_code=404, detail="Package not found")
# Match resources recorded under "project/<project>/<package>..."
resource_pattern = f"project/{project_name}/{package_name}%"
query = db.query(AuditLog).filter(AuditLog.resource.like(resource_pattern))
if action:
query = query.filter(AuditLog.action == action)
if from_date:
query = query.filter(AuditLog.timestamp >= from_date)
if to_date:
query = query.filter(AuditLog.timestamp <= to_date)
total = query.count()
total_pages = math.ceil(total / limit) if total > 0 else 1
logs = (
query.order_by(AuditLog.timestamp.desc())
.offset((page - 1) * limit)
.limit(limit)
.all()
)
return PaginatedResponse(
items=logs,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
# =============================================================================
# Upload History Endpoints
# =============================================================================
@router.get(
"/api/v1/uploads",
response_model=PaginatedResponse[UploadHistoryResponse],
)
def list_all_uploads(
request: Request,
project: Optional[str] = Query(None, description="Filter by project name"),
package: Optional[str] = Query(None, description="Filter by package name"),
uploaded_by: Optional[str] = Query(None, description="Filter by uploader"),
from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
deduplicated: Optional[bool] = Query(
None, description="Filter by deduplication status"
),
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
):
"""
List all upload events globally (admin endpoint).
Supports filtering by:
- project: Filter by project name
- package: Filter by package name (requires project)
- uploaded_by: Filter by user ID
- from/to: Filter by timestamp range
- deduplicated: Filter by deduplication status
"""
query = (
db.query(Upload, Package, Project, Artifact)
.join(Package, Upload.package_id == Package.id)
.join(Project, Package.project_id == Project.id)
.join(Artifact, Upload.artifact_id == Artifact.id)
)
# Apply filters
if project:
query = query.filter(Project.name == project)
if package:
query = query.filter(Package.name == package)
if uploaded_by:
query = query.filter(Upload.uploaded_by == uploaded_by)
if from_date:
query = query.filter(Upload.uploaded_at >= from_date)
if to_date:
query = query.filter(Upload.uploaded_at <= to_date)
if deduplicated is not None:
query = query.filter(Upload.deduplicated == deduplicated)
total = query.count()
total_pages = math.ceil(total / limit) if total > 0 else 1
results = (
query.order_by(Upload.uploaded_at.desc())
.offset((page - 1) * limit)
.limit(limit)
.all()
)
items = [
UploadHistoryResponse(
id=upload.id,
artifact_id=upload.artifact_id,
package_id=upload.package_id,
package_name=pkg.name,
project_name=proj.name,
original_name=upload.original_name,
tag_name=upload.tag_name,
uploaded_at=upload.uploaded_at,
uploaded_by=upload.uploaded_by,
source_ip=upload.source_ip,
deduplicated=upload.deduplicated or False,
artifact_size=artifact.size,
artifact_content_type=artifact.content_type,
)
for upload, pkg, proj, artifact in results
]
return PaginatedResponse(
items=items,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@router.get(
"/api/v1/project/{project_name}/uploads",
response_model=PaginatedResponse[UploadHistoryResponse],
)
def list_project_uploads(
project_name: str,
package: Optional[str] = Query(None, description="Filter by package name"),
uploaded_by: Optional[str] = Query(None, description="Filter by uploader"),
from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
deduplicated: Optional[bool] = Query(
None, description="Filter by deduplication status"
),
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
):
"""
List upload events for a specific project.
Supports filtering by:
- package: Filter by package name within the project
- uploaded_by: Filter by user ID
- from/to: Filter by timestamp range
- deduplicated: Filter by deduplication status
"""
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Get all package IDs for this project
package_ids_query = db.query(Package.id).filter(Package.project_id == project.id)
if package:
package_ids_query = package_ids_query.filter(Package.name == package)
package_ids = package_ids_query.subquery()
query = (
db.query(Upload, Package, Artifact)
.join(Package, Upload.package_id == Package.id)
.join(Artifact, Upload.artifact_id == Artifact.id)
.filter(Upload.package_id.in_(package_ids))
)
if uploaded_by:
query = query.filter(Upload.uploaded_by == uploaded_by)
if from_date:
query = query.filter(Upload.uploaded_at >= from_date)
if to_date:
query = query.filter(Upload.uploaded_at <= to_date)
if deduplicated is not None:
query = query.filter(Upload.deduplicated == deduplicated)
total = query.count()
total_pages = math.ceil(total / limit) if total > 0 else 1
results = (
query.order_by(Upload.uploaded_at.desc())
.offset((page - 1) * limit)
.limit(limit)
.all()
)
items = [
UploadHistoryResponse(
id=upload.id,
artifact_id=upload.artifact_id,
package_id=upload.package_id,
package_name=pkg.name,
project_name=project_name,
original_name=upload.original_name,
tag_name=upload.tag_name,
uploaded_at=upload.uploaded_at,
uploaded_by=upload.uploaded_by,
source_ip=upload.source_ip,
deduplicated=upload.deduplicated or False,
artifact_size=artifact.size,
artifact_content_type=artifact.content_type,
)
for upload, pkg, artifact in results
]
return PaginatedResponse(
items=items,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@router.get(
"/api/v1/project/{project_name}/{package_name}/uploads",
response_model=PaginatedResponse[UploadHistoryResponse],
)
def list_package_uploads(
project_name: str,
package_name: str,
from_date: Optional[datetime] = Query(None, alias="from", description="Start date"),
to_date: Optional[datetime] = Query(None, alias="to", description="End date"),
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
):
"""List upload events for a specific package."""
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
package = (
db.query(Package)
.filter(Package.project_id == project.id, Package.name == package_name)
.first()
)
if not package:
raise HTTPException(status_code=404, detail="Package not found")
query = db.query(Upload).filter(Upload.package_id == package.id)
if from_date:
query = query.filter(Upload.uploaded_at >= from_date)
if to_date:
query = query.filter(Upload.uploaded_at <= to_date)
total = query.count()
total_pages = math.ceil(total / limit) if total > 0 else 1
uploads = (
query.order_by(Upload.uploaded_at.desc())
.offset((page - 1) * limit)
.limit(limit)
.all()
)
# Build response with artifact metadata
items = []
for upload in uploads:
artifact = db.query(Artifact).filter(Artifact.id == upload.artifact_id).first()
items.append(
UploadHistoryResponse(
id=upload.id,
artifact_id=upload.artifact_id,
package_id=upload.package_id,
package_name=package_name,
project_name=project_name,
original_name=upload.original_name,
tag_name=upload.tag_name,
uploaded_at=upload.uploaded_at,
uploaded_by=upload.uploaded_by,
source_ip=upload.source_ip,
deduplicated=upload.deduplicated or False,
artifact_size=artifact.size if artifact else 0,
artifact_content_type=artifact.content_type if artifact else None,
)
)
return PaginatedResponse(
items=items,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@router.get(
"/api/v1/artifact/{artifact_id}/uploads",
response_model=PaginatedResponse[UploadHistoryResponse],
)
def list_artifact_uploads(
artifact_id: str,
page: int = Query(1, ge=1),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
):
"""List all upload events for a specific artifact."""
artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
if not artifact:
raise HTTPException(status_code=404, detail="Artifact not found")
query = db.query(Upload).filter(Upload.artifact_id == artifact_id)
total = query.count()
total_pages = math.ceil(total / limit) if total > 0 else 1
uploads = (
query.order_by(Upload.uploaded_at.desc())
.offset((page - 1) * limit)
.limit(limit)
.all()
)
# Build response with package/project metadata
items = []
for upload in uploads:
package = db.query(Package).filter(Package.id == upload.package_id).first()
project = (
db.query(Project).filter(Project.id == package.project_id).first()
if package
else None
)
items.append(
UploadHistoryResponse(
id=upload.id,
artifact_id=upload.artifact_id,
package_id=upload.package_id,
package_name=package.name if package else "unknown",
project_name=project.name if project else "unknown",
original_name=upload.original_name,
tag_name=upload.tag_name,
uploaded_at=upload.uploaded_at,
uploaded_by=upload.uploaded_by,
source_ip=upload.source_ip,
deduplicated=upload.deduplicated or False,
artifact_size=artifact.size,
artifact_content_type=artifact.content_type,
)
)
return PaginatedResponse(
items=items,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
# =============================================================================
# Artifact Provenance/History Endpoint
# =============================================================================
@router.get(
"/api/v1/artifact/{artifact_id}/history", response_model=ArtifactProvenanceResponse
)
def get_artifact_provenance(
artifact_id: str,
db: Session = Depends(get_db),
):
"""
Get full provenance/history of an artifact.
Returns:
- Artifact metadata
- First upload information
- All packages/tags referencing the artifact
- Complete upload history
"""
artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
if not artifact:
raise HTTPException(status_code=404, detail="Artifact not found")
# Get all uploads for this artifact
uploads = (
db.query(Upload)
.filter(Upload.artifact_id == artifact_id)
.order_by(Upload.uploaded_at.asc())
.all()
)
# Get first upload info
first_upload = uploads[0] if uploads else None
# Get all tags referencing this artifact
tags = db.query(Tag).filter(Tag.artifact_id == artifact_id).all()
# Build package list with tags
package_map = {} # package_id -> {project_name, package_name, tag_names}
tag_list = []
for tag in tags:
package = db.query(Package).filter(Package.id == tag.package_id).first()
if package:
project = db.query(Project).filter(Project.id == package.project_id).first()
project_name = project.name if project else "unknown"
# Add to package map
pkg_key = str(package.id)
if pkg_key not in package_map:
package_map[pkg_key] = {
"project_name": project_name,
"package_name": package.name,
"tag_names": [],
}
package_map[pkg_key]["tag_names"].append(tag.name)
# Add to tag list
tag_list.append(
{
"project_name": project_name,
"package_name": package.name,
"tag_name": tag.name,
"created_at": tag.created_at.isoformat()
if tag.created_at
else None,
}
)
# Build upload history
upload_history = []
for upload in uploads:
package = db.query(Package).filter(Package.id == upload.package_id).first()
project = (
db.query(Project).filter(Project.id == package.project_id).first()
if package
else None
)
upload_history.append(
{
"upload_id": str(upload.id),
"project_name": project.name if project else "unknown",
"package_name": package.name if package else "unknown",
"original_name": upload.original_name,
"tag_name": upload.tag_name,
"uploaded_at": upload.uploaded_at.isoformat()
if upload.uploaded_at
else None,
"uploaded_by": upload.uploaded_by,
"deduplicated": upload.deduplicated or False,
}
)
return ArtifactProvenanceResponse(
artifact_id=artifact.id,
sha256=artifact.id,
size=artifact.size,
content_type=artifact.content_type,
original_name=artifact.original_name,
created_at=artifact.created_at,
created_by=artifact.created_by,
ref_count=artifact.ref_count,
first_uploaded_at=first_upload.uploaded_at
if first_upload
else artifact.created_at,
first_uploaded_by=first_upload.uploaded_by
if first_upload
else artifact.created_by,
upload_count=len(uploads),
packages=list(package_map.values()),
tags=tag_list,
uploads=upload_history,
)
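The hunks above call `_log_audit()` throughout, but its definition is outside this diff. A minimal sketch of what such a helper could look like, assuming an `AuditLog` model with `action`, `resource`, `user_id`, `source_ip`, `details`, and `timestamp` columns; the actual implementation may differ:

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional

from sqlalchemy.orm import Session


def _log_audit(
    db: Session,
    action: str,
    resource: str,
    user_id: str,
    source_ip: Optional[str] = None,
    details: Optional[Dict[str, Any]] = None,
) -> None:
    """Stage an audit row on the session; the caller's db.commit() persists it."""
    # AuditLog is the ORM model assumed by this sketch.
    db.add(
        AuditLog(
            action=action,
            resource=resource,
            user_id=user_id,
            source_ip=source_ip,
            details=details,
            timestamp=datetime.now(timezone.utc),
        )
    )
```

Note the pattern in the endpoints above: the audit row is staged before `db.commit()`, so the audited change and its log entry commit (or roll back) together.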

View File

@@ -12,6 +12,7 @@ class PaginationMeta(BaseModel):
limit: int
total: int
total_pages: int
has_more: bool = False # True if there are more pages after current page
class PaginatedResponse(BaseModel, Generic[T]):
@@ -189,6 +190,93 @@ class TagHistoryResponse(BaseModel):
from_attributes = True
class TagHistoryDetailResponse(BaseModel):
"""Tag history with artifact metadata for each version"""
id: UUID
tag_id: UUID
tag_name: str
old_artifact_id: Optional[str]
new_artifact_id: str
changed_at: datetime
changed_by: str
# Artifact metadata for new artifact
artifact_size: int
artifact_original_name: Optional[str]
artifact_content_type: Optional[str]
class Config:
from_attributes = True
# Audit log schemas
class AuditLogResponse(BaseModel):
"""Audit log entry response"""
id: UUID
action: str
resource: str
user_id: str
details: Optional[Dict[str, Any]]
timestamp: datetime
source_ip: Optional[str]
class Config:
from_attributes = True
# Upload history schemas
class UploadHistoryResponse(BaseModel):
"""Upload event with artifact details"""
id: UUID
artifact_id: str
package_id: UUID
package_name: str
project_name: str
original_name: Optional[str]
tag_name: Optional[str]
uploaded_at: datetime
uploaded_by: str
source_ip: Optional[str]
deduplicated: bool
# Artifact metadata
artifact_size: int
artifact_content_type: Optional[str]
class Config:
from_attributes = True
# Artifact provenance schemas
class ArtifactProvenanceResponse(BaseModel):
"""Full provenance/history of an artifact"""
artifact_id: str
sha256: str
size: int
content_type: Optional[str]
original_name: Optional[str]
created_at: datetime
created_by: str
ref_count: int
# First upload info
first_uploaded_at: datetime
first_uploaded_by: str
# Usage statistics
upload_count: int
# References
packages: List[Dict[str, Any]] # List of {project_name, package_name, tag_names}
tags: List[
Dict[str, Any]
] # List of {project_name, package_name, tag_name, created_at}
# Upload history
uploads: List[Dict[str, Any]] # List of upload events
class Config:
from_attributes = True
class ArtifactTagInfo(BaseModel):
"""Tag info for embedding in artifact responses"""
@@ -254,6 +342,11 @@ class UploadResponse(BaseModel):
format_metadata: Optional[Dict[str, Any]] = None
deduplicated: bool = False
ref_count: int = 1  # Current reference count after this upload
# Enhanced metadata (Issue #19)
upload_id: Optional[UUID] = None # UUID of the upload record
content_type: Optional[str] = None
original_name: Optional[str] = None
created_at: Optional[datetime] = None
# Resumable upload schemas
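The schemas hunk shows the class line for `PaginatedResponse` but not its body. For reference, a minimal sketch of the generic wrapper the new response models plug into (assumed shape; Pydantic v2 supports `Generic[T]` directly on `BaseModel`):

```python
from typing import Generic, List, TypeVar

from pydantic import BaseModel

T = TypeVar("T")


class PaginationMeta(BaseModel):
    page: int
    limit: int
    total: int
    total_pages: int
    has_more: bool = False  # True if there are more pages after the current one


class PaginatedResponse(BaseModel, Generic[T]):
    items: List[T]
    pagination: PaginationMeta
```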

View File

@@ -0,0 +1,522 @@
"""Integration tests for audit logs and history endpoints."""
import pytest
from datetime import datetime, timedelta
from tests.conftest import upload_test_file
class TestAuditLogsEndpoint:
"""Tests for /api/v1/audit-logs endpoint."""
@pytest.mark.integration
def test_list_audit_logs_returns_valid_response(self, integration_client):
"""Test that audit logs endpoint returns valid paginated response."""
response = integration_client.get("/api/v1/audit-logs")
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "pagination" in data
assert isinstance(data["items"], list)
pagination = data["pagination"]
assert "page" in pagination
assert "limit" in pagination
assert "total" in pagination
assert "total_pages" in pagination
@pytest.mark.integration
def test_audit_logs_respects_pagination(self, integration_client):
"""Test that audit logs endpoint respects limit parameter."""
response = integration_client.get("/api/v1/audit-logs?limit=5")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) <= 5
assert data["pagination"]["limit"] == 5
@pytest.mark.integration
def test_audit_logs_filter_by_action(self, integration_client, test_package):
"""Test filtering audit logs by action type."""
# Create an action that will be logged
project_name, package_name = test_package
response = integration_client.get("/api/v1/audit-logs?action=project.create")
assert response.status_code == 200
data = response.json()
# All items should have the filtered action
for item in data["items"]:
assert item["action"] == "project.create"
@pytest.mark.integration
def test_audit_log_entry_has_required_fields(
self, integration_client, test_project
):
"""Test that audit log entries have all required fields."""
# Force some audit logs by operations on test_project
response = integration_client.get("/api/v1/audit-logs?limit=10")
assert response.status_code == 200
data = response.json()
if data["items"]:
item = data["items"][0]
assert "id" in item
assert "action" in item
assert "resource" in item
assert "user_id" in item
assert "timestamp" in item
class TestProjectAuditLogs:
"""Tests for /api/v1/projects/{project}/audit-logs endpoint."""
@pytest.mark.integration
def test_project_audit_logs_returns_200(self, integration_client, test_project):
"""Test that project audit logs endpoint returns 200."""
response = integration_client.get(f"/api/v1/projects/{test_project}/audit-logs")
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "pagination" in data
@pytest.mark.integration
def test_project_audit_logs_not_found(self, integration_client):
"""Test that non-existent project returns 404."""
response = integration_client.get(
"/api/v1/projects/nonexistent-project/audit-logs"
)
assert response.status_code == 404
class TestPackageAuditLogs:
"""Tests for /api/v1/project/{project}/{package}/audit-logs endpoint."""
@pytest.mark.integration
def test_package_audit_logs_returns_200(self, integration_client, test_package):
"""Test that package audit logs endpoint returns 200."""
project_name, package_name = test_package
response = integration_client.get(
f"/api/v1/project/{project_name}/{package_name}/audit-logs"
)
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "pagination" in data
@pytest.mark.integration
def test_package_audit_logs_project_not_found(self, integration_client):
"""Test that non-existent project returns 404."""
response = integration_client.get(
"/api/v1/project/nonexistent/nonexistent/audit-logs"
)
assert response.status_code == 404
@pytest.mark.integration
def test_package_audit_logs_package_not_found(
self, integration_client, test_project
):
"""Test that non-existent package returns 404."""
response = integration_client.get(
f"/api/v1/project/{test_project}/nonexistent-package/audit-logs"
)
assert response.status_code == 404
class TestPackageUploads:
"""Tests for /api/v1/project/{project}/{package}/uploads endpoint."""
@pytest.mark.integration
def test_package_uploads_returns_200(self, integration_client, test_package):
"""Test that package uploads endpoint returns 200."""
project_name, package_name = test_package
response = integration_client.get(
f"/api/v1/project/{project_name}/{package_name}/uploads"
)
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "pagination" in data
@pytest.mark.integration
def test_package_uploads_after_upload(self, integration_client, test_package):
"""Test that uploads are recorded after file upload."""
project_name, package_name = test_package
# Upload a file
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"test upload content",
"test.txt",
)
assert upload_result["artifact_id"]
# Check uploads endpoint
response = integration_client.get(
f"/api/v1/project/{project_name}/{package_name}/uploads"
)
assert response.status_code == 200
data = response.json()
assert len(data["items"]) >= 1
# Verify upload record fields
upload = data["items"][0]
assert "artifact_id" in upload
assert "package_name" in upload
assert "project_name" in upload
assert "uploaded_at" in upload
assert "uploaded_by" in upload
@pytest.mark.integration
def test_package_uploads_project_not_found(self, integration_client):
"""Test that non-existent project returns 404."""
response = integration_client.get(
"/api/v1/project/nonexistent/nonexistent/uploads"
)
assert response.status_code == 404
class TestArtifactUploads:
"""Tests for /api/v1/artifact/{id}/uploads endpoint."""
@pytest.mark.integration
def test_artifact_uploads_returns_200(self, integration_client, test_package):
"""Test that artifact uploads endpoint returns 200."""
project_name, package_name = test_package
# Upload a file
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"artifact upload test",
"artifact.txt",
)
artifact_id = upload_result["artifact_id"]
response = integration_client.get(f"/api/v1/artifact/{artifact_id}/uploads")
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "pagination" in data
assert len(data["items"]) >= 1
@pytest.mark.integration
def test_artifact_uploads_not_found(self, integration_client):
"""Test that non-existent artifact returns 404."""
fake_hash = "a" * 64
response = integration_client.get(f"/api/v1/artifact/{fake_hash}/uploads")
assert response.status_code == 404
class TestArtifactProvenance:
"""Tests for /api/v1/artifact/{id}/history endpoint."""
@pytest.mark.integration
def test_artifact_history_returns_200(self, integration_client, test_package):
"""Test that artifact history endpoint returns 200."""
project_name, package_name = test_package
# Upload a file
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"provenance test content",
"prov.txt",
)
artifact_id = upload_result["artifact_id"]
response = integration_client.get(f"/api/v1/artifact/{artifact_id}/history")
assert response.status_code == 200
@pytest.mark.integration
def test_artifact_history_has_required_fields(
self, integration_client, test_package
):
"""Test that artifact history has all required fields."""
project_name, package_name = test_package
# Upload a file
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"provenance fields test",
"fields.txt",
)
artifact_id = upload_result["artifact_id"]
response = integration_client.get(f"/api/v1/artifact/{artifact_id}/history")
assert response.status_code == 200
data = response.json()
assert "artifact_id" in data
assert "sha256" in data
assert "size" in data
assert "created_at" in data
assert "created_by" in data
assert "ref_count" in data
assert "first_uploaded_at" in data
assert "first_uploaded_by" in data
assert "upload_count" in data
assert "packages" in data
assert "tags" in data
assert "uploads" in data
@pytest.mark.integration
def test_artifact_history_not_found(self, integration_client):
"""Test that non-existent artifact returns 404."""
fake_hash = "b" * 64
response = integration_client.get(f"/api/v1/artifact/{fake_hash}/history")
assert response.status_code == 404
@pytest.mark.integration
def test_artifact_history_with_tag(self, integration_client, test_package):
"""Test artifact history includes tag information when tagged."""
project_name, package_name = test_package
# Upload a file with a tag
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"tagged provenance test",
"tagged.txt",
tag="v1.0.0",
)
artifact_id = upload_result["artifact_id"]
response = integration_client.get(f"/api/v1/artifact/{artifact_id}/history")
assert response.status_code == 200
data = response.json()
# Should have at least one tag
assert len(data["tags"]) >= 1
# Tag should have required fields
tag = data["tags"][0]
assert "project_name" in tag
assert "package_name" in tag
assert "tag_name" in tag
class TestGlobalUploadsEndpoint:
"""Tests for /api/v1/uploads endpoint (global admin)."""
@pytest.mark.integration
def test_global_uploads_returns_200(self, integration_client):
"""Test that global uploads endpoint returns 200."""
response = integration_client.get("/api/v1/uploads")
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "pagination" in data
@pytest.mark.integration
def test_global_uploads_pagination(self, integration_client):
"""Test that global uploads endpoint respects pagination."""
response = integration_client.get("/api/v1/uploads?limit=5&page=1")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) <= 5
assert data["pagination"]["limit"] == 5
assert data["pagination"]["page"] == 1
@pytest.mark.integration
def test_global_uploads_filter_by_project(self, integration_client, test_package):
"""Test filtering global uploads by project name."""
project_name, package_name = test_package
# Upload a file
upload_test_file(
integration_client,
project_name,
package_name,
b"global filter test",
"global.txt",
)
response = integration_client.get(f"/api/v1/uploads?project={project_name}")
assert response.status_code == 200
data = response.json()
for item in data["items"]:
assert item["project_name"] == project_name
@pytest.mark.integration
def test_global_uploads_filter_by_uploader(self, integration_client, test_package):
"""Test filtering global uploads by uploaded_by."""
project_name, package_name = test_package
# Upload a file
upload_test_file(
integration_client,
project_name,
package_name,
b"uploader filter test",
"uploader.txt",
)
# Filter by anonymous (default user)
response = integration_client.get("/api/v1/uploads?uploaded_by=anonymous")
assert response.status_code == 200
data = response.json()
for item in data["items"]:
assert item["uploaded_by"] == "anonymous"
@pytest.mark.integration
def test_global_uploads_has_more_field(self, integration_client):
"""Test that pagination includes has_more field."""
response = integration_client.get("/api/v1/uploads?limit=1")
assert response.status_code == 200
data = response.json()
assert "has_more" in data["pagination"]
assert isinstance(data["pagination"]["has_more"], bool)
class TestProjectUploadsEndpoint:
"""Tests for /api/v1/project/{project}/uploads endpoint."""
@pytest.mark.integration
def test_project_uploads_returns_200(self, integration_client, test_project):
"""Test that project uploads endpoint returns 200."""
response = integration_client.get(f"/api/v1/project/{test_project}/uploads")
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "pagination" in data
@pytest.mark.integration
def test_project_uploads_after_upload(self, integration_client, test_package):
"""Test that uploads are recorded in project uploads."""
project_name, package_name = test_package
# Upload a file
upload_test_file(
integration_client,
project_name,
package_name,
b"project uploads test",
"project.txt",
)
response = integration_client.get(f"/api/v1/project/{project_name}/uploads")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) >= 1
# Verify project name matches
for item in data["items"]:
assert item["project_name"] == project_name
@pytest.mark.integration
def test_project_uploads_filter_by_package(self, integration_client, test_package):
"""Test filtering project uploads by package name."""
project_name, package_name = test_package
# Upload a file
upload_test_file(
integration_client,
project_name,
package_name,
b"package filter test",
"pkgfilter.txt",
)
response = integration_client.get(
f"/api/v1/project/{project_name}/uploads?package={package_name}"
)
assert response.status_code == 200
data = response.json()
for item in data["items"]:
assert item["package_name"] == package_name
@pytest.mark.integration
def test_project_uploads_not_found(self, integration_client):
"""Test that non-existent project returns 404."""
response = integration_client.get("/api/v1/project/nonexistent/uploads")
assert response.status_code == 404
class TestUploadResponseFields:
"""Tests for enhanced UploadResponse fields (Issue #19)."""
@pytest.mark.integration
def test_upload_response_has_upload_id(self, integration_client, test_package):
"""Test that upload response includes upload_id."""
project_name, package_name = test_package
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"upload id test",
"uploadid.txt",
)
# upload_id should be present
assert "upload_id" in upload_result
assert upload_result["upload_id"] is not None
@pytest.mark.integration
def test_upload_response_has_content_type(self, integration_client, test_package):
"""Test that upload response includes content_type."""
project_name, package_name = test_package
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"content type test",
"content.txt",
)
assert "content_type" in upload_result
@pytest.mark.integration
def test_upload_response_has_original_name(self, integration_client, test_package):
"""Test that upload response includes original_name."""
project_name, package_name = test_package
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"original name test",
"originalname.txt",
)
assert "original_name" in upload_result
assert upload_result["original_name"] == "originalname.txt"
@pytest.mark.integration
def test_upload_response_has_created_at(self, integration_client, test_package):
"""Test that upload response includes created_at."""
project_name, package_name = test_package
upload_result = upload_test_file(
integration_client,
project_name,
package_name,
b"created at test",
"createdat.txt",
)
assert "created_at" in upload_result
assert upload_result["created_at"] is not None