#!/usr/bin/env python
"""
Utility functions for generating seed data for testing the Data Lake.

This module provides functions to:
- Generate random test artifacts (CSV, JSON, binary, PCAP files)
- Upload them to the database and storage backend
- Clear all data for testing purposes
"""

import os
import sys
import io
import random
import json
import csv
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import uuid

# Add parent directory to path to import app modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from app.database import SessionLocal
from app.models.artifact import Artifact
from app.models.tag import Tag
from app.storage import get_storage_backend
from app.config import settings

# Sample data for generating realistic test artifacts
TEST_NAMES = [
    "user_authentication", "payment_processing", "data_validation",
    "api_endpoint_test", "database_connection", "file_upload",
    "performance_test", "stress_test", "security_scan",
    "regression_test", "smoke_test", "integration_test"
]

TEST_SUITES = [
    "authentication", "payments", "api", "database",
    "ui", "performance", "security", "integration"
]

TEST_RESULTS = ["pass", "fail", "skip", "error"]

TAGS = [
    "regression", "smoke", "critical", "high-priority",
    "automated", "manual", "performance", "security",
    "integration", "unit", "e2e", "api"
]

# Predefined tags with descriptions and colors
PREDEFINED_TAGS = [
    {"name": "regression", "description": "Regression tests to verify existing functionality", "color": "#FF6B6B"},
    {"name": "smoke", "description": "Quick smoke tests for basic functionality", "color": "#4ECDC4"},
    {"name": "critical", "description": "Critical tests that must pass", "color": "#E74C3C"},
    {"name": "high-priority", "description": "High priority tests", "color": "#F39C12"},
    {"name": "automated", "description": "Automated test execution", "color": "#3498DB"},
    {"name": "manual", "description": "Manual test execution required", "color": "#9B59B6"},
    {"name": "performance", "description": "Performance and load tests", "color": "#1ABC9C"},
    {"name": "security", "description": "Security and vulnerability tests", "color": "#E67E22"},
    {"name": "integration", "description": "Integration tests between components", "color": "#2ECC71"},
    {"name": "unit", "description": "Unit tests for individual components", "color": "#16A085"},
    {"name": "e2e", "description": "End-to-end user journey tests", "color": "#8E44AD"},
    {"name": "api", "description": "API endpoint tests", "color": "#2C3E50"},
]


def generate_csv_content() -> bytes:
    """Generate random CSV test data.

    Returns:
        UTF-8 encoded CSV text: one header row followed by 10-100 random
        result rows.
    """
    output = io.StringIO()
    writer = csv.writer(output)

    # Header
    writer.writerow(["timestamp", "test_case", "result", "duration_ms", "error_message"])

    # Random rows
    num_rows = random.randint(10, 100)
    for _ in range(num_rows):
        timestamp = datetime.now() - timedelta(minutes=random.randint(0, 1000))
        test_case = f"test_case_{random.randint(1, 50)}"
        result = random.choice(TEST_RESULTS)
        duration = random.randint(100, 5000)
        # Every non-"pass" result (fail, skip, error) gets an error label.
        error = "" if result == "pass" else f"Error_{random.randint(1, 10)}"
        writer.writerow([timestamp.isoformat(), test_case, result, duration, error])

    return output.getvalue().encode('utf-8')


def generate_json_content() -> bytes:
    """Generate random JSON test configuration.

    Returns:
        UTF-8 encoded, 2-space-indented JSON document describing a
        randomized test run.
    """
    config = {
        "test_run_id": str(uuid.uuid4()),
        "timestamp": datetime.now().isoformat(),
        "environment": random.choice(["dev", "staging", "prod"]),
        "browser": random.choice(["chrome", "firefox", "safari", "edge"]),
        "timeout": random.randint(30, 300),
        "retries": random.randint(0, 3),
        "parallel_threads": random.randint(1, 10),
        "test_data": {
            "users": random.randint(10, 1000),
            "iterations": random.randint(1, 100),
            "success_rate": round(random.uniform(0.7, 1.0), 2)
        }
    }
    return json.dumps(config, indent=2).encode('utf-8')


