"""
HTTP client manager with connection pooling and lifecycle management.

Provides:
- Shared connection pools for upstream requests
- Per-upstream client isolation when needed
- Thread pool for blocking I/O operations
- FastAPI lifespan integration
"""

import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

import httpx

from .config import Settings

logger = logging.getLogger(__name__)


class HttpClientManager:
    """
    Manages httpx.AsyncClient pools with FastAPI lifespan integration.

    Features:
    - Default shared pool for general requests
    - Per-upstream pools for sources needing specific config/auth
    - Dedicated thread pool for blocking operations
    - Graceful shutdown

    Lifecycle: construct with settings, ``await startup()`` before use and
    ``await shutdown()`` when done. Both calls are no-ops when the manager is
    already in the requested state.
    """

    def __init__(self, settings: Settings):
        """
        Capture pool configuration; no network or thread resources are
        created until startup() is awaited.

        Args:
            settings: Source of the ``http_*`` configuration values read below.
        """
        self.max_connections = settings.http_max_connections
        self.max_keepalive = settings.http_max_keepalive
        self.connect_timeout = settings.http_connect_timeout
        self.read_timeout = settings.http_read_timeout
        self.worker_threads = settings.http_worker_threads

        self._default_client: Optional[httpx.AsyncClient] = None
        # Dedicated clients keyed by upstream name. get_client() falls back to
        # the default client for any name that is not present here.
        self._upstream_clients: dict[str, httpx.AsyncClient] = {}
        self._executor: Optional[ThreadPoolExecutor] = None
        self._started = False

    async def startup(self) -> None:
        """Initialize clients and thread pool. Called by FastAPI lifespan."""
        if self._started:
            return

        logger.info(
            "Starting HttpClientManager: max_connections=%s, worker_threads=%s",
            self.max_connections,
            self.worker_threads,
        )

        # Build the thread pool first: its constructor rejects an invalid
        # worker count with ValueError, and failing here (before the HTTP
        # client exists) means there is no open client left to clean up.
        executor = ThreadPoolExecutor(
            max_workers=self.worker_threads,
            thread_name_prefix="orchard-blocking-",
        )

        # Create connection limits
        limits = httpx.Limits(
            max_connections=self.max_connections,
            max_keepalive_connections=self.max_keepalive,
        )

        # Create timeout config. There are only two configured values, so the
        # write timeout reuses the read timeout and the pool-acquire timeout
        # reuses the connect timeout.
        timeout = httpx.Timeout(
            connect=self.connect_timeout,
            read=self.read_timeout,
            write=self.read_timeout,
            pool=self.connect_timeout,
        )

        # Create default client
        self._default_client = httpx.AsyncClient(
            limits=limits,
            timeout=timeout,
            follow_redirects=False,  # Handle redirects manually for auth
        )
        self._executor = executor

        self._started = True
        logger.info("HttpClientManager started")

    async def shutdown(self) -> None:
        """
        Close all clients and thread pool. Called by FastAPI lifespan.

        Every resource is released even if closing one of the clients fails;
        in that case the failure is logged and the first error is re-raised
        once cleanup has finished.

        Raises:
            Exception: The first error raised by a client's ``aclose()``.
        """
        if not self._started:
            return

        logger.info("Shutting down HttpClientManager")

        # Detach everything and mark the manager stopped before awaiting, so
        # the manager is never left half-closed and callers racing with
        # shutdown get the "not started" error instead of a closing client.
        default_client = self._default_client
        upstream_clients = dict(self._upstream_clients)
        executor = self._executor
        self._default_client = None
        self._upstream_clients.clear()
        self._executor = None
        self._started = False

        first_error: Optional[Exception] = None

        # Close default client
        if default_client is not None:
            first_error = await self._close_client("default", default_client)

        # Close upstream-specific clients
        for name, client in upstream_clients.items():
            logger.debug("Closing upstream client: %s", name)
            error = await self._close_client(name, client)
            if first_error is None:
                first_error = error

        # Shutdown thread pool. shutdown(wait=True) blocks until pending jobs
        # finish, so run it off the event loop rather than stalling it.
        if executor is not None:
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(
                None, lambda: executor.shutdown(wait=True)
            )

        if first_error is not None:
            raise first_error

        logger.info("HttpClientManager shutdown complete")

    @staticmethod
    async def _close_client(
        label: str, client: httpx.AsyncClient
    ) -> Optional[Exception]:
        """Close one client; log and return (rather than raise) any failure."""
        try:
            await client.aclose()
        except Exception as exc:  # reported to the caller, which re-raises
            logger.exception("Failed to close %s HTTP client", label)
            return exc
        return None

    def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
        """
        Get HTTP client for making requests.

        Args:
            upstream_name: Optional upstream source name for dedicated pool.
                If None or empty, or if no dedicated client is registered
                under that name, returns the default shared client.

        Returns:
            httpx.AsyncClient configured for the request.

        Raises:
            RuntimeError: If manager not started.
        """
        if not self._started or self._default_client is None:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        if upstream_name:
            dedicated = self._upstream_clients.get(upstream_name)
            if dedicated is not None:
                return dedicated

        return self._default_client

    async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any:
        """
        Run a blocking function in the thread pool.

        Use this for:
        - File I/O operations
        - Archive extraction (zipfile, tarfile)
        - Hash computation on large data

        Args:
            func: Synchronous function to execute
            *args: Positional arguments to pass to the function

        Returns:
            The function's return value.

        Raises:
            RuntimeError: If manager not started.
        """
        if self._executor is None:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, func, *args)

    @property
    def active_connections(self) -> int:
        """
        Get approximate number of active connections (for health checks).

        NOTE: this is not a live count. It is 0 while no default client
        exists and otherwise the configured ``max_connections`` limit.
        """
        if self._default_client is None:
            return 0
        # httpx doesn't expose this directly, return pool size as approximation
        return self.max_connections

    @property
    def pool_size(self) -> int:
        """Get configured pool size."""
        return self.max_connections

    @property
    def executor_active(self) -> int:
        """
        Get number of thread pool worker threads created so far.

        This counts the executor's spawned threads, whether or not they are
        currently running a job; it is 0 when no executor exists.
        """
        if self._executor is None:
            return 0
        # _threads is a private ThreadPoolExecutor attribute; fall back to an
        # empty collection if it is ever absent.
        return len(getattr(self._executor, "_threads", ()))

    @property
    def executor_max(self) -> int:
        """Get max thread pool workers."""
        return self.worker_threads