From 3d0f5028676e8ac9ee041ea802c99d1f60312eac Mon Sep 17 00:00:00 2001 From: Mondo Diaz Date: Tue, 6 Jan 2026 14:12:56 -0600 Subject: [PATCH] Add audit log and history API endpoints (#20) - Add audit log endpoints: GET /api/v1/audit-logs with filters, project-scoped, package-scoped - Add upload history endpoints: package uploads, artifact uploads - Add artifact provenance endpoint: GET /api/v1/artifact/{id}/history - Add audit logging for project.create, package.create, tag.create, tag.update - Add AuditLogResponse, UploadHistoryResponse, ArtifactProvenanceResponse schemas - Add 18 integration tests for new endpoints --- CHANGELOG.md | 13 + backend/app/routes.py | 488 +++++++++++++++++++++++++++- backend/app/schemas.py | 87 +++++ backend/tests/test_audit_history.py | 309 ++++++++++++++++++ 4 files changed, 896 insertions(+), 1 deletion(-) create mode 100644 backend/tests/test_audit_history.py diff --git a/CHANGELOG.md b/CHANGELOG.md index db52574..ea51122 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Added audit log API endpoints with filtering and pagination (#20) + - `GET /api/v1/audit-logs` - list all audit logs with action/resource/user/date filters + - `GET /api/v1/projects/{project}/audit-logs` - project-scoped audit logs + - `GET /api/v1/project/{project}/{package}/audit-logs` - package-scoped audit logs +- Added upload history API endpoints (#20) + - `GET /api/v1/project/{project}/{package}/uploads` - list upload events for a package + - `GET /api/v1/artifact/{id}/uploads` - list all uploads of a specific artifact +- Added artifact provenance endpoint `GET /api/v1/artifact/{id}/history` (#20) + - Returns full artifact history including packages, tags, and upload events +- Added audit logging for project.create, package.create, tag.create, tag.update actions (#20) +- Added `AuditLogResponse`, `UploadHistoryResponse`, `ArtifactProvenanceResponse` schemas (#20) +- Added `TagHistoryDetailResponse` schema with artifact metadata (#20) +- Added 18 integration tests for audit log and history endpoints (#20) - Added `StorageBackend` protocol/interface for backend-agnostic storage (#33) - Added `health_check()` method to storage backend with `/health` endpoint integration (#33) - Added `verify_integrity()` method for post-upload hash validation (#33) diff --git a/backend/app/routes.py b/backend/app/routes.py index c06c394..927cbdf 100644 --- a/backend/app/routes.py +++ b/backend/app/routes.py @@ -58,6 +58,10 @@ from .schemas import ( TagResponse, TagDetailResponse, TagHistoryResponse, + TagHistoryDetailResponse, + AuditLogResponse, + UploadHistoryResponse, + ArtifactProvenanceResponse, UploadResponse, ConsumerResponse, HealthResponse, @@ -499,6 +503,17 @@ def create_project( created_by=user_id, ) db.add(db_project) + + # Audit log + _log_audit( + db=db, + action="project.create", + resource=f"project/{project.name}", + user_id=user_id, + source_ip=request.client.host if request.client else None, + details={"is_public": project.is_public}, + ) + db.commit() db.refresh(db_project) return db_project @@ -835,7 +850,10 @@ def get_package( @router.post("/api/v1/project/{project_name}/packages", response_model=PackageResponse) def create_package( - project_name: str, package: PackageCreate, db: Session = Depends(get_db) + project_name: str, + package: PackageCreate, + request: Request, + db: Session = Depends(get_db), ): project = db.query(Project).filter(Project.name == project_name).first() if not project: @@ -873,6 +891,17 @@ def create_package( platform=package.platform, ) db.add(db_package) + + # Audit log + _log_audit( + db=db, + action="package.create", + resource=f"project/{project_name}/{package.name}", + user_id=get_user_id(request), + source_ip=request.client.host if request.client else None, + details={"format": package.format, "platform": package.platform}, + ) + db.commit() db.refresh(db_package) return db_package @@ -1850,8 +1879,23 @@ def create_tag( db.query(Tag).filter(Tag.package_id == package.id, Tag.name == tag.name).first() ) if existing: + old_artifact_id = existing.artifact_id existing.artifact_id = tag.artifact_id existing.created_by = user_id + + # Audit log for tag update + _log_audit( + db=db, + action="tag.update", + resource=f"project/{project_name}/{package_name}/tag/{tag.name}", + user_id=user_id, + source_ip=request.client.host if request.client else None, + details={ + "old_artifact_id": old_artifact_id, + "new_artifact_id": tag.artifact_id, + }, + ) + db.commit() db.refresh(existing) return existing @@ -1863,6 +1907,17 @@ def create_tag( created_by=user_id, ) db.add(db_tag) + + # Audit log for tag create + _log_audit( + db=db, + action="tag.create", + resource=f"project/{project_name}/{package_name}/tag/{tag.name}", + user_id=user_id, + source_ip=request.client.host if request.client else None, + details={"artifact_id": tag.artifact_id}, + ) + db.commit() db.refresh(db_tag) return db_tag @@ -3069,3 +3124,434 @@ Generated: {generated_at.strftime("%Y-%m-%d %H:%M:%S UTC")} indent=2, ), ) + + +# ============================================================================= +# Audit Log Endpoints +# ============================================================================= + + +@router.get("/api/v1/audit-logs", response_model=PaginatedResponse[AuditLogResponse]) +def list_audit_logs( + action: Optional[str] = Query(None, description="Filter by action type"), + resource: Optional[str] = Query(None, description="Filter by resource pattern"), + user_id: Optional[str] = Query(None, description="Filter by user"), + from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), + to_date: Optional[datetime] = Query(None, alias="to", description="End date"), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), +): + """ + List audit logs with filtering and pagination. + + Filters: + - action: Filter by action type (e.g., 'project.create', 'artifact.upload') + - resource: Filter by resource pattern (partial match) + - user_id: Filter by user ID + - from/to: Filter by timestamp range + """ + query = db.query(AuditLog) + + if action: + query = query.filter(AuditLog.action == action) + if resource: + query = query.filter(AuditLog.resource.ilike(f"%{resource}%")) + if user_id: + query = query.filter(AuditLog.user_id == user_id) + if from_date: + query = query.filter(AuditLog.timestamp >= from_date) + if to_date: + query = query.filter(AuditLog.timestamp <= to_date) + + total = query.count() + total_pages = math.ceil(total / limit) if total > 0 else 1 + + logs = ( + query.order_by(AuditLog.timestamp.desc()) + .offset((page - 1) * limit) + .limit(limit) + .all() + ) + + return PaginatedResponse( + items=logs, + pagination=PaginationMeta( + page=page, + limit=limit, + total=total, + total_pages=total_pages, + ), + ) + + +@router.get( + "/api/v1/projects/{project_name}/audit-logs", + response_model=PaginatedResponse[AuditLogResponse], +) +def list_project_audit_logs( + project_name: str, + action: Optional[str] = Query(None, description="Filter by action type"), + from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), + to_date: Optional[datetime] = Query(None, alias="to", description="End date"), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), +): + """List audit logs for a specific project.""" + project = db.query(Project).filter(Project.name == project_name).first() + if not project: + raise HTTPException(status_code=404, detail="Project not found") + + # Match resources that start with project name + resource_pattern = f"{project_name}%" + query = db.query(AuditLog).filter(AuditLog.resource.like(resource_pattern)) + + if action: + query = query.filter(AuditLog.action == action) + if from_date: + query = query.filter(AuditLog.timestamp >= from_date) + if to_date: + query = query.filter(AuditLog.timestamp <= to_date) + + total = query.count() + total_pages = math.ceil(total / limit) if total > 0 else 1 + + logs = ( + query.order_by(AuditLog.timestamp.desc()) + .offset((page - 1) * limit) + .limit(limit) + .all() + ) + + return PaginatedResponse( + items=logs, + pagination=PaginationMeta( + page=page, + limit=limit, + total=total, + total_pages=total_pages, + ), + ) + + +@router.get( + "/api/v1/project/{project_name}/{package_name}/audit-logs", + response_model=PaginatedResponse[AuditLogResponse], +) +def list_package_audit_logs( + project_name: str, + package_name: str, + action: Optional[str] = Query(None, description="Filter by action type"), + from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), + to_date: Optional[datetime] = Query(None, alias="to", description="End date"), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), +): + """List audit logs for a specific package.""" + project = db.query(Project).filter(Project.name == project_name).first() + if not project: + raise HTTPException(status_code=404, detail="Project not found") + + package = ( + db.query(Package) + .filter(Package.project_id == project.id, Package.name == package_name) + .first() + ) + if not package: + raise HTTPException(status_code=404, detail="Package not found") + + # Match resources that contain project/package + resource_pattern = f"{project_name}/{package_name}%" + query = db.query(AuditLog).filter(AuditLog.resource.like(resource_pattern)) + + if action: + query = query.filter(AuditLog.action == action) + if from_date: + query = query.filter(AuditLog.timestamp >= from_date) + if to_date: + query = query.filter(AuditLog.timestamp <= to_date) + + total = query.count() + total_pages = math.ceil(total / limit) if total > 0 else 1 + + logs = ( + query.order_by(AuditLog.timestamp.desc()) + .offset((page - 1) * limit) + .limit(limit) + .all() + ) + + return PaginatedResponse( + items=logs, + pagination=PaginationMeta( + page=page, + limit=limit, + total=total, + total_pages=total_pages, + ), + ) + + +# ============================================================================= +# Upload History Endpoints +# ============================================================================= + + +@router.get( + "/api/v1/project/{project_name}/{package_name}/uploads", + response_model=PaginatedResponse[UploadHistoryResponse], +) +def list_package_uploads( + project_name: str, + package_name: str, + from_date: Optional[datetime] = Query(None, alias="from", description="Start date"), + to_date: Optional[datetime] = Query(None, alias="to", description="End date"), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), +): + """List upload events for a specific package.""" + project = db.query(Project).filter(Project.name == project_name).first() + if not project: + raise HTTPException(status_code=404, detail="Project not found") + + package = ( + db.query(Package) + .filter(Package.project_id == project.id, Package.name == package_name) + .first() + ) + if not package: + raise HTTPException(status_code=404, detail="Package not found") + + query = db.query(Upload).filter(Upload.package_id == package.id) + + if from_date: + query = query.filter(Upload.uploaded_at >= from_date) + if to_date: + query = query.filter(Upload.uploaded_at <= to_date) + + total = query.count() + total_pages = math.ceil(total / limit) if total > 0 else 1 + + uploads = ( + query.order_by(Upload.uploaded_at.desc()) + .offset((page - 1) * limit) + .limit(limit) + .all() + ) + + # Build response with artifact metadata + items = [] + for upload in uploads: + artifact = db.query(Artifact).filter(Artifact.id == upload.artifact_id).first() + items.append( + UploadHistoryResponse( + id=upload.id, + artifact_id=upload.artifact_id, + package_id=upload.package_id, + package_name=package_name, + project_name=project_name, + original_name=upload.original_name, + tag_name=upload.tag_name, + uploaded_at=upload.uploaded_at, + uploaded_by=upload.uploaded_by, + source_ip=upload.source_ip, + deduplicated=upload.deduplicated or False, + artifact_size=artifact.size if artifact else 0, + artifact_content_type=artifact.content_type if artifact else None, + ) + ) + + return PaginatedResponse( + items=items, + pagination=PaginationMeta( + page=page, + limit=limit, + total=total, + total_pages=total_pages, + ), + ) + + +@router.get( + "/api/v1/artifact/{artifact_id}/uploads", + response_model=PaginatedResponse[UploadHistoryResponse], +) +def list_artifact_uploads( + artifact_id: str, + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), +): + """List all upload events for a specific artifact.""" + artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first() + if not artifact: + raise HTTPException(status_code=404, detail="Artifact not found") + + query = db.query(Upload).filter(Upload.artifact_id == artifact_id) + + total = query.count() + total_pages = math.ceil(total / limit) if total > 0 else 1 + + uploads = ( + query.order_by(Upload.uploaded_at.desc()) + .offset((page - 1) * limit) + .limit(limit) + .all() + ) + + # Build response with package/project metadata + items = [] + for upload in uploads: + package = db.query(Package).filter(Package.id == upload.package_id).first() + project = ( + db.query(Project).filter(Project.id == package.project_id).first() + if package + else None + ) + items.append( + UploadHistoryResponse( + id=upload.id, + artifact_id=upload.artifact_id, + package_id=upload.package_id, + package_name=package.name if package else "unknown", + project_name=project.name if project else "unknown", + original_name=upload.original_name, + tag_name=upload.tag_name, + uploaded_at=upload.uploaded_at, + uploaded_by=upload.uploaded_by, + source_ip=upload.source_ip, + deduplicated=upload.deduplicated or False, + artifact_size=artifact.size, + artifact_content_type=artifact.content_type, + ) + ) + + return PaginatedResponse( + items=items, + pagination=PaginationMeta( + page=page, + limit=limit, + total=total, + total_pages=total_pages, + ), + ) + + +# ============================================================================= +# Artifact Provenance/History Endpoint +# ============================================================================= + + +@router.get( + "/api/v1/artifact/{artifact_id}/history", response_model=ArtifactProvenanceResponse +) +def get_artifact_provenance( + artifact_id: str, + db: Session = Depends(get_db), +): + """ + Get full provenance/history of an artifact. + + Returns: + - Artifact metadata + - First upload information + - All packages/tags referencing the artifact + - Complete upload history + """ + artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first() + if not artifact: + raise HTTPException(status_code=404, detail="Artifact not found") + + # Get all uploads for this artifact + uploads = ( + db.query(Upload) + .filter(Upload.artifact_id == artifact_id) + .order_by(Upload.uploaded_at.asc()) + .all() + ) + + # Get first upload info + first_upload = uploads[0] if uploads else None + + # Get all tags referencing this artifact + tags = db.query(Tag).filter(Tag.artifact_id == artifact_id).all() + + # Build package list with tags + package_map = {} # package_id -> {project_name, package_name, tag_names} + tag_list = [] + + for tag in tags: + package = db.query(Package).filter(Package.id == tag.package_id).first() + if package: + project = db.query(Project).filter(Project.id == package.project_id).first() + project_name = project.name if project else "unknown" + + # Add to package map + pkg_key = str(package.id) + if pkg_key not in package_map: + package_map[pkg_key] = { + "project_name": project_name, + "package_name": package.name, + "tag_names": [], + } + package_map[pkg_key]["tag_names"].append(tag.name) + + # Add to tag list + tag_list.append( + { + "project_name": project_name, + "package_name": package.name, + "tag_name": tag.name, + "created_at": tag.created_at.isoformat() + if tag.created_at + else None, + } + ) + + # Build upload history + upload_history = [] + for upload in uploads: + package = db.query(Package).filter(Package.id == upload.package_id).first() + project = ( + db.query(Project).filter(Project.id == package.project_id).first() + if package + else None + ) + upload_history.append( + { + "upload_id": str(upload.id), + "project_name": project.name if project else "unknown", + "package_name": package.name if package else "unknown", + "original_name": upload.original_name, + "tag_name": upload.tag_name, + "uploaded_at": upload.uploaded_at.isoformat() + if upload.uploaded_at + else None, + "uploaded_by": upload.uploaded_by, + "deduplicated": upload.deduplicated or False, + } + ) + + return ArtifactProvenanceResponse( + artifact_id=artifact.id, + sha256=artifact.id, + size=artifact.size, + content_type=artifact.content_type, + original_name=artifact.original_name, + created_at=artifact.created_at, + created_by=artifact.created_by, + ref_count=artifact.ref_count, + first_uploaded_at=first_upload.uploaded_at + if first_upload + else artifact.created_at, + first_uploaded_by=first_upload.uploaded_by + if first_upload + else artifact.created_by, + upload_count=len(uploads), + packages=list(package_map.values()), + tags=tag_list, + uploads=upload_history, + ) diff --git a/backend/app/schemas.py b/backend/app/schemas.py index 4c7db29..bb60e76 100644 --- a/backend/app/schemas.py +++ b/backend/app/schemas.py @@ -189,6 +189,93 @@ class TagHistoryResponse(BaseModel): from_attributes = True +class TagHistoryDetailResponse(BaseModel): + """Tag history with artifact metadata for each version""" + + id: UUID + tag_id: UUID + tag_name: str + old_artifact_id: Optional[str] + new_artifact_id: str + changed_at: datetime + changed_by: str + # Artifact metadata for new artifact + artifact_size: int + artifact_original_name: Optional[str] + artifact_content_type: Optional[str] + + class Config: + from_attributes = True + + +# Audit log schemas +class AuditLogResponse(BaseModel): + """Audit log entry response""" + + id: UUID + action: str + resource: str + user_id: str + details: Optional[Dict[str, Any]] + timestamp: datetime + source_ip: Optional[str] + + class Config: + from_attributes = True + + +# Upload history schemas +class UploadHistoryResponse(BaseModel): + """Upload event with artifact details""" + + id: UUID + artifact_id: str + package_id: UUID + package_name: str + project_name: str + original_name: Optional[str] + tag_name: Optional[str] + uploaded_at: datetime + uploaded_by: str + source_ip: Optional[str] + deduplicated: bool + # Artifact metadata + artifact_size: int + artifact_content_type: Optional[str] + + class Config: + from_attributes = True + + +# Artifact provenance schemas +class ArtifactProvenanceResponse(BaseModel): + """Full provenance/history of an artifact""" + + artifact_id: str + sha256: str + size: int + content_type: Optional[str] + original_name: Optional[str] + created_at: datetime + created_by: str + ref_count: int + # First upload info + first_uploaded_at: datetime + first_uploaded_by: str + # Usage statistics + upload_count: int + # References + packages: List[Dict[str, Any]] # List of {project_name, package_name, tag_names} + tags: List[ + Dict[str, Any] + ] # List of {project_name, package_name, tag_name, created_at} + # Upload history + uploads: List[Dict[str, Any]] # List of upload events + + class Config: + from_attributes = True + + class ArtifactTagInfo(BaseModel): """Tag info for embedding in artifact responses""" diff --git a/backend/tests/test_audit_history.py b/backend/tests/test_audit_history.py new file mode 100644 index 0000000..3c68388 --- /dev/null +++ b/backend/tests/test_audit_history.py @@ -0,0 +1,309 @@ +"""Integration tests for audit logs and history endpoints.""" + +import pytest +from datetime import datetime, timedelta + +from tests.conftest import upload_test_file + + +class TestAuditLogsEndpoint: + """Tests for /api/v1/audit-logs endpoint.""" + + @pytest.mark.integration + def test_list_audit_logs_returns_valid_response(self, integration_client): + """Test that audit logs endpoint returns valid paginated response.""" + response = integration_client.get("/api/v1/audit-logs") + assert response.status_code == 200 + + data = response.json() + assert "items" in data + assert "pagination" in data + assert isinstance(data["items"], list) + + pagination = data["pagination"] + assert "page" in pagination + assert "limit" in pagination + assert "total" in pagination + assert "total_pages" in pagination + + @pytest.mark.integration + def test_audit_logs_respects_pagination(self, integration_client): + """Test that audit logs endpoint respects limit parameter.""" + response = integration_client.get("/api/v1/audit-logs?limit=5") + assert response.status_code == 200 + + data = response.json() + assert len(data["items"]) <= 5 + assert data["pagination"]["limit"] == 5 + + @pytest.mark.integration + def test_audit_logs_filter_by_action(self, integration_client, test_package): + """Test filtering audit logs by action type.""" + # Create an action that will be logged + project_name, package_name = test_package + + response = integration_client.get("/api/v1/audit-logs?action=project.create") + assert response.status_code == 200 + + data = response.json() + # All items should have the filtered action + for item in data["items"]: + assert item["action"] == "project.create" + + @pytest.mark.integration + def test_audit_log_entry_has_required_fields( + self, integration_client, test_project + ): + """Test that audit log entries have all required fields.""" + # Force some audit logs by operations on test_project + response = integration_client.get("/api/v1/audit-logs?limit=10") + assert response.status_code == 200 + + data = response.json() + if data["items"]: + item = data["items"][0] + assert "id" in item + assert "action" in item + assert "resource" in item + assert "user_id" in item + assert "timestamp" in item + + +class TestProjectAuditLogs: + """Tests for /api/v1/projects/{project}/audit-logs endpoint.""" + + @pytest.mark.integration + def test_project_audit_logs_returns_200(self, integration_client, test_project): + """Test that project audit logs endpoint returns 200.""" + response = integration_client.get(f"/api/v1/projects/{test_project}/audit-logs") + assert response.status_code == 200 + + data = response.json() + assert "items" in data + assert "pagination" in data + + @pytest.mark.integration + def test_project_audit_logs_not_found(self, integration_client): + """Test that non-existent project returns 404.""" + response = integration_client.get( + "/api/v1/projects/nonexistent-project/audit-logs" + ) + assert response.status_code == 404 + + +class TestPackageAuditLogs: + """Tests for /api/v1/project/{project}/{package}/audit-logs endpoint.""" + + @pytest.mark.integration + def test_package_audit_logs_returns_200(self, integration_client, test_package): + """Test that package audit logs endpoint returns 200.""" + project_name, package_name = test_package + response = integration_client.get( + f"/api/v1/project/{project_name}/{package_name}/audit-logs" + ) + assert response.status_code == 200 + + data = response.json() + assert "items" in data + assert "pagination" in data + + @pytest.mark.integration + def test_package_audit_logs_project_not_found(self, integration_client): + """Test that non-existent project returns 404.""" + response = integration_client.get( + "/api/v1/project/nonexistent/nonexistent/audit-logs" + ) + assert response.status_code == 404 + + @pytest.mark.integration + def test_package_audit_logs_package_not_found( + self, integration_client, test_project + ): + """Test that non-existent package returns 404.""" + response = integration_client.get( + f"/api/v1/project/{test_project}/nonexistent-package/audit-logs" + ) + assert response.status_code == 404 + + +class TestPackageUploads: + """Tests for /api/v1/project/{project}/{package}/uploads endpoint.""" + + @pytest.mark.integration + def test_package_uploads_returns_200(self, integration_client, test_package): + """Test that package uploads endpoint returns 200.""" + project_name, package_name = test_package + response = integration_client.get( + f"/api/v1/project/{project_name}/{package_name}/uploads" + ) + assert response.status_code == 200 + + data = response.json() + assert "items" in data + assert "pagination" in data + + @pytest.mark.integration + def test_package_uploads_after_upload(self, integration_client, test_package): + """Test that uploads are recorded after file upload.""" + project_name, package_name = test_package + + # Upload a file + upload_result = upload_test_file( + integration_client, + project_name, + package_name, + b"test upload content", + "test.txt", + ) + assert upload_result["artifact_id"] + + # Check uploads endpoint + response = integration_client.get( + f"/api/v1/project/{project_name}/{package_name}/uploads" + ) + assert response.status_code == 200 + + data = response.json() + assert len(data["items"]) >= 1 + + # Verify upload record fields + upload = data["items"][0] + assert "artifact_id" in upload + assert "package_name" in upload + assert "project_name" in upload + assert "uploaded_at" in upload + assert "uploaded_by" in upload + + @pytest.mark.integration + def test_package_uploads_project_not_found(self, integration_client): + """Test that non-existent project returns 404.""" + response = integration_client.get( + "/api/v1/project/nonexistent/nonexistent/uploads" + ) + assert response.status_code == 404 + + +class TestArtifactUploads: + """Tests for /api/v1/artifact/{id}/uploads endpoint.""" + + @pytest.mark.integration + def test_artifact_uploads_returns_200(self, integration_client, test_package): + """Test that artifact uploads endpoint returns 200.""" + project_name, package_name = test_package + + # Upload a file + upload_result = upload_test_file( + integration_client, + project_name, + package_name, + b"artifact upload test", + "artifact.txt", + ) + artifact_id = upload_result["artifact_id"] + + response = integration_client.get(f"/api/v1/artifact/{artifact_id}/uploads") + assert response.status_code == 200 + + data = response.json() + assert "items" in data + assert "pagination" in data + assert len(data["items"]) >= 1 + + @pytest.mark.integration + def test_artifact_uploads_not_found(self, integration_client): + """Test that non-existent artifact returns 404.""" + fake_hash = "a" * 64 + response = integration_client.get(f"/api/v1/artifact/{fake_hash}/uploads") + assert response.status_code == 404 + + +class TestArtifactProvenance: + """Tests for /api/v1/artifact/{id}/history endpoint.""" + + @pytest.mark.integration + def test_artifact_history_returns_200(self, integration_client, test_package): + """Test that artifact history endpoint returns 200.""" + project_name, package_name = test_package + + # Upload a file + upload_result = upload_test_file( + integration_client, + project_name, + package_name, + b"provenance test content", + "prov.txt", + ) + artifact_id = upload_result["artifact_id"] + + response = integration_client.get(f"/api/v1/artifact/{artifact_id}/history") + assert response.status_code == 200 + + @pytest.mark.integration + def test_artifact_history_has_required_fields( + self, integration_client, test_package + ): + """Test that artifact history has all required fields.""" + project_name, package_name = test_package + + # Upload a file + upload_result = upload_test_file( + integration_client, + project_name, + package_name, + b"provenance fields test", + "fields.txt", + ) + artifact_id = upload_result["artifact_id"] + + response = integration_client.get(f"/api/v1/artifact/{artifact_id}/history") + assert response.status_code == 200 + + data = response.json() + assert "artifact_id" in data + assert "sha256" in data + assert "size" in data + assert "created_at" in data + assert "created_by" in data + assert "ref_count" in data + assert "first_uploaded_at" in data + assert "first_uploaded_by" in data + assert "upload_count" in data + assert "packages" in data + assert "tags" in data + assert "uploads" in data + + @pytest.mark.integration + def test_artifact_history_not_found(self, integration_client): + """Test that non-existent artifact returns 404.""" + fake_hash = "b" * 64 + response = integration_client.get(f"/api/v1/artifact/{fake_hash}/history") + assert response.status_code == 404 + + @pytest.mark.integration + def test_artifact_history_with_tag(self, integration_client, test_package): + """Test artifact history includes tag information when tagged.""" + project_name, package_name = test_package + + # Upload a file with a tag + upload_result = upload_test_file( + integration_client, + project_name, + package_name, + b"tagged provenance test", + "tagged.txt", + tag="v1.0.0", + ) + artifact_id = upload_result["artifact_id"] + + response = integration_client.get(f"/api/v1/artifact/{artifact_id}/history") + assert response.status_code == 200 + + data = response.json() + # Should have at least one tag + assert len(data["tags"]) >= 1 + + # Tag should have required fields + tag = data["tags"][0] + assert "project_name" in tag + assert "package_name" in tag + assert "tag_name" in tag