def generate_binary_content() -> bytes:
    """Generate random binary data.

    Returns:
        Between 1024 and 10240 random bytes from ``os.urandom``.
    """
    size = random.randint(1024, 10240)  # 1-10KB
    return os.urandom(size)


def generate_pcap_content() -> bytes:
    """Generate fake PCAP file header (simplified).

    Returns:
        A 24-byte PCAP-style global header followed by 500-2000 random
        bytes. The trailing bytes are plain ``os.urandom`` output, not
        structured packet records.
    """
    # This is a simplified PCAP file header for demonstration
    # Real PCAP files would have proper packet data
    pcap_header = bytearray([
        0xd4, 0xc3, 0xb2, 0xa1,  # Magic number
        0x02, 0x00, 0x04, 0x00,  # Version
        0x00, 0x00, 0x00, 0x00,  # Timezone
        0x00, 0x00, 0x00, 0x00,  # Timestamp accuracy
        0xff, 0xff, 0x00, 0x00,  # Snapshot length
        0x01, 0x00, 0x00, 0x00   # Link-layer type
    ])

    # Add some random data to simulate packets
    pcap_header.extend(os.urandom(random.randint(500, 2000)))

    return bytes(pcap_header)


def create_artifact_data(index: int, sim_source_id: Optional[str] = None) -> Dict[str, Any]:
    """Generate metadata for an artifact.

    Args:
        index: Sequence number of the artifact; only used in the
            generated description text.
        sim_source_id: Identifier of the SIM source this artifact is
            grouped under, or None for an ungrouped artifact. Passed
            through unchanged.

    Returns:
        Dict with the keys ``test_name``, ``test_suite``, ``test_result``,
        ``sim_source_id``, ``tags``, ``test_config``, ``custom_metadata``,
        ``version``, ``description`` and ``created_at``.
    """
    test_name = random.choice(TEST_NAMES)
    test_suite = random.choice(TEST_SUITES)
    test_result = random.choice(TEST_RESULTS)

    # Generate random tags (1-4 tags)
    num_tags = random.randint(1, 4)
    artifact_tags = random.sample(TAGS, num_tags)

    # Generate test config
    test_config = {
        "environment": random.choice(["dev", "staging", "prod"]),
        "timeout": random.randint(30, 300),
        "retries": random.randint(0, 3)
    }

    # Generate custom metadata
    custom_metadata = {
        "build_number": random.randint(1000, 9999),
        "commit_hash": uuid.uuid4().hex[:8],
        "triggered_by": random.choice(["manual", "scheduled", "webhook"])
    }

    # Random version
    version = f"v{random.randint(1, 5)}.{random.randint(0, 10)}.{random.randint(0, 20)}"

    # Random creation date (within last 30 days, plus up to 23 extra hours)
    created_days_ago = random.randint(0, 30)
    created_at = datetime.now() - timedelta(days=created_days_ago, hours=random.randint(0, 23))

    return {
        "test_name": test_name,
        "test_suite": test_suite,
        "test_result": test_result,
        "sim_source_id": sim_source_id,
        "tags": artifact_tags,
        "test_config": test_config,
        "custom_metadata": custom_metadata,
        "version": version,
        "description": f"Test artifact {index} for {test_name}",
        "created_at": created_at
    }


async def upload_artifact_to_storage(file_content: bytes, filename: str) -> str:
    """Upload file to storage backend.

    The object is stored under a fresh UUID name; the original filename
    only contributes its extension (if it has one).

    Args:
        file_content: Raw bytes to upload.
        filename: Original filename, used to derive the extension.

    Returns:
        The value returned by the storage backend's ``upload_file``.
    """
    storage = get_storage_backend()
    file_extension = filename.split('.')[-1] if '.' in filename else ''
    object_name = f"{uuid.uuid4()}.{file_extension}" if file_extension else str(uuid.uuid4())

    storage_path = await storage.upload_file(
        io.BytesIO(file_content),
        object_name
    )
    return storage_path


