"""Application settings loaded from environment variables.

All settings use the ``ORCHARD_`` prefix (e.g. ``ORCHARD_SERVER_PORT``),
handled by pydantic-settings via the inner ``Config`` class. Upstream
package sources may additionally be declared through
``ORCHARD_UPSTREAM__{NAME}__{FIELD}`` variables; see
:func:`parse_upstream_sources_from_env`.
"""

import os
import re
from functools import lru_cache
from typing import Optional
from urllib.parse import quote_plus

from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Central application configuration, populated from the environment."""

    # Environment
    env: str = "development"  # "development" or "production"

    # Server
    server_host: str = "0.0.0.0"
    server_port: int = 8080

    # Database
    database_host: str = "localhost"
    database_port: int = 5432
    database_user: str = "orchard"
    database_password: str = ""
    database_dbname: str = "orchard"
    database_sslmode: str = "disable"

    # Database connection pool settings
    database_pool_size: int = 5  # Number of connections to keep open
    database_max_overflow: int = 10  # Max additional connections beyond pool_size
    database_pool_timeout: int = 30  # Seconds to wait for a connection from pool
    database_pool_recycle: int = (
        1800  # Recycle connections after this many seconds (30 min)
    )
    database_query_timeout: int = 30  # Query timeout in seconds (0 = no timeout)

    # S3
    s3_endpoint: str = ""
    s3_region: str = "us-east-1"
    s3_bucket: str = "orchard-artifacts"
    s3_access_key_id: str = ""
    s3_secret_access_key: str = ""
    s3_use_path_style: bool = True
    s3_verify_ssl: bool = True  # Set to False for self-signed certs (dev only)
    s3_connect_timeout: int = 10  # Connection timeout in seconds
    s3_read_timeout: int = 60  # Read timeout in seconds
    s3_max_retries: int = 3  # Max retry attempts for transient failures

    # Upload settings
    max_file_size: int = 10 * 1024 * 1024 * 1024  # 10GB default max file size
    min_file_size: int = 1  # Minimum 1 byte (empty files rejected)

    # Download settings
    download_mode: str = "presigned"  # "presigned", "redirect", or "proxy"
    presigned_url_expiry: int = (
        3600  # Presigned URL expiry in seconds (default: 1 hour)
    )
    pypi_download_mode: str = "redirect"  # "redirect" (to S3) or "proxy" (stream through Orchard)

    # Logging settings
    log_level: str = "INFO"  # DEBUG, INFO, WARNING, ERROR, CRITICAL
    log_format: str = "auto"  # "json", "standard", or "auto" (json in production)

    # Initial admin user settings
    admin_password: str = ""  # Initial admin password (if empty, uses 'changeme123')

    # Cache settings
    cache_encryption_key: str = ""  # Fernet key for encrypting upstream credentials (auto-generated if empty)

    # Global cache settings override (None = use DB value, True/False = override DB)
    cache_auto_create_system_projects: Optional[bool] = None  # Override auto_create_system_projects

    # PyPI Cache Worker settings
    pypi_cache_workers: int = 5  # Number of concurrent cache workers
    pypi_cache_max_depth: int = 10  # Maximum recursion depth for dependency caching
    pypi_cache_max_attempts: int = 3  # Maximum retry attempts for failed cache tasks

    # JWT Authentication settings (optional, for external identity providers)
    jwt_enabled: bool = False  # Enable JWT token validation
    jwt_secret: str = ""  # Secret key for HS256, or leave empty for RS256 with JWKS
    jwt_algorithm: str = "HS256"  # HS256 or RS256
    jwt_issuer: str = ""  # Expected issuer (iss claim), leave empty to skip validation
    jwt_audience: str = ""  # Expected audience (aud claim), leave empty to skip validation
    jwt_jwks_url: str = ""  # JWKS URL for RS256 (e.g., https://auth.example.com/.well-known/jwks.json)
    jwt_username_claim: str = (
        "sub"  # JWT claim to use as username (sub, email, preferred_username, etc.)
    )

    @property
    def database_url(self) -> str:
        """Build a PostgreSQL DSN from the individual database settings.

        User and password are percent-encoded so credentials containing
        reserved characters (``@``, ``/``, ``:`` ...) do not corrupt the URL.
        """
        sslmode = f"?sslmode={self.database_sslmode}" if self.database_sslmode else ""
        user = quote_plus(self.database_user)
        password = quote_plus(self.database_password)
        return (
            f"postgresql://{user}:{password}"
            f"@{self.database_host}:{self.database_port}/{self.database_dbname}{sslmode}"
        )

    @property
    def is_development(self) -> bool:
        """True when running in the development environment."""
        return self.env.lower() == "development"

    @property
    def is_production(self) -> bool:
        """True when running in the production environment."""
        return self.env.lower() == "production"

    @property
    def PORT(self) -> int:
        """Alias for server_port for compatibility."""
        return self.server_port

    # Uppercase aliases for PyPI cache settings (for backward compatibility)
    @property
    def PYPI_CACHE_WORKERS(self) -> int:
        return self.pypi_cache_workers

    @property
    def PYPI_CACHE_MAX_DEPTH(self) -> int:
        return self.pypi_cache_max_depth

    @property
    def PYPI_CACHE_MAX_ATTEMPTS(self) -> int:
        return self.pypi_cache_max_attempts

    class Config:
        env_prefix = "ORCHARD_"
        case_sensitive = False


@lru_cache()
def get_settings() -> Settings:
    """Return the process-wide Settings instance (cached after first call)."""
    return Settings()


class EnvUpstreamSource:
    """Represents an upstream source defined via environment variables."""

    def __init__(
        self,
        name: str,
        url: str,
        source_type: str = "generic",
        enabled: bool = True,
        auth_type: str = "none",
        username: Optional[str] = None,
        password: Optional[str] = None,
        priority: int = 100,
    ):
        self.name = name
        self.url = url
        self.source_type = source_type
        self.enabled = enabled
        self.auth_type = auth_type
        self.username = username
        self.password = password
        self.priority = priority
        self.source = "env"  # Mark as env-defined


def _parse_bool(val: Optional[str], default: bool) -> bool:
    """Parse a boolean env value; accepts true/1/yes/on (case-insensitive)."""
    if val is None:
        return default
    return val.lower() in ("true", "1", "yes", "on")


def _parse_int(val: Optional[str], default: int) -> int:
    """Parse an integer env value, falling back to *default* on bad input."""
    if val is None:
        return default
    try:
        return int(val)
    except ValueError:
        return default


def parse_upstream_sources_from_env() -> list[EnvUpstreamSource]:
    """
    Parse upstream sources from environment variables.

    Uses double underscore (__) as separator to allow source names with
    single underscores.

    Pattern: ORCHARD_UPSTREAM__{NAME}__FIELD

    Example:
        ORCHARD_UPSTREAM__NPM_PRIVATE__URL=https://npm.corp.com
        ORCHARD_UPSTREAM__NPM_PRIVATE__TYPE=npm
        ORCHARD_UPSTREAM__NPM_PRIVATE__ENABLED=true
        ORCHARD_UPSTREAM__NPM_PRIVATE__AUTH_TYPE=basic
        ORCHARD_UPSTREAM__NPM_PRIVATE__USERNAME=reader
        ORCHARD_UPSTREAM__NPM_PRIVATE__PASSWORD=secret

    Returns:
        List of EnvUpstreamSource objects parsed from environment variables.
    """
    # Pattern: ORCHARD_UPSTREAM__{NAME}__{FIELD}
    pattern = re.compile(r"^ORCHARD_UPSTREAM__([A-Z0-9_]+)__([A-Z_]+)$", re.IGNORECASE)

    # Collect all env vars matching the pattern, grouped by source name
    sources_data: dict[str, dict[str, str]] = {}
    for key, value in os.environ.items():
        match = pattern.match(key)
        if match:
            source_name = match.group(1).lower()  # Normalize to lowercase
            field = match.group(2).upper()
            if source_name not in sources_data:
                sources_data[source_name] = {}
            sources_data[source_name][field] = value

    # Build source objects from collected data
    sources: list[EnvUpstreamSource] = []
    for name, data in sources_data.items():
        # URL is required
        url = data.get("URL")
        if not url:
            continue  # Skip sources without URL

        source = EnvUpstreamSource(
            name=name.replace("_", "-"),  # Convert underscores to hyphens for readability
            url=url,
            source_type=data.get("TYPE", "generic").lower(),
            enabled=_parse_bool(data.get("ENABLED"), True),
            auth_type=data.get("AUTH_TYPE", "none").lower(),
            username=data.get("USERNAME"),
            password=data.get("PASSWORD"),
            priority=_parse_int(data.get("PRIORITY"), 100),
        )
        sources.append(source)

    return sources


@lru_cache()
def get_env_upstream_sources() -> tuple[EnvUpstreamSource, ...]:
    """
    Get cached list of upstream sources from environment variables.

    Returns a tuple for hashability (required by lru_cache).
    """
    return tuple(parse_upstream_sources_from_env())