From 14806b05f041d621fee413075388c57f5d6ced20 Mon Sep 17 00:00:00 2001 From: Mondo Diaz Date: Wed, 4 Feb 2026 09:16:27 -0600 Subject: [PATCH] feat: add HttpClientManager with connection pooling Add HttpClientManager class for managing httpx.AsyncClient pools with FastAPI lifespan integration. Features include: - Default shared connection pool for general requests - Configurable max connections, keep-alive, and timeouts - Dedicated thread pool for blocking I/O operations - Graceful startup/shutdown lifecycle management - Per-upstream client isolation support (for future use) Includes comprehensive unit tests covering initialization, startup, shutdown, client retrieval, blocking operations, idempotency, and error handling. --- backend/app/http_client.py | 179 +++++++++++++++++++++++ backend/tests/unit/test_http_client.py | 194 +++++++++++++++++++++++++ 2 files changed, 373 insertions(+) create mode 100644 backend/app/http_client.py create mode 100644 backend/tests/unit/test_http_client.py diff --git a/backend/app/http_client.py b/backend/app/http_client.py new file mode 100644 index 0000000..d838675 --- /dev/null +++ b/backend/app/http_client.py @@ -0,0 +1,179 @@ +""" +HTTP client manager with connection pooling and lifecycle management. + +Provides: +- Shared connection pools for upstream requests +- Per-upstream client isolation when needed +- Thread pool for blocking I/O operations +- FastAPI lifespan integration +""" + +import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Callable, Optional + +import httpx + +from .config import Settings + +logger = logging.getLogger(__name__) + + +class HttpClientManager: + """ + Manages httpx.AsyncClient pools with FastAPI lifespan integration. + + Features: + - Default shared pool for general requests + - Per-upstream pools for sources needing specific config/auth + - Dedicated thread pool for blocking operations + - Graceful shutdown + """ + + def __init__(self, settings: Settings): + self.max_connections = settings.http_max_connections + self.max_keepalive = settings.http_max_keepalive + self.connect_timeout = settings.http_connect_timeout + self.read_timeout = settings.http_read_timeout + self.worker_threads = settings.http_worker_threads + + self._default_client: Optional[httpx.AsyncClient] = None + self._upstream_clients: dict[str, httpx.AsyncClient] = {} + self._executor: Optional[ThreadPoolExecutor] = None + self._started = False + + async def startup(self) -> None: + """Initialize clients and thread pool. Called by FastAPI lifespan.""" + if self._started: + return + + logger.info( + f"Starting HttpClientManager: max_connections={self.max_connections}, " + f"worker_threads={self.worker_threads}" + ) + + # Create connection limits + limits = httpx.Limits( + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive, + ) + + # Create timeout config + timeout = httpx.Timeout( + connect=self.connect_timeout, + read=self.read_timeout, + write=self.read_timeout, + pool=self.connect_timeout, + ) + + # Create default client + self._default_client = httpx.AsyncClient( + limits=limits, + timeout=timeout, + follow_redirects=False, # Handle redirects manually for auth + ) + + # Create thread pool for blocking operations + self._executor = ThreadPoolExecutor( + max_workers=self.worker_threads, + thread_name_prefix="orchard-blocking-", + ) + + self._started = True + logger.info("HttpClientManager started") + + async def shutdown(self) -> None: + """Close all clients and thread pool. Called by FastAPI lifespan.""" + if not self._started: + return + + logger.info("Shutting down HttpClientManager") + + # Close default client + if self._default_client: + await self._default_client.aclose() + self._default_client = None + + # Close upstream-specific clients + for name, client in self._upstream_clients.items(): + logger.debug(f"Closing upstream client: {name}") + await client.aclose() + self._upstream_clients.clear() + + # Shutdown thread pool + if self._executor: + self._executor.shutdown(wait=True) + self._executor = None + + self._started = False + logger.info("HttpClientManager shutdown complete") + + def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient: + """ + Get HTTP client for making requests. + + Args: + upstream_name: Optional upstream source name for dedicated pool. + If None, returns the default shared client. + + Returns: + httpx.AsyncClient configured for the request. + + Raises: + RuntimeError: If manager not started. + """ + if not self._started or not self._default_client: + raise RuntimeError("HttpClientManager not started. Call startup() first.") + + if upstream_name and upstream_name in self._upstream_clients: + return self._upstream_clients[upstream_name] + + return self._default_client + + async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any: + """ + Run a blocking function in the thread pool. + + Use this for: + - File I/O operations + - Archive extraction (zipfile, tarfile) + - Hash computation on large data + + Args: + func: Synchronous function to execute + *args: Arguments to pass to the function + + Returns: + The function's return value. + """ + if not self._executor: + raise RuntimeError("HttpClientManager not started. Call startup() first.") + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(self._executor, func, *args) + + @property + def active_connections(self) -> int: + """Get approximate number of active connections (for health checks).""" + if not self._default_client: + return 0 + # httpx doesn't expose this directly, return pool size as approximation + return self.max_connections + + @property + def pool_size(self) -> int: + """Get configured pool size.""" + return self.max_connections + + @property + def executor_active(self) -> int: + """Get number of active thread pool workers.""" + if not self._executor: + return 0 + return len(self._executor._threads) + + @property + def executor_max(self) -> int: + """Get max thread pool workers.""" + return self.worker_threads diff --git a/backend/tests/unit/test_http_client.py b/backend/tests/unit/test_http_client.py new file mode 100644 index 0000000..ccbe498 --- /dev/null +++ b/backend/tests/unit/test_http_client.py @@ -0,0 +1,194 @@ +"""Tests for HttpClientManager.""" +import pytest +from unittest.mock import MagicMock, AsyncMock, patch + + +class TestHttpClientManager: + """Tests for HTTP client pool management.""" + + @pytest.mark.unit + def test_manager_initializes_with_settings(self): + """Manager should initialize with config settings.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings( + http_max_connections=50, + http_connect_timeout=15.0, + ) + manager = HttpClientManager(settings) + + assert manager.max_connections == 50 + assert manager.connect_timeout == 15.0 + assert manager._default_client is None # Not started yet + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_startup_creates_client(self): + """Startup should create the default async client.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + await manager.startup() + + assert manager._default_client is not None + + await manager.shutdown() + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_shutdown_closes_client(self): + """Shutdown should close all clients gracefully.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + await manager.startup() + client = manager._default_client + + await manager.shutdown() + + assert manager._default_client is None + assert client.is_closed + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_get_client_returns_default(self): + """get_client() should return the default client.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + await manager.startup() + + client = manager.get_client() + + assert client is manager._default_client + + await manager.shutdown() + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_get_client_raises_if_not_started(self): + """get_client() should raise RuntimeError if manager not started.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + with pytest.raises(RuntimeError, match="not started"): + manager.get_client() + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_run_blocking_executes_in_thread_pool(self): + """run_blocking should execute sync functions in thread pool.""" + from app.http_client import HttpClientManager + from app.config import Settings + import threading + + settings = Settings() + manager = HttpClientManager(settings) + await manager.startup() + + main_thread = threading.current_thread() + execution_thread = None + + def blocking_func(): + nonlocal execution_thread + execution_thread = threading.current_thread() + return "result" + + result = await manager.run_blocking(blocking_func) + + assert result == "result" + assert execution_thread is not main_thread + + await manager.shutdown() + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_run_blocking_raises_if_not_started(self): + """run_blocking should raise RuntimeError if manager not started.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + with pytest.raises(RuntimeError, match="not started"): + await manager.run_blocking(lambda: None) + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_startup_idempotent(self): + """Calling startup multiple times should be safe.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + await manager.startup() + client1 = manager._default_client + + await manager.startup() # Should not create a new client + client2 = manager._default_client + + assert client1 is client2 # Same client instance + + await manager.shutdown() + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_shutdown_idempotent(self): + """Calling shutdown multiple times should be safe.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + await manager.startup() + await manager.shutdown() + await manager.shutdown() # Should not raise + + assert manager._default_client is None + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_properties_return_configured_values(self): + """Properties should return configured values.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings( + http_max_connections=75, + http_worker_threads=16, + ) + manager = HttpClientManager(settings) + await manager.startup() + + assert manager.pool_size == 75 + assert manager.executor_max == 16 + + await manager.shutdown() + + @pytest.mark.asyncio + @pytest.mark.unit + async def test_active_connections_when_not_started(self): + """active_connections should return 0 when not started.""" + from app.http_client import HttpClientManager + from app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + assert manager.active_connections == 0