def get_file_type(filename: str) -> str:
    """Determine file type from filename.

    Args:
        filename: Name of the file; matching is case-insensitive.

    Returns:
        One of ``'csv'``, ``'json'``, ``'pcap'`` or ``'binary'``. Unknown
        extensions and names without any extension map to ``'binary'``.
    """
    # rpartition leaves the separator slot empty when the name has no dot,
    # so an extension-less file called e.g. "csv" is not mistaken for a CSV.
    _, dot, extension = filename.lower().rpartition('.')
    if not dot:
        return 'binary'

    type_mapping = {
        'csv': 'csv',
        'json': 'json',
        'pcap': 'pcap',
        'pcapng': 'pcap',
        'bin': 'binary',
        'dat': 'binary',
    }
    return type_mapping.get(extension, 'binary')


async def seed_predefined_tags() -> List[int]:
    """
    Seed predefined tags into the database.

    Tags whose name already exists are left untouched; each new tag is
    committed individually.

    Returns:
        List of tag IDs, one per entry in PREDEFINED_TAGS, covering both
        newly created tags and ones that already existed.

    Raises:
        Exception: Any error from the database layer is re-raised after
            the session is rolled back.
    """
    db = SessionLocal()
    tag_ids = []

    try:
        print("Seeding predefined tags...")

        for tag_data in PREDEFINED_TAGS:
            # Check if tag already exists
            existing_tag = db.query(Tag).filter(Tag.name == tag_data["name"]).first()

            if existing_tag:
                print(f" Tag '{tag_data['name']}' already exists, skipping...")
                tag_ids.append(existing_tag.id)
                continue

            tag = Tag(
                name=tag_data["name"],
                description=tag_data["description"],
                color=tag_data["color"]
            )
            db.add(tag)
            db.commit()
            db.refresh(tag)
            tag_ids.append(tag.id)
            print(f" Created tag: {tag_data['name']}")

        print(f"✓ Successfully seeded {len(tag_ids)} tags")
        return tag_ids

    except Exception as e:
        db.rollback()
        print(f"✗ Error seeding tags: {e}")
        raise
    finally:
        db.close()


