commit 6821e717cd
2025-10-14 15:37:37 -05:00
39 changed files with 3346 additions and 0 deletions

app/__init__.py (new file, 0 lines)

app/api/__init__.py (new file, 0 lines)

app/api/artifacts.py (new file, 242 lines)

@@ -0,0 +1,242 @@
from fastapi import APIRouter, UploadFile, File, Form, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from typing import List, Optional
import uuid
import json
import io
from datetime import datetime
from app.database import get_db
from app.models.artifact import Artifact
from app.schemas.artifact import ArtifactCreate, ArtifactResponse, ArtifactQuery
from app.storage import get_storage_backend
router = APIRouter(prefix="/api/v1/artifacts", tags=["artifacts"])
def get_file_type(filename: str) -> str:
"""Determine file type from filename"""
extension = filename.lower().split('.')[-1]
type_mapping = {
'csv': 'csv',
'json': 'json',
'pcap': 'pcap',
'pcapng': 'pcap',
'bin': 'binary',
'dat': 'binary',
}
return type_mapping.get(extension, 'binary')
@router.post("/upload", response_model=ArtifactResponse, status_code=201)
async def upload_artifact(
file: UploadFile = File(...),
test_name: Optional[str] = Form(None),
test_suite: Optional[str] = Form(None),
test_config: Optional[str] = Form(None),
test_result: Optional[str] = Form(None),
metadata: Optional[str] = Form(None),
description: Optional[str] = Form(None),
tags: Optional[str] = Form(None),
version: Optional[str] = Form(None),
parent_id: Optional[int] = Form(None),
db: Session = Depends(get_db)
):
"""
Upload a new artifact file with metadata
- **file**: The file to upload (CSV, JSON, binary, PCAP)
- **test_name**: Name of the test
- **test_suite**: Test suite identifier
- **test_config**: JSON string of test configuration
- **test_result**: Test result (pass, fail, skip, error)
- **metadata**: JSON string of additional metadata
- **description**: Text description of the artifact
- **tags**: JSON array of tags (as string)
- **version**: Version identifier
- **parent_id**: ID of parent artifact (for versioning)
"""
try:
# Parse JSON fields
test_config_dict = json.loads(test_config) if test_config else None
metadata_dict = json.loads(metadata) if metadata else None
tags_list = json.loads(tags) if tags else None
# Generate unique storage path
file_extension = file.filename.split('.')[-1] if '.' in file.filename else ''
object_name = f"{uuid.uuid4()}.{file_extension}" if file_extension else str(uuid.uuid4())
# Upload to storage backend
storage = get_storage_backend()
file_content = await file.read()
file_size = len(file_content)
storage_path = await storage.upload_file(
io.BytesIO(file_content),
object_name
)
# Create database record
artifact = Artifact(
filename=file.filename,
file_type=get_file_type(file.filename),
file_size=file_size,
storage_path=storage_path,
content_type=file.content_type,
test_name=test_name,
test_suite=test_suite,
test_config=test_config_dict,
test_result=test_result,
artifact_metadata=metadata_dict,  # "metadata" is a reserved attribute on SQLAlchemy declarative models
description=description,
tags=tags_list,
version=version,
parent_id=parent_id
)
db.add(artifact)
db.commit()
db.refresh(artifact)
return artifact
except json.JSONDecodeError as e:
raise HTTPException(status_code=400, detail=f"Invalid JSON in metadata fields: {str(e)}")
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Upload failed: {str(e)}")
@router.get("/{artifact_id}", response_model=ArtifactResponse)
async def get_artifact(artifact_id: int, db: Session = Depends(get_db)):
"""Get artifact metadata by ID"""
artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
if not artifact:
raise HTTPException(status_code=404, detail="Artifact not found")
return artifact
@router.get("/{artifact_id}/download")
async def download_artifact(artifact_id: int, db: Session = Depends(get_db)):
"""Download artifact file by ID"""
artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
if not artifact:
raise HTTPException(status_code=404, detail="Artifact not found")
try:
storage = get_storage_backend()
# Extract object name from storage path
object_name = artifact.storage_path.split('/')[-1]
file_data = await storage.download_file(object_name)
return StreamingResponse(
io.BytesIO(file_data),
media_type=artifact.content_type or "application/octet-stream",
headers={
"Content-Disposition": f'attachment; filename="{artifact.filename}"'
}
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Download failed: {str(e)}")
@router.get("/{artifact_id}/url")
async def get_artifact_url(
artifact_id: int,
expiration: int = Query(default=3600, ge=60, le=86400),
db: Session = Depends(get_db)
):
"""Get presigned URL for artifact download"""
artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
if not artifact:
raise HTTPException(status_code=404, detail="Artifact not found")
try:
storage = get_storage_backend()
object_name = artifact.storage_path.split('/')[-1]
url = await storage.get_file_url(object_name, expiration)
return {"url": url, "expires_in": expiration}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to generate URL: {str(e)}")
@router.delete("/{artifact_id}")
async def delete_artifact(artifact_id: int, db: Session = Depends(get_db)):
"""Delete artifact and its file"""
artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
if not artifact:
raise HTTPException(status_code=404, detail="Artifact not found")
try:
# Delete from storage
storage = get_storage_backend()
object_name = artifact.storage_path.split('/')[-1]
await storage.delete_file(object_name)
# Delete from database
db.delete(artifact)
db.commit()
return {"message": "Artifact deleted successfully"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"Delete failed: {str(e)}")
@router.post("/query", response_model=List[ArtifactResponse])
async def query_artifacts(query: ArtifactQuery, db: Session = Depends(get_db)):
"""
Query artifacts with filters
- **filename**: Filter by filename (partial match)
- **file_type**: Filter by file type
- **test_name**: Filter by test name
- **test_suite**: Filter by test suite
- **test_result**: Filter by test result
- **tags**: Filter by tags (must contain all specified tags)
- **start_date**: Filter by creation date (from)
- **end_date**: Filter by creation date (to)
- **limit**: Maximum number of results
- **offset**: Number of results to skip
"""
q = db.query(Artifact)
if query.filename:
q = q.filter(Artifact.filename.ilike(f"%{query.filename}%"))
if query.file_type:
q = q.filter(Artifact.file_type == query.file_type)
if query.test_name:
q = q.filter(Artifact.test_name.ilike(f"%{query.test_name}%"))
if query.test_suite:
q = q.filter(Artifact.test_suite == query.test_suite)
if query.test_result:
q = q.filter(Artifact.test_result == query.test_result)
if query.tags:
for tag in query.tags:
q = q.filter(Artifact.tags.contains([tag]))
if query.start_date:
q = q.filter(Artifact.created_at >= query.start_date)
if query.end_date:
q = q.filter(Artifact.created_at <= query.end_date)
# Order by creation date descending
q = q.order_by(Artifact.created_at.desc())
# Apply pagination
artifacts = q.offset(query.offset).limit(query.limit).all()
return artifacts
@router.get("/", response_model=List[ArtifactResponse])
async def list_artifacts(
limit: int = Query(default=100, le=1000),
offset: int = Query(default=0, ge=0),
db: Session = Depends(get_db)
):
"""List all artifacts with pagination"""
artifacts = db.query(Artifact).order_by(
Artifact.created_at.desc()
).offset(offset).limit(limit).all()
return artifacts
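
The endpoints above can be exercised from a small client script. The following is an illustrative sketch, not part of the commit: it assumes the API is running locally on port 8000, that the `requests` package is available, and that the file and field values shown are placeholders.

import json
import requests

BASE = "http://localhost:8000/api/v1/artifacts"

# Upload a CSV artifact together with its test metadata (all form fields are optional)
with open("throughput.csv", "rb") as fh:
    resp = requests.post(
        f"{BASE}/upload",
        files={"file": ("throughput.csv", fh, "text/csv")},
        data={
            "test_name": "throughput_baseline",
            "test_suite": "network",
            "test_result": "pass",
            "tags": json.dumps(["nightly", "10g"]),
            "metadata": json.dumps({"duration_s": 300}),
        },
    )
resp.raise_for_status()
artifact_id = resp.json()["id"]

# Query for recent failures in the same suite
failures = requests.post(
    f"{BASE}/query",
    json={"test_suite": "network", "test_result": "fail", "limit": 10},
).json()

# Download the uploaded file back
content = requests.get(f"{BASE}/{artifact_id}/download").content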

app/config.py (new file, 35 lines)

@@ -0,0 +1,35 @@
from pydantic_settings import BaseSettings
from typing import Literal
class Settings(BaseSettings):
# Database
database_url: str = "postgresql://user:password@localhost:5432/datalake"
# Storage Backend
storage_backend: Literal["s3", "minio"] = "minio"
# AWS S3
aws_access_key_id: str = ""
aws_secret_access_key: str = ""
aws_region: str = "us-east-1"
s3_bucket_name: str = "test-artifacts"
# MinIO
minio_endpoint: str = "localhost:9000"
minio_access_key: str = "minioadmin"
minio_secret_key: str = "minioadmin"
minio_bucket_name: str = "test-artifacts"
minio_secure: bool = False
# Application
api_host: str = "0.0.0.0"
api_port: int = 8000
max_upload_size: int = 524288000 # 500MB
class Config:
env_file = ".env"
case_sensitive = False
settings = Settings()
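
Because Settings is a pydantic-settings BaseSettings with case_sensitive = False, every field can be overridden through environment variables or a .env file. A minimal sketch, with placeholder values:

import os

# Placeholder overrides; the same keys can instead live in a .env file next to the app
os.environ["STORAGE_BACKEND"] = "s3"
os.environ["S3_BUCKET_NAME"] = "ci-test-artifacts"
os.environ["DATABASE_URL"] = "postgresql://user:password@db:5432/datalake"

from app.config import Settings

settings = Settings()
assert settings.storage_backend == "s3"
assert settings.s3_bucket_name == "ci-test-artifacts"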

app/database.py (new file, 21 lines)

@@ -0,0 +1,21 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.config import settings
from app.models.artifact import Base
engine = create_engine(settings.database_url)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def init_db():
"""Initialize database tables"""
Base.metadata.create_all(bind=engine)
def get_db():
"""Dependency for getting database session"""
db = SessionLocal()
try:
yield db
finally:
db.close()
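
Because every route receives its session through get_db, tests can swap in a throwaway database via FastAPI's dependency_overrides. A sketch, assuming an in-memory SQLite database is acceptable for tests:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool

from app.database import get_db
from app.main import app
from app.models.artifact import Base

# Single shared in-memory database for the whole test session
test_engine = create_engine(
    "sqlite://",
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=test_engine)
Base.metadata.create_all(bind=test_engine)

def override_get_db():
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()

app.dependency_overrides[get_db] = override_get_db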

app/main.py (new file, 71 lines)

@@ -0,0 +1,71 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.api.artifacts import router as artifacts_router
from app.database import init_db
from app.config import settings
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Create FastAPI app
app = FastAPI(
title="Test Artifact Data Lake",
description="API for storing and querying test artifacts including CSV, JSON, binary files, and packet captures",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(artifacts_router)
@app.on_event("startup")
async def startup_event():
"""Initialize database on startup"""
logger.info("Initializing database...")
init_db()
logger.info(f"Using storage backend: {settings.storage_backend}")
logger.info("Application started successfully")
@app.get("/")
async def root():
"""Root endpoint"""
return {
"message": "Test Artifact Data Lake API",
"version": "1.0.0",
"docs": "/docs",
"storage_backend": settings.storage_backend
}
@app.get("/health")
async def health_check():
"""Health check endpoint"""
return {"status": "healthy"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"app.main:app",
host=settings.api_host,
port=settings.api_port,
reload=True
)
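
A quick smoke test of the two unauthenticated endpoints, sketched with FastAPI's TestClient. It assumes the project's dependencies are installed; with recent Starlette versions no database is needed because startup events (and init_db) only run when the client is used as a context manager:

from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)  # not entered as a context manager, so startup/init_db does not run
assert client.get("/health").json() == {"status": "healthy"}
assert client.get("/").json()["docs"] == "/docs"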

app/models/__init__.py (new file, 3 lines)

@@ -0,0 +1,3 @@
from .artifact import Artifact
__all__ = ["Artifact"]

app/models/artifact.py (new file, 38 lines)

@@ -0,0 +1,38 @@
from sqlalchemy import Column, String, Integer, DateTime, JSON, BigInteger, Text
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
Base = declarative_base()
class Artifact(Base):
__tablename__ = "artifacts"
id = Column(Integer, primary_key=True, index=True)
filename = Column(String(500), nullable=False, index=True)
file_type = Column(String(50), nullable=False, index=True) # csv, json, binary, pcap
file_size = Column(BigInteger, nullable=False)
storage_path = Column(String(1000), nullable=False)
content_type = Column(String(100))
# Test metadata
test_name = Column(String(500), index=True)
test_suite = Column(String(500), index=True)
test_config = Column(JSON)
test_result = Column(String(50), index=True) # pass, fail, skip, error
# Additional metadata
artifact_metadata = Column("metadata", JSON)  # "metadata" is reserved on declarative models; column name stays "metadata"
description = Column(Text)
tags = Column(JSON) # Array of tags for categorization
# Timestamps
created_at = Column(DateTime, default=datetime.utcnow, index=True)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Versioning
version = Column(String(50))
parent_id = Column(Integer, index=True) # For file versioning
def __repr__(self):
return f"<Artifact(id={self.id}, filename='{self.filename}', test_name='{self.test_name}')>"

app/schemas/__init__.py (new file, 3 lines)

@@ -0,0 +1,3 @@
from .artifact import ArtifactCreate, ArtifactResponse, ArtifactQuery
__all__ = ["ArtifactCreate", "ArtifactResponse", "ArtifactQuery"]

app/schemas/artifact.py (new file, 51 lines)

@@ -0,0 +1,51 @@
from pydantic import BaseModel, Field
from typing import Optional, Dict, Any, List
from datetime import datetime
class ArtifactCreate(BaseModel):
test_name: Optional[str] = None
test_suite: Optional[str] = None
test_config: Optional[Dict[str, Any]] = None
test_result: Optional[str] = None
artifact_metadata: Optional[Dict[str, Any]] = None
description: Optional[str] = None
tags: Optional[List[str]] = None
version: Optional[str] = None
parent_id: Optional[int] = None
class ArtifactResponse(BaseModel):
id: int
filename: str
file_type: str
file_size: int
storage_path: str
content_type: Optional[str] = None
test_name: Optional[str] = None
test_suite: Optional[str] = None
test_config: Optional[Dict[str, Any]] = None
test_result: Optional[str] = None
artifact_metadata: Optional[Dict[str, Any]] = None  # matches the renamed ORM attribute
description: Optional[str] = None
tags: Optional[List[str]] = None
created_at: datetime
updated_at: datetime
version: Optional[str] = None
parent_id: Optional[int] = None
class Config:
from_attributes = True
class ArtifactQuery(BaseModel):
filename: Optional[str] = None
file_type: Optional[str] = None
test_name: Optional[str] = None
test_suite: Optional[str] = None
test_result: Optional[str] = None
tags: Optional[List[str]] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
limit: int = Field(default=100, le=1000)
offset: int = Field(default=0, ge=0)

app/storage/__init__.py (new file, 6 lines)

@@ -0,0 +1,6 @@
from .base import StorageBackend
from .s3_backend import S3Backend
from .minio_backend import MinIOBackend
from .factory import get_storage_backend
__all__ = ["StorageBackend", "S3Backend", "MinIOBackend", "get_storage_backend"]

app/storage/base.py (new file, 73 lines)

@@ -0,0 +1,73 @@
from abc import ABC, abstractmethod
from typing import BinaryIO
class StorageBackend(ABC):
"""Abstract base class for storage backends"""
@abstractmethod
async def upload_file(self, file_data: BinaryIO, object_name: str) -> str:
"""
Upload a file to storage
Args:
file_data: Binary file data
object_name: Name/path of the object in storage
Returns:
Storage path/URL of uploaded file
"""
pass
@abstractmethod
async def download_file(self, object_name: str) -> bytes:
"""
Download a file from storage
Args:
object_name: Name/path of the object in storage
Returns:
Binary file data
"""
pass
@abstractmethod
async def delete_file(self, object_name: str) -> bool:
"""
Delete a file from storage
Args:
object_name: Name/path of the object in storage
Returns:
True if successful
"""
pass
@abstractmethod
async def file_exists(self, object_name: str) -> bool:
"""
Check if a file exists in storage
Args:
object_name: Name/path of the object in storage
Returns:
True if file exists
"""
pass
@abstractmethod
async def get_file_url(self, object_name: str, expiration: int = 3600) -> str:
"""
Get a presigned URL for downloading a file
Args:
object_name: Name/path of the object in storage
expiration: URL expiration time in seconds
Returns:
Presigned URL
"""
pass

app/storage/factory.py (new file, 17 lines)

@@ -0,0 +1,17 @@
from app.storage.base import StorageBackend
from app.storage.s3_backend import S3Backend
from app.storage.minio_backend import MinIOBackend
from app.config import settings
def get_storage_backend() -> StorageBackend:
"""
Factory function to get the appropriate storage backend
based on configuration
"""
if settings.storage_backend == "s3":
return S3Backend()
elif settings.storage_backend == "minio":
return MinIOBackend()
else:
raise ValueError(f"Unsupported storage backend: {settings.storage_backend}")

app/storage/minio_backend.py (new file, 88 lines)

@@ -0,0 +1,88 @@
import boto3
from botocore.exceptions import ClientError
from botocore.client import Config
from typing import BinaryIO
from app.storage.base import StorageBackend
from app.config import settings
import logging
logger = logging.getLogger(__name__)
class MinIOBackend(StorageBackend):
"""MinIO storage backend implementation (S3-compatible)"""
def __init__(self):
# MinIO uses S3-compatible API
self.s3_client = boto3.client(
's3',
endpoint_url=f"{'https' if settings.minio_secure else 'http'}://{settings.minio_endpoint}",
aws_access_key_id=settings.minio_access_key,
aws_secret_access_key=settings.minio_secret_key,
config=Config(signature_version='s3v4'),
region_name='us-east-1'
)
self.bucket_name = settings.minio_bucket_name
self._ensure_bucket_exists()
def _ensure_bucket_exists(self):
"""Create bucket if it doesn't exist"""
try:
self.s3_client.head_bucket(Bucket=self.bucket_name)
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == '404':
try:
self.s3_client.create_bucket(Bucket=self.bucket_name)
logger.info(f"Created MinIO bucket: {self.bucket_name}")
except ClientError as create_error:
logger.error(f"Failed to create bucket: {create_error}")
raise
async def upload_file(self, file_data: BinaryIO, object_name: str) -> str:
"""Upload file to MinIO"""
try:
self.s3_client.upload_fileobj(file_data, self.bucket_name, object_name)
return f"minio://{self.bucket_name}/{object_name}"
except ClientError as e:
logger.error(f"Failed to upload file to MinIO: {e}")
raise
async def download_file(self, object_name: str) -> bytes:
"""Download file from MinIO"""
try:
response = self.s3_client.get_object(Bucket=self.bucket_name, Key=object_name)
return response['Body'].read()
except ClientError as e:
logger.error(f"Failed to download file from MinIO: {e}")
raise
async def delete_file(self, object_name: str) -> bool:
"""Delete file from MinIO"""
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=object_name)
return True
except ClientError as e:
logger.error(f"Failed to delete file from MinIO: {e}")
return False
async def file_exists(self, object_name: str) -> bool:
"""Check if file exists in MinIO"""
try:
self.s3_client.head_object(Bucket=self.bucket_name, Key=object_name)
return True
except ClientError:
return False
async def get_file_url(self, object_name: str, expiration: int = 3600) -> str:
"""Generate presigned URL for MinIO object"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': object_name},
ExpiresIn=expiration
)
return url
except ClientError as e:
logger.error(f"Failed to generate presigned URL: {e}")
raise
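
Because every backend method is declared async, a standalone round-trip check has to run under an event loop. A sketch, assuming a local MinIO instance is running with the defaults from config.py and that the object name is a placeholder:

import asyncio
import io

from app.storage.factory import get_storage_backend


async def roundtrip() -> None:
    storage = get_storage_backend()  # MinIOBackend when storage_backend = "minio"
    path = await storage.upload_file(io.BytesIO(b"hello"), "smoke-test.bin")
    print("stored at", path)
    assert await storage.file_exists("smoke-test.bin")
    assert await storage.download_file("smoke-test.bin") == b"hello"
    print("presigned:", await storage.get_file_url("smoke-test.bin", expiration=300))
    await storage.delete_file("smoke-test.bin")


asyncio.run(roundtrip())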

app/storage/s3_backend.py (new file, 87 lines)

@@ -0,0 +1,87 @@
import boto3
from botocore.exceptions import ClientError
from typing import BinaryIO
from app.storage.base import StorageBackend
from app.config import settings
import logging
logger = logging.getLogger(__name__)
class S3Backend(StorageBackend):
"""AWS S3 storage backend implementation"""
def __init__(self):
self.s3_client = boto3.client(
's3',
aws_access_key_id=settings.aws_access_key_id,
aws_secret_access_key=settings.aws_secret_access_key,
region_name=settings.aws_region
)
self.bucket_name = settings.s3_bucket_name
self._ensure_bucket_exists()
def _ensure_bucket_exists(self):
"""Create bucket if it doesn't exist"""
try:
self.s3_client.head_bucket(Bucket=self.bucket_name)
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == '404':
try:
if settings.aws_region == "us-east-1":
    # us-east-1 rejects an explicit LocationConstraint
    self.s3_client.create_bucket(Bucket=self.bucket_name)
else:
    self.s3_client.create_bucket(
        Bucket=self.bucket_name,
        CreateBucketConfiguration={'LocationConstraint': settings.aws_region}
    )
logger.info(f"Created S3 bucket: {self.bucket_name}")
except ClientError as create_error:
logger.error(f"Failed to create bucket: {create_error}")
raise
async def upload_file(self, file_data: BinaryIO, object_name: str) -> str:
"""Upload file to S3"""
try:
self.s3_client.upload_fileobj(file_data, self.bucket_name, object_name)
return f"s3://{self.bucket_name}/{object_name}"
except ClientError as e:
logger.error(f"Failed to upload file to S3: {e}")
raise
async def download_file(self, object_name: str) -> bytes:
"""Download file from S3"""
try:
response = self.s3_client.get_object(Bucket=self.bucket_name, Key=object_name)
return response['Body'].read()
except ClientError as e:
logger.error(f"Failed to download file from S3: {e}")
raise
async def delete_file(self, object_name: str) -> bool:
"""Delete file from S3"""
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=object_name)
return True
except ClientError as e:
logger.error(f"Failed to delete file from S3: {e}")
return False
async def file_exists(self, object_name: str) -> bool:
"""Check if file exists in S3"""
try:
self.s3_client.head_object(Bucket=self.bucket_name, Key=object_name)
return True
except ClientError:
return False
async def get_file_url(self, object_name: str, expiration: int = 3600) -> str:
"""Generate presigned URL for S3 object"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': object_name},
ExpiresIn=expiration
)
return url
except ClientError as e:
logger.error(f"Failed to generate presigned URL: {e}")
raise