"""
Test data factories for Orchard backend tests.

This module provides factory functions for creating test data,
including test files, pre-computed hashes, and helper utilities.
"""

import hashlib
import io
import os
import random
import uuid
from typing import Iterator, Optional


# =============================================================================
# Hash Computation Utilities
# =============================================================================


def compute_sha256(content: bytes) -> str:
    """Compute SHA256 hash of content as lowercase hex string."""
    return hashlib.sha256(content).hexdigest()


def compute_md5(content: bytes) -> str:
    """Compute MD5 hash of content as lowercase hex string."""
    return hashlib.md5(content).hexdigest()


def compute_sha1(content: bytes) -> str:
    """Compute SHA1 hash of content as lowercase hex string."""
    return hashlib.sha1(content).hexdigest()


# =============================================================================
# Test File Factories
# =============================================================================


def create_test_file(content: Optional[bytes] = None, size: int = 1024) -> io.BytesIO:
    """
    Create an in-memory test file.

    Args:
        content: Specific content to use, or None to generate random content
        size: Number of random bytes to generate when content is None

    Returns:
        BytesIO object positioned at the start of the content
    """
    if content is None:
        content = os.urandom(size)
    return io.BytesIO(content)


def create_unique_content(prefix: str = "test-content") -> tuple[bytes, str]:
    """
    Create unique test content with its SHA256 hash.

    The content is "<prefix>-<uuid4 hex>" encoded to bytes, so every call
    yields a different payload.

    Args:
        prefix: Prefix for the content string

    Returns:
        Tuple of (content_bytes, sha256_hash)
    """
    content = f"{prefix}-{uuid.uuid4().hex}".encode()
    sha256 = compute_sha256(content)
    return content, sha256


# =============================================================================
# Known Test Data (Pre-computed hashes for deterministic tests)
# =============================================================================

TEST_CONTENT_HELLO = b"Hello, World!"
TEST_HASH_HELLO = "dffd6021bb2bd5b0af676290809ec3a53191dd81c7f70a4b28688a362182986f"
TEST_MD5_HELLO = "65a8e27d8879283831b664bd8b7f0ad4"
TEST_SHA1_HELLO = "0a0a9f2a6772942557ab5355d76af442f8f65e01"

TEST_CONTENT_EMPTY = b""
# Note: Empty content should be rejected by the storage layer

TEST_CONTENT_BINARY = bytes(range(256))
TEST_HASH_BINARY = compute_sha256(TEST_CONTENT_BINARY)


# =============================================================================
# API Test Helpers
# =============================================================================


def upload_test_file(
    client,
    project: str,
    package: str,
    content: bytes,
    filename: str = "test.bin",
    tag: Optional[str] = None,
    version: Optional[str] = None,
) -> dict:
    """
    Helper function to upload a test file via the API.

    Args:
        client: HTTP client (httpx or TestClient)
        project: Project name
        package: Package name
        content: File content as bytes
        filename: Original filename
        tag: Optional tag to assign
        version: Optional version to assign

    Returns:
        The upload response as a dict

    Raises:
        AssertionError: If the API does not answer with HTTP 200
    """
    files = {"file": (filename, io.BytesIO(content), "application/octet-stream")}

    # Only send the form fields that were actually provided.
    data = {}
    if tag:
        data["tag"] = tag
    if version:
        data["version"] = version

    response = client.post(
        f"/api/v1/project/{project}/{package}/upload",
        files=files,
        data=data if data else None,
    )
    assert response.status_code == 200, f"Upload failed: {response.text}"
    return response.json()


def generate_content(size: int, seed: Optional[int] = None) -> bytes:
    """
    Generate deterministic or random content of a specified size.

    Args:
        size: Size of content in bytes
        seed: Optional seed for reproducible content (None for random)

    Returns:
        Bytes of the specified size
    """
    if size == 0:
        return b""
    if seed is not None:
        # Keep the per-byte randint() draw: switching to another generator
        # call would change the bytes produced for an existing seed.
        rng = random.Random(seed)
        return bytes(rng.randint(0, 255) for _ in range(size))
    return os.urandom(size)


def generate_content_with_hash(
    size: int, seed: Optional[int] = None
) -> tuple[bytes, str]:
    """
    Generate content of specified size and compute its SHA256 hash.

    Args:
        size: Size of content in bytes
        seed: Optional seed for reproducible content

    Returns:
        Tuple of (content_bytes, sha256_hash)
    """
    content = generate_content(size, seed)
    return content, compute_sha256(content)


# =============================================================================
# Project/Package Factories
# =============================================================================


def create_test_project(client, unique_id: Optional[str] = None) -> str:
    """
    Create a test project via the API.

    Args:
        client: HTTP client
        unique_id: Unique identifier for the project name; a random
            8-character hex string is used when omitted

    Returns:
        Project name

    Raises:
        AssertionError: If the API does not answer with HTTP 200
    """
    if unique_id is None:
        unique_id = uuid.uuid4().hex[:8]

    project_name = f"test-project-{unique_id}"
    response = client.post(
        "/api/v1/projects",
        json={"name": project_name, "description": "Test project", "is_public": True},
    )
    assert response.status_code == 200, f"Failed to create project: {response.text}"
    return project_name