async def generate_seed_data(num_artifacts: int = 50) -> List[int]:
    """
    Generate and upload seed data to the database and storage.

    Each artifact is uploaded to storage first and then committed to the
    database on its own, so a failure part-way through leaves the earlier
    artifacts in place.

    Args:
        num_artifacts: Number of artifacts to generate (default: 50)

    Returns:
        List of created artifact IDs

    Raises:
        Exception: Any error from storage or the database is re-raised
            after the session is rolled back.
    """
    db = SessionLocal()
    artifact_ids = []

    try:
        # First, seed tags
        print("Step 1: Seeding tags...")
        await seed_predefined_tags()

        print(f"\nStep 2: Generating {num_artifacts} seed artifacts...")
        print(f"Deployment mode: {settings.deployment_mode}")
        print(f"Storage backend: {settings.storage_backend}")

        # Generate SIM source IDs - roughly one source per three artifacts
        num_sim_sources = max(num_artifacts // 3, 1)
        sim_sources = [f"sim_run_{uuid.uuid4().hex[:8]}" for _ in range(num_sim_sources)]

        # Pre-assign artifacts to SIM sources to encourage grouping
        sim_source_assignments = []
        for sim_source in sim_sources:
            # Each SIM source contributes 2-4 slots
            num_artifacts_for_source = random.randint(2, 4)
            sim_source_assignments.extend([sim_source] * num_artifacts_for_source)

        # Pad remaining artifacts with None (ungrouped) or random sources
        while len(sim_source_assignments) < num_artifacts:
            if random.random() < 0.3:  # 30% ungrouped
                sim_source_assignments.append(None)
            else:
                sim_source_assignments.append(random.choice(sim_sources))

        # Shuffle to randomize order. The list can be longer than
        # num_artifacts; only the first num_artifacts entries are consumed
        # below, so a given source may end up with fewer than 2 or more
        # than 4 artifacts.
        random.shuffle(sim_source_assignments)

        for i in range(num_artifacts):
            # Randomly choose file type
            file_type_choice = random.choice(['csv', 'json', 'binary', 'pcap'])

            if file_type_choice == 'csv':
                filename = f"test_results_{i}.csv"
                content = generate_csv_content()
                content_type = "text/csv"
            elif file_type_choice == 'json':
                filename = f"test_config_{i}.json"
                content = generate_json_content()
                content_type = "application/json"
            elif file_type_choice == 'pcap':
                filename = f"network_capture_{i}.pcap"
                content = generate_pcap_content()
                content_type = "application/vnd.tcpdump.pcap"
            else:
                filename = f"test_data_{i}.bin"
                content = generate_binary_content()
                content_type = "application/octet-stream"

            # Upload to storage
            storage_path = await upload_artifact_to_storage(content, filename)

            # Get pre-assigned SIM source ID for this artifact
            sim_source_id = sim_source_assignments[i]

            # Generate metadata
            artifact_data = create_artifact_data(i, sim_source_id)

            # Create database record
            artifact = Artifact(
                filename=filename,
                file_type=get_file_type(filename),
                file_size=len(content),
                storage_path=storage_path,
                content_type=content_type,
                test_name=artifact_data["test_name"],
                test_suite=artifact_data["test_suite"],
                test_config=artifact_data["test_config"],
                test_result=artifact_data["test_result"],
                sim_source_id=artifact_data["sim_source_id"],
                custom_metadata=artifact_data["custom_metadata"],
                description=artifact_data["description"],
                tags=artifact_data["tags"],
                version=artifact_data["version"],
                created_at=artifact_data["created_at"],
                updated_at=artifact_data["created_at"]
            )

            db.add(artifact)
            db.commit()
            db.refresh(artifact)

            artifact_ids.append(artifact.id)

            if (i + 1) % 10 == 0:
                print(f" Created {i + 1}/{num_artifacts} artifacts...")

        print(f"✓ Successfully created {len(artifact_ids)} artifacts")
        return artifact_ids

    except Exception as e:
        db.rollback()
        print(f"✗ Error generating seed data: {e}")
        raise
    finally:
        db.close()


async def clear_all_data():
    """
    Clear all artifacts and tags from database and storage.

    Storage deletion is best-effort: a failure to delete an object is
    reported as a warning and the database row is still removed.

    WARNING: This will delete ALL data!

    Raises:
        Exception: Any database error is re-raised after the session is
            rolled back.
    """
    db = SessionLocal()
    storage = get_storage_backend()

    try:
        print("Clearing all data...")

        # Clear artifacts
        artifacts = db.query(Artifact).all()
        artifact_count = len(artifacts)

        if artifact_count > 0:
            print(f"Found {artifact_count} artifacts to delete...")

            # Delete from storage and database
            for i, artifact in enumerate(artifacts):
                try:
                    # Delete from storage; the object name is the last
                    # path segment of the stored path.
                    object_name = artifact.storage_path.split('/')[-1]
                    await storage.delete_file(object_name)
                except Exception as e:
                    print(f" Warning: Could not delete {artifact.filename} from storage: {e}")

                # Delete from database
                db.delete(artifact)

                if (i + 1) % 10 == 0:
                    print(f" Deleted {i + 1}/{artifact_count} artifacts...")

            db.commit()
            print(f"✓ Successfully deleted {artifact_count} artifacts")
        else:
            print("No artifacts to delete.")

        # Clear tags
        tags = db.query(Tag).all()
        tag_count = len(tags)

        if tag_count > 0:
            print(f"Found {tag_count} tags to delete...")
            for tag in tags:
                db.delete(tag)
            db.commit()
            print(f"✓ Successfully deleted {tag_count} tags")
        else:
            print("No tags to delete.")

    except Exception as e:
        db.rollback()
        print(f"✗ Error clearing data: {e}")
        raise
    finally:
        db.close()


# CLI interface
if __name__ == "__main__":
    import asyncio
    import argparse

    parser = argparse.ArgumentParser(description="Generate or clear seed data for Data Lake")
    parser.add_argument("action", choices=["generate", "clear"], help="Action to perform")
    parser.add_argument("--count", type=int, default=50, help="Number of artifacts to generate (default: 50)")

    args = parser.parse_args()

    if args.action == "generate":
        asyncio.run(generate_seed_data(args.count))
    elif args.action == "clear":
        # Destructive action: require an explicit "yes" before proceeding.
        confirm = input("Are you sure you want to delete ALL data? (yes/no): ")
        if confirm.lower() == "yes":
            asyncio.run(clear_all_data())
        else:
            print("Aborted.")