#!/usr/bin/env python
"""
Utility functions for generating seed data for testing the Data Lake.

This module provides functions to:
- Generate random test artifacts (CSV, JSON, binary, PCAP files)
- Upload them to the database and storage backend
- Clear all data for testing purposes
"""
import os
import sys
import io
import random
import json
import csv
from datetime import datetime, timedelta
from typing import List, Dict, Any
import uuid

# Add parent directory to path to import app modules
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

from app.database import SessionLocal
from app.models.artifact import Artifact
from app.storage import get_storage_backend
from app.config import settings

# Sample data for generating realistic test artifacts
TEST_NAMES = [
    "user_authentication", "payment_processing", "data_validation",
    "api_endpoint_test", "database_connection", "file_upload",
    "performance_test", "stress_test", "security_scan",
    "regression_test", "smoke_test", "integration_test"
]

TEST_SUITES = [
    "authentication", "payments", "api", "database",
    "ui", "performance", "security", "integration"
]

TEST_RESULTS = ["pass", "fail", "skip", "error"]

TAGS = [
    "regression", "smoke", "critical", "high-priority", "automated",
    "manual", "performance", "security", "integration", "unit", "e2e", "api"
]


def generate_csv_content() -> bytes:
    """Generate random CSV test data"""
    output = io.StringIO()
    writer = csv.writer(output)

    # Header
    writer.writerow(["timestamp", "test_case", "result", "duration_ms", "error_message"])

    # Random rows
    num_rows = random.randint(10, 100)
    for _ in range(num_rows):
        timestamp = datetime.now() - timedelta(minutes=random.randint(0, 1000))
        test_case = f"test_case_{random.randint(1, 50)}"
        result = random.choice(TEST_RESULTS)
        duration = random.randint(100, 5000)
        # Only failed/errored rows carry an error message
        error = "" if result in ("pass", "skip") else f"Error_{random.randint(1, 10)}"
        writer.writerow([timestamp.isoformat(), test_case, result, duration, error])

    return output.getvalue().encode('utf-8')


def generate_json_content() -> bytes:
    """Generate random JSON test configuration"""
    config = {
        "test_run_id": str(uuid.uuid4()),
        "timestamp": datetime.now().isoformat(),
        "environment": random.choice(["dev", "staging", "prod"]),
        "browser": random.choice(["chrome", "firefox", "safari", "edge"]),
        "timeout": random.randint(30, 300),
        "retries": random.randint(0, 3),
        "parallel_threads": random.randint(1, 10),
        "test_data": {
            "users": random.randint(10, 1000),
            "iterations": random.randint(1, 100),
            "success_rate": round(random.uniform(0.7, 1.0), 2)
        }
    }
    return json.dumps(config, indent=2).encode('utf-8')


def generate_binary_content() -> bytes:
    """Generate random binary data"""
    size = random.randint(1024, 10240)  # 1-10KB
    return os.urandom(size)


def generate_pcap_content() -> bytes:
    """Generate fake PCAP file header (simplified)"""
    # This is a simplified libpcap global header for demonstration only;
    # the random bytes appended below are not valid packet records.
    pcap_header = bytearray([
        0xd4, 0xc3, 0xb2, 0xa1,  # Magic number (0xa1b2c3d4, little-endian)
        0x02, 0x00, 0x04, 0x00,  # Version 2.4
        0x00, 0x00, 0x00, 0x00,  # Timezone offset
        0x00, 0x00, 0x00, 0x00,  # Timestamp accuracy
        0xff, 0xff, 0x00, 0x00,  # Snapshot length (65535)
        0x01, 0x00, 0x00, 0x00   # Link-layer type (1 = Ethernet)
    ])
    # Add some random data to simulate packets
    pcap_header.extend(os.urandom(random.randint(500, 2000)))
    return bytes(pcap_header)

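# Hedged sketch (illustrative only, not used by the seeding workflow): the
# header above follows the classic 24-byte libpcap global header layout, so
# struct can unpack it with the little-endian format "<IHHiIII". The helper
# name below is hypothetical.
def _inspect_pcap_header(data: bytes) -> Dict[str, Any]:
    """Unpack the 24-byte libpcap global header for a quick sanity check."""
    import struct
    magic, v_major, v_minor, _thiszone, _sigfigs, snaplen, network = struct.unpack(
        "<IHHiIII", data[:24]
    )
    return {
        "magic": hex(magic),                # 0xa1b2c3d4 for little-endian captures
        "version": f"{v_major}.{v_minor}",  # 2.4 as written above
        "snaplen": snaplen,                 # 65535 per the header bytes
        "link_type": network,               # 1 = LINKTYPE_ETHERNET
    }
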
def create_artifact_data(index: int) -> Dict[str, Any]:
    """Generate metadata for an artifact"""
    test_name = random.choice(TEST_NAMES)
    test_suite = random.choice(TEST_SUITES)
    test_result = random.choice(TEST_RESULTS)

    # Generate random tags (1-4 tags)
    num_tags = random.randint(1, 4)
    artifact_tags = random.sample(TAGS, num_tags)

    # Generate test config
    test_config = {
        "environment": random.choice(["dev", "staging", "prod"]),
        "timeout": random.randint(30, 300),
        "retries": random.randint(0, 3)
    }

    # Generate custom metadata
    custom_metadata = {
        "build_number": random.randint(1000, 9999),
        "commit_hash": uuid.uuid4().hex[:8],
        "triggered_by": random.choice(["manual", "scheduled", "webhook"])
    }

    # Random version
    version = f"v{random.randint(1, 5)}.{random.randint(0, 10)}.{random.randint(0, 20)}"

    # Random creation date (within last 30 days)
    created_days_ago = random.randint(0, 30)
    created_at = datetime.now() - timedelta(days=created_days_ago, hours=random.randint(0, 23))

    return {
        "test_name": test_name,
        "test_suite": test_suite,
        "test_result": test_result,
        "tags": artifact_tags,
        "test_config": test_config,
        "custom_metadata": custom_metadata,
        "version": version,
        "description": f"Test artifact {index} for {test_name}",
        "created_at": created_at
    }


async def upload_artifact_to_storage(file_content: bytes, filename: str) -> str:
    """Upload file to storage backend"""
    storage = get_storage_backend()
    # Preserve the original extension on the randomly named object, if any
    file_extension = filename.split('.')[-1] if '.' in filename else ''
    object_name = f"{uuid.uuid4()}.{file_extension}" if file_extension else str(uuid.uuid4())
    storage_path = await storage.upload_file(
        io.BytesIO(file_content),
        object_name
    )
    return storage_path


def get_file_type(filename: str) -> str:
    """Determine file type from filename"""
    extension = filename.lower().split('.')[-1]
    type_mapping = {
        'csv': 'csv',
        'json': 'json',
        'pcap': 'pcap',
        'pcapng': 'pcap',
        'bin': 'binary',
        'dat': 'binary',
    }
    return type_mapping.get(extension, 'binary')

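# Quick illustration of the mapping above (matching is case-insensitive, and
# unknown extensions fall back to 'binary'):
#   get_file_type("results.csv")     -> 'csv'
#   get_file_type("capture.PCAPNG")  -> 'pcap'
#   get_file_type("dump.xyz")        -> 'binary'
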
async def generate_seed_data(num_artifacts: int = 50) -> List[int]:
    """
    Generate and upload seed data to the database and storage.

    Args:
        num_artifacts: Number of artifacts to generate (default: 50)

    Returns:
        List of created artifact IDs
    """
    db = SessionLocal()
    artifact_ids = []

    try:
        print(f"Generating {num_artifacts} seed artifacts...")
        print(f"Deployment mode: {settings.deployment_mode}")
        print(f"Storage backend: {settings.storage_backend}")

        for i in range(num_artifacts):
            # Randomly choose file type
            file_type_choice = random.choice(['csv', 'json', 'binary', 'pcap'])

            if file_type_choice == 'csv':
                filename = f"test_results_{i}.csv"
                content = generate_csv_content()
                content_type = "text/csv"
            elif file_type_choice == 'json':
                filename = f"test_config_{i}.json"
                content = generate_json_content()
                content_type = "application/json"
            elif file_type_choice == 'pcap':
                filename = f"network_capture_{i}.pcap"
                content = generate_pcap_content()
                content_type = "application/vnd.tcpdump.pcap"
            else:
                filename = f"test_data_{i}.bin"
                content = generate_binary_content()
                content_type = "application/octet-stream"

            # Upload to storage
            storage_path = await upload_artifact_to_storage(content, filename)

            # Generate metadata
            artifact_data = create_artifact_data(i)

            # Create database record
            artifact = Artifact(
                filename=filename,
                file_type=get_file_type(filename),
                file_size=len(content),
                storage_path=storage_path,
                content_type=content_type,
                test_name=artifact_data["test_name"],
                test_suite=artifact_data["test_suite"],
                test_config=artifact_data["test_config"],
                test_result=artifact_data["test_result"],
                custom_metadata=artifact_data["custom_metadata"],
                description=artifact_data["description"],
                tags=artifact_data["tags"],
                version=artifact_data["version"],
                created_at=artifact_data["created_at"],
                updated_at=artifact_data["created_at"]
            )
            db.add(artifact)
            db.commit()
            db.refresh(artifact)
            artifact_ids.append(artifact.id)

            if (i + 1) % 10 == 0:
                print(f"  Created {i + 1}/{num_artifacts} artifacts...")

        print(f"✓ Successfully created {len(artifact_ids)} artifacts")
        return artifact_ids

    except Exception as e:
        db.rollback()
        print(f"✗ Error generating seed data: {e}")
        raise
    finally:
        db.close()


async def clear_all_data():
    """
    Clear all artifacts from database and storage.

    WARNING: This will delete ALL data!
    """
    db = SessionLocal()
    storage = get_storage_backend()

    try:
        print("Clearing all artifacts...")

        # Get all artifacts
        artifacts = db.query(Artifact).all()
        count = len(artifacts)

        if count == 0:
            print("No artifacts to delete.")
            return

        print(f"Found {count} artifacts to delete...")

        # Delete from storage and database
        for i, artifact in enumerate(artifacts):
            try:
                # The object name is the last path segment of the storage path
                object_name = artifact.storage_path.split('/')[-1]
                await storage.delete_file(object_name)
            except Exception as e:
                print(f"  Warning: Could not delete {artifact.filename} from storage: {e}")

            # Delete from database
            db.delete(artifact)

            if (i + 1) % 10 == 0:
                print(f"  Deleted {i + 1}/{count} artifacts...")

        db.commit()
        print(f"✓ Successfully deleted {count} artifacts")

    except Exception as e:
        db.rollback()
        print(f"✗ Error clearing data: {e}")
        raise
    finally:
        db.close()

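# Hedged convenience sketch (hypothetical helper, not wired into the CLI
# below): reseed in one call by composing the two functions above. Assumes
# the caller drives it from a single event loop, e.g. via asyncio.run().
async def reset_seed_data(num_artifacts: int = 50) -> List[int]:
    """Delete all existing artifacts, then generate a fresh batch."""
    await clear_all_data()
    return await generate_seed_data(num_artifacts)
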
# CLI interface
if __name__ == "__main__":
    import asyncio
    import argparse

    parser = argparse.ArgumentParser(description="Generate or clear seed data for Data Lake")
    parser.add_argument("action", choices=["generate", "clear"], help="Action to perform")
    parser.add_argument("--count", type=int, default=50,
                        help="Number of artifacts to generate (default: 50)")

    args = parser.parse_args()

    if args.action == "generate":
        asyncio.run(generate_seed_data(args.count))
    elif args.action == "clear":
        confirm = input("Are you sure you want to delete ALL data? (yes/no): ")
        if confirm.lower() == "yes":
            asyncio.run(clear_all_data())
        else:
            print("Aborted.")
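# Example invocations (the path below assumes this script lives one level
# under the project root, e.g. scripts/seed_data.py; adjust to your layout):
#   python scripts/seed_data.py generate --count 100
#   python scripts/seed_data.py clear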