Files
warehouse13/utils/seed_data.py
pratik 9303f3481b Merge main into f/updates
Resolved conflicts by keeping f/updates changes:
- Keep Angular frontend with dark theme styling
- Keep updated quickstart scripts at root level
- Remove static HTML/JS files (replaced by Angular)
- Keep sim_source_id field implementation
- Merge backend improvements from main

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-15 11:44:01 -05:00

438 lines
15 KiB
Python
Executable File

#!/usr/bin/env python
"""
Utility functions for generating seed data for testing the Data Lake.
This module provides functions to:
- Generate random test artifacts (CSV, JSON, binary, PCAP files)
- Upload them to the database and storage backend
- Clear all data for testing purposes
"""
import os
import sys
import io
import random
import json
import csv
from datetime import datetime, timedelta
from typing import List, Dict, Any
import uuid
# Add parent directory to path to import app modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from app.database import SessionLocal
from app.models.artifact import Artifact
from app.models.tag import Tag
from app.storage import get_storage_backend
from app.config import settings
# Sample data for generating realistic test artifacts
TEST_NAMES = [
"user_authentication", "payment_processing", "data_validation",
"api_endpoint_test", "database_connection", "file_upload",
"performance_test", "stress_test", "security_scan",
"regression_test", "smoke_test", "integration_test"
]
TEST_SUITES = [
"authentication", "payments", "api", "database",
"ui", "performance", "security", "integration"
]
TEST_RESULTS = ["pass", "fail", "skip", "error"]
TAGS = [
"regression", "smoke", "critical", "high-priority",
"automated", "manual", "performance", "security",
"integration", "unit", "e2e", "api"
]
# Predefined tags with descriptions and colors
PREDEFINED_TAGS = [
{"name": "regression", "description": "Regression tests to verify existing functionality", "color": "#FF6B6B"},
{"name": "smoke", "description": "Quick smoke tests for basic functionality", "color": "#4ECDC4"},
{"name": "critical", "description": "Critical tests that must pass", "color": "#E74C3C"},
{"name": "high-priority", "description": "High priority tests", "color": "#F39C12"},
{"name": "automated", "description": "Automated test execution", "color": "#3498DB"},
{"name": "manual", "description": "Manual test execution required", "color": "#9B59B6"},
{"name": "performance", "description": "Performance and load tests", "color": "#1ABC9C"},
{"name": "security", "description": "Security and vulnerability tests", "color": "#E67E22"},
{"name": "integration", "description": "Integration tests between components", "color": "#2ECC71"},
{"name": "unit", "description": "Unit tests for individual components", "color": "#16A085"},
{"name": "e2e", "description": "End-to-end user journey tests", "color": "#8E44AD"},
{"name": "api", "description": "API endpoint tests", "color": "#2C3E50"},
]
def generate_csv_content() -> bytes:
"""Generate random CSV test data"""
output = io.StringIO()
writer = csv.writer(output)
# Header
writer.writerow(["timestamp", "test_case", "result", "duration_ms", "error_message"])
# Random rows
num_rows = random.randint(10, 100)
for i in range(num_rows):
timestamp = datetime.now() - timedelta(minutes=random.randint(0, 1000))
test_case = f"test_case_{random.randint(1, 50)}"
result = random.choice(TEST_RESULTS)
duration = random.randint(100, 5000)
error = "" if result == "pass" else f"Error_{random.randint(1, 10)}"
writer.writerow([timestamp.isoformat(), test_case, result, duration, error])
return output.getvalue().encode('utf-8')
def generate_json_content() -> bytes:
"""Generate random JSON test configuration"""
config = {
"test_run_id": str(uuid.uuid4()),
"timestamp": datetime.now().isoformat(),
"environment": random.choice(["dev", "staging", "prod"]),
"browser": random.choice(["chrome", "firefox", "safari", "edge"]),
"timeout": random.randint(30, 300),
"retries": random.randint(0, 3),
"parallel_threads": random.randint(1, 10),
"test_data": {
"users": random.randint(10, 1000),
"iterations": random.randint(1, 100),
"success_rate": round(random.uniform(0.7, 1.0), 2)
}
}
return json.dumps(config, indent=2).encode('utf-8')
def generate_binary_content() -> bytes:
"""Generate random binary data"""
size = random.randint(1024, 10240) # 1-10KB
return os.urandom(size)
def generate_pcap_content() -> bytes:
"""Generate fake PCAP file header (simplified)"""
# This is a simplified PCAP file header for demonstration
# Real PCAP files would have proper packet data
pcap_header = bytearray([
0xd4, 0xc3, 0xb2, 0xa1, # Magic number
0x02, 0x00, 0x04, 0x00, # Version
0x00, 0x00, 0x00, 0x00, # Timezone
0x00, 0x00, 0x00, 0x00, # Timestamp accuracy
0xff, 0xff, 0x00, 0x00, # Snapshot length
0x01, 0x00, 0x00, 0x00 # Link-layer type
])
# Add some random data to simulate packets
pcap_header.extend(os.urandom(random.randint(500, 2000)))
return bytes(pcap_header)
def create_artifact_data(index: int, sim_source_id: str = None) -> Dict[str, Any]:
"""Generate metadata for an artifact"""
test_name = random.choice(TEST_NAMES)
test_suite = random.choice(TEST_SUITES)
test_result = random.choice(TEST_RESULTS)
# Generate random tags (1-4 tags)
num_tags = random.randint(1, 4)
artifact_tags = random.sample(TAGS, num_tags)
# Generate test config
test_config = {
"environment": random.choice(["dev", "staging", "prod"]),
"timeout": random.randint(30, 300),
"retries": random.randint(0, 3)
}
# Generate custom metadata
custom_metadata = {
"build_number": random.randint(1000, 9999),
"commit_hash": uuid.uuid4().hex[:8],
"triggered_by": random.choice(["manual", "scheduled", "webhook"])
}
# Random version
version = f"v{random.randint(1, 5)}.{random.randint(0, 10)}.{random.randint(0, 20)}"
# Random creation date (within last 30 days)
created_days_ago = random.randint(0, 30)
created_at = datetime.now() - timedelta(days=created_days_ago, hours=random.randint(0, 23))
return {
"test_name": test_name,
"test_suite": test_suite,
"test_result": test_result,
"sim_source_id": sim_source_id,
"tags": artifact_tags,
"test_config": test_config,
"custom_metadata": custom_metadata,
"version": version,
"description": f"Test artifact {index} for {test_name}",
"created_at": created_at
}
async def upload_artifact_to_storage(file_content: bytes, filename: str) -> str:
"""Upload file to storage backend"""
storage = get_storage_backend()
file_extension = filename.split('.')[-1] if '.' in filename else ''
object_name = f"{uuid.uuid4()}.{file_extension}" if file_extension else str(uuid.uuid4())
storage_path = await storage.upload_file(
io.BytesIO(file_content),
object_name
)
return storage_path
def get_file_type(filename: str) -> str:
"""Determine file type from filename"""
extension = filename.lower().split('.')[-1]
type_mapping = {
'csv': 'csv',
'json': 'json',
'pcap': 'pcap',
'pcapng': 'pcap',
'bin': 'binary',
'dat': 'binary',
}
return type_mapping.get(extension, 'binary')
async def seed_predefined_tags() -> List[int]:
"""
Seed predefined tags into the database.
Returns:
List of created tag IDs
"""
db = SessionLocal()
tag_ids = []
try:
print("Seeding predefined tags...")
for tag_data in PREDEFINED_TAGS:
# Check if tag already exists
existing_tag = db.query(Tag).filter(Tag.name == tag_data["name"]).first()
if existing_tag:
print(f" Tag '{tag_data['name']}' already exists, skipping...")
tag_ids.append(existing_tag.id)
continue
tag = Tag(
name=tag_data["name"],
description=tag_data["description"],
color=tag_data["color"]
)
db.add(tag)
db.commit()
db.refresh(tag)
tag_ids.append(tag.id)
print(f" Created tag: {tag_data['name']}")
print(f"✓ Successfully seeded {len(tag_ids)} tags")
return tag_ids
except Exception as e:
db.rollback()
print(f"✗ Error seeding tags: {e}")
raise
finally:
db.close()
async def generate_seed_data(num_artifacts: int = 50) -> List[int]:
"""
Generate and upload seed data to the database and storage.
Args:
num_artifacts: Number of artifacts to generate (default: 50)
Returns:
List of created artifact IDs
"""
db = SessionLocal()
artifact_ids = []
try:
# First, seed tags
print("Step 1: Seeding tags...")
await seed_predefined_tags()
print(f"\nStep 2: Generating {num_artifacts} seed artifacts...")
print(f"Deployment mode: {settings.deployment_mode}")
print(f"Storage backend: {settings.storage_backend}")
# Generate SIM source IDs - each source will have 2-4 artifacts
num_sim_sources = max(num_artifacts // 3, 1)
sim_sources = [f"sim_run_{uuid.uuid4().hex[:8]}" for _ in range(num_sim_sources)]
# Pre-assign artifacts to SIM sources to ensure grouping
sim_source_assignments = []
for sim_source in sim_sources:
# Each SIM source gets 2-4 artifacts
num_artifacts_for_source = random.randint(2, 4)
sim_source_assignments.extend([sim_source] * num_artifacts_for_source)
# Pad remaining artifacts with None (ungrouped) or random sources
while len(sim_source_assignments) < num_artifacts:
if random.random() < 0.3: # 30% ungrouped
sim_source_assignments.append(None)
else:
sim_source_assignments.append(random.choice(sim_sources))
# Shuffle to randomize order
random.shuffle(sim_source_assignments)
for i in range(num_artifacts):
# Randomly choose file type
file_type_choice = random.choice(['csv', 'json', 'binary', 'pcap'])
if file_type_choice == 'csv':
filename = f"test_results_{i}.csv"
content = generate_csv_content()
content_type = "text/csv"
elif file_type_choice == 'json':
filename = f"test_config_{i}.json"
content = generate_json_content()
content_type = "application/json"
elif file_type_choice == 'pcap':
filename = f"network_capture_{i}.pcap"
content = generate_pcap_content()
content_type = "application/vnd.tcpdump.pcap"
else:
filename = f"test_data_{i}.bin"
content = generate_binary_content()
content_type = "application/octet-stream"
# Upload to storage
storage_path = await upload_artifact_to_storage(content, filename)
# Get pre-assigned SIM source ID for this artifact
sim_source_id = sim_source_assignments[i]
# Generate metadata
artifact_data = create_artifact_data(i, sim_source_id)
# Create database record
artifact = Artifact(
filename=filename,
file_type=get_file_type(filename),
file_size=len(content),
storage_path=storage_path,
content_type=content_type,
test_name=artifact_data["test_name"],
test_suite=artifact_data["test_suite"],
test_config=artifact_data["test_config"],
test_result=artifact_data["test_result"],
sim_source_id=artifact_data["sim_source_id"],
custom_metadata=artifact_data["custom_metadata"],
description=artifact_data["description"],
tags=artifact_data["tags"],
version=artifact_data["version"],
created_at=artifact_data["created_at"],
updated_at=artifact_data["created_at"]
)
db.add(artifact)
db.commit()
db.refresh(artifact)
artifact_ids.append(artifact.id)
if (i + 1) % 10 == 0:
print(f" Created {i + 1}/{num_artifacts} artifacts...")
print(f"✓ Successfully created {len(artifact_ids)} artifacts")
return artifact_ids
except Exception as e:
db.rollback()
print(f"✗ Error generating seed data: {e}")
raise
finally:
db.close()
async def clear_all_data():
"""
Clear all artifacts and tags from database and storage.
WARNING: This will delete ALL data!
"""
db = SessionLocal()
storage = get_storage_backend()
try:
print("Clearing all data...")
# Clear artifacts
artifacts = db.query(Artifact).all()
artifact_count = len(artifacts)
if artifact_count > 0:
print(f"Found {artifact_count} artifacts to delete...")
# Delete from storage and database
for i, artifact in enumerate(artifacts):
try:
# Delete from storage
object_name = artifact.storage_path.split('/')[-1]
await storage.delete_file(object_name)
except Exception as e:
print(f" Warning: Could not delete {artifact.filename} from storage: {e}")
# Delete from database
db.delete(artifact)
if (i + 1) % 10 == 0:
print(f" Deleted {i + 1}/{artifact_count} artifacts...")
db.commit()
print(f"✓ Successfully deleted {artifact_count} artifacts")
else:
print("No artifacts to delete.")
# Clear tags
tags = db.query(Tag).all()
tag_count = len(tags)
if tag_count > 0:
print(f"Found {tag_count} tags to delete...")
for tag in tags:
db.delete(tag)
db.commit()
print(f"✓ Successfully deleted {tag_count} tags")
else:
print("No tags to delete.")
except Exception as e:
db.rollback()
print(f"✗ Error clearing data: {e}")
raise
finally:
db.close()
# CLI interface
if __name__ == "__main__":
import asyncio
import argparse
parser = argparse.ArgumentParser(description="Generate or clear seed data for Data Lake")
parser.add_argument("action", choices=["generate", "clear"], help="Action to perform")
parser.add_argument("--count", type=int, default=50, help="Number of artifacts to generate (default: 50)")
args = parser.parse_args()
if args.action == "generate":
asyncio.run(generate_seed_data(args.count))
elif args.action == "clear":
confirm = input("Are you sure you want to delete ALL data? (yes/no): ")
if confirm.lower() == "yes":
asyncio.run(clear_all_data())
else:
print("Aborted.")