def create_test_package(client, project: str, unique_id: Optional[str] = None) -> str:
    """
    Create a test package via the API.

    Args:
        client: HTTP client
        project: Project name
        unique_id: Unique identifier for the package name; a random
            8-character hex string is used when omitted

    Returns:
        Package name

    Raises:
        AssertionError: If the API does not answer with HTTP 200
    """
    if unique_id is None:
        unique_id = uuid.uuid4().hex[:8]

    package_name = f"test-package-{unique_id}"
    response = client.post(
        f"/api/v1/project/{project}/packages",
        json={"name": package_name, "description": "Test package"},
    )
    assert response.status_code == 200, f"Failed to create package: {response.text}"
    return package_name


def delete_test_project(client, project: str) -> None:
    """
    Delete a test project (cleanup helper).

    This is best-effort: any exception raised by the request is swallowed
    so that a failed cleanup never fails the test that triggered it.

    Args:
        client: HTTP client
        project: Project name to delete
    """
    try:
        client.delete(f"/api/v1/projects/{project}")
    except Exception:
        pass  # Ignore cleanup errors


# =============================================================================
# S3 Test Helpers
# =============================================================================


def get_s3_client():
    """
    Create a boto3 S3 client for direct S3 access in integration tests.

    Uses environment variables for configuration (same as the app).
    Note: When running in container, S3 endpoint should be 'minio:9000'
    not 'localhost:9000'.
    """
    # Imported here rather than at module level so that importing this
    # module does not require boto3 to be installed.
    import boto3
    from botocore.config import Config

    config = Config(s3={"addressing_style": "path"})

    # Use the same endpoint as the app (minio:9000 in container, localhost:9000 locally)
    endpoint = os.environ.get("ORCHARD_S3_ENDPOINT", "http://minio:9000")

    return boto3.client(
        "s3",
        endpoint_url=endpoint,
        region_name=os.environ.get("ORCHARD_S3_REGION", "us-east-1"),
        aws_access_key_id=os.environ.get("ORCHARD_S3_ACCESS_KEY_ID", "minioadmin"),
        aws_secret_access_key=os.environ.get(
            "ORCHARD_S3_SECRET_ACCESS_KEY", "minioadmin"
        ),
        config=config,
    )


def get_s3_bucket() -> str:
    """Get the S3 bucket name from environment."""
    return os.environ.get("ORCHARD_S3_BUCKET", "orchard-artifacts")


def _s3_key_for_hash(sha256_hash: str) -> str:
    """Build the fruits/{hash[:2]}/{hash[2:4]}/{hash} object key for a hash."""
    return f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"


def iter_s3_object_keys(client, bucket: str, prefix: str) -> Iterator[str]:
    """
    Yield every object key under a prefix, following pagination.

    A single list_objects_v2 call returns only one page of results, so
    this keeps requesting pages with the continuation token until the
    response is no longer marked as truncated.

    Args:
        client: S3 client exposing list_objects_v2
        bucket: Bucket name
        prefix: Key prefix to list

    Yields:
        Object keys, in the order the service returns them
    """
    request = {"Bucket": bucket, "Prefix": prefix}
    while True:
        response = client.list_objects_v2(**request)
        # "Contents" is absent when a page holds no objects.
        for obj in response.get("Contents", []):
            yield obj["Key"]
        if not response.get("IsTruncated"):
            return
        request["ContinuationToken"] = response["NextContinuationToken"]


def list_s3_objects_by_hash(sha256_hash: str) -> list[str]:
    """
    List S3 objects that match a specific SHA256 hash.

    Uses the fruits/{hash[:2]}/{hash[2:4]}/{hash} key pattern as the
    listing prefix.

    Args:
        sha256_hash: SHA256 hex digest of the stored content

    Returns:
        List of matching object keys (empty if there are none)
    """
    client = get_s3_client()
    bucket = get_s3_bucket()
    return list(iter_s3_object_keys(client, bucket, _s3_key_for_hash(sha256_hash)))


def count_s3_objects_by_prefix(prefix: str) -> int:
    """
    Count S3 objects with a given prefix.

    Useful for checking if duplicate uploads created multiple objects.
    All result pages are counted, not just the first one.

    Args:
        prefix: Key prefix to count under

    Returns:
        Number of objects whose key starts with the prefix
    """
    client = get_s3_client()
    bucket = get_s3_bucket()
    return sum(1 for _ in iter_s3_object_keys(client, bucket, prefix))


def s3_object_exists(sha256_hash: str) -> bool:
    """
    Check if an S3 object exists for a given SHA256 hash.

    Args:
        sha256_hash: SHA256 hex digest of the stored content

    Returns:
        True if at least one object is listed for the hash
    """
    objects = list_s3_objects_by_hash(sha256_hash)
    return len(objects) > 0


def delete_s3_object_by_hash(sha256_hash: str) -> bool:
    """
    Delete an S3 object by its SHA256 hash (for test cleanup).

    Args:
        sha256_hash: SHA256 hex digest of the stored content

    Returns:
        True if the delete call completed, False if it raised
    """
    client = get_s3_client()
    bucket = get_s3_bucket()
    s3_key = _s3_key_for_hash(sha256_hash)
    try:
        client.delete_object(Bucket=bucket, Key=s3_key)
        return True
    except Exception:
        # Best-effort cleanup: report failure instead of raising.
        return False