feat: add HttpClientManager with connection pooling
Add HttpClientManager class for managing httpx.AsyncClient pools with FastAPI lifespan integration. Features include: - Default shared connection pool for general requests - Configurable max connections, keep-alive, and timeouts - Dedicated thread pool for blocking I/O operations - Graceful startup/shutdown lifecycle management - Per-upstream client isolation support (for future use) Includes comprehensive unit tests covering initialization, startup, shutdown, client retrieval, blocking operations, idempotency, and error handling.
This commit is contained in:
179
backend/app/http_client.py
Normal file
179
backend/app/http_client.py
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
"""
|
||||||
|
HTTP client manager with connection pooling and lifecycle management.
|
||||||
|
|
||||||
|
Provides:
|
||||||
|
- Shared connection pools for upstream requests
|
||||||
|
- Per-upstream client isolation when needed
|
||||||
|
- Thread pool for blocking I/O operations
|
||||||
|
- FastAPI lifespan integration
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import logging
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
|
from typing import Any, Callable, Optional
|
||||||
|
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
from .config import Settings
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class HttpClientManager:
|
||||||
|
"""
|
||||||
|
Manages httpx.AsyncClient pools with FastAPI lifespan integration.
|
||||||
|
|
||||||
|
Features:
|
||||||
|
- Default shared pool for general requests
|
||||||
|
- Per-upstream pools for sources needing specific config/auth
|
||||||
|
- Dedicated thread pool for blocking operations
|
||||||
|
- Graceful shutdown
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, settings: Settings):
|
||||||
|
self.max_connections = settings.http_max_connections
|
||||||
|
self.max_keepalive = settings.http_max_keepalive
|
||||||
|
self.connect_timeout = settings.http_connect_timeout
|
||||||
|
self.read_timeout = settings.http_read_timeout
|
||||||
|
self.worker_threads = settings.http_worker_threads
|
||||||
|
|
||||||
|
self._default_client: Optional[httpx.AsyncClient] = None
|
||||||
|
self._upstream_clients: dict[str, httpx.AsyncClient] = {}
|
||||||
|
self._executor: Optional[ThreadPoolExecutor] = None
|
||||||
|
self._started = False
|
||||||
|
|
||||||
|
async def startup(self) -> None:
|
||||||
|
"""Initialize clients and thread pool. Called by FastAPI lifespan."""
|
||||||
|
if self._started:
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
f"Starting HttpClientManager: max_connections={self.max_connections}, "
|
||||||
|
f"worker_threads={self.worker_threads}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create connection limits
|
||||||
|
limits = httpx.Limits(
|
||||||
|
max_connections=self.max_connections,
|
||||||
|
max_keepalive_connections=self.max_keepalive,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create timeout config
|
||||||
|
timeout = httpx.Timeout(
|
||||||
|
connect=self.connect_timeout,
|
||||||
|
read=self.read_timeout,
|
||||||
|
write=self.read_timeout,
|
||||||
|
pool=self.connect_timeout,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create default client
|
||||||
|
self._default_client = httpx.AsyncClient(
|
||||||
|
limits=limits,
|
||||||
|
timeout=timeout,
|
||||||
|
follow_redirects=False, # Handle redirects manually for auth
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create thread pool for blocking operations
|
||||||
|
self._executor = ThreadPoolExecutor(
|
||||||
|
max_workers=self.worker_threads,
|
||||||
|
thread_name_prefix="orchard-blocking-",
|
||||||
|
)
|
||||||
|
|
||||||
|
self._started = True
|
||||||
|
logger.info("HttpClientManager started")
|
||||||
|
|
||||||
|
async def shutdown(self) -> None:
|
||||||
|
"""Close all clients and thread pool. Called by FastAPI lifespan."""
|
||||||
|
if not self._started:
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.info("Shutting down HttpClientManager")
|
||||||
|
|
||||||
|
# Close default client
|
||||||
|
if self._default_client:
|
||||||
|
await self._default_client.aclose()
|
||||||
|
self._default_client = None
|
||||||
|
|
||||||
|
# Close upstream-specific clients
|
||||||
|
for name, client in self._upstream_clients.items():
|
||||||
|
logger.debug(f"Closing upstream client: {name}")
|
||||||
|
await client.aclose()
|
||||||
|
self._upstream_clients.clear()
|
||||||
|
|
||||||
|
# Shutdown thread pool
|
||||||
|
if self._executor:
|
||||||
|
self._executor.shutdown(wait=True)
|
||||||
|
self._executor = None
|
||||||
|
|
||||||
|
self._started = False
|
||||||
|
logger.info("HttpClientManager shutdown complete")
|
||||||
|
|
||||||
|
def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
|
||||||
|
"""
|
||||||
|
Get HTTP client for making requests.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
upstream_name: Optional upstream source name for dedicated pool.
|
||||||
|
If None, returns the default shared client.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
httpx.AsyncClient configured for the request.
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
RuntimeError: If manager not started.
|
||||||
|
"""
|
||||||
|
if not self._started or not self._default_client:
|
||||||
|
raise RuntimeError("HttpClientManager not started. Call startup() first.")
|
||||||
|
|
||||||
|
if upstream_name and upstream_name in self._upstream_clients:
|
||||||
|
return self._upstream_clients[upstream_name]
|
||||||
|
|
||||||
|
return self._default_client
|
||||||
|
|
||||||
|
async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any:
|
||||||
|
"""
|
||||||
|
Run a blocking function in the thread pool.
|
||||||
|
|
||||||
|
Use this for:
|
||||||
|
- File I/O operations
|
||||||
|
- Archive extraction (zipfile, tarfile)
|
||||||
|
- Hash computation on large data
|
||||||
|
|
||||||
|
Args:
|
||||||
|
func: Synchronous function to execute
|
||||||
|
*args: Arguments to pass to the function
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The function's return value.
|
||||||
|
"""
|
||||||
|
if not self._executor:
|
||||||
|
raise RuntimeError("HttpClientManager not started. Call startup() first.")
|
||||||
|
|
||||||
|
loop = asyncio.get_event_loop()
|
||||||
|
return await loop.run_in_executor(self._executor, func, *args)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def active_connections(self) -> int:
|
||||||
|
"""Get approximate number of active connections (for health checks)."""
|
||||||
|
if not self._default_client:
|
||||||
|
return 0
|
||||||
|
# httpx doesn't expose this directly, return pool size as approximation
|
||||||
|
return self.max_connections
|
||||||
|
|
||||||
|
@property
|
||||||
|
def pool_size(self) -> int:
|
||||||
|
"""Get configured pool size."""
|
||||||
|
return self.max_connections
|
||||||
|
|
||||||
|
@property
|
||||||
|
def executor_active(self) -> int:
|
||||||
|
"""Get number of active thread pool workers."""
|
||||||
|
if not self._executor:
|
||||||
|
return 0
|
||||||
|
return len(self._executor._threads)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def executor_max(self) -> int:
|
||||||
|
"""Get max thread pool workers."""
|
||||||
|
return self.worker_threads
|
||||||
194
backend/tests/unit/test_http_client.py
Normal file
194
backend/tests/unit/test_http_client.py
Normal file
@@ -0,0 +1,194 @@
|
|||||||
|
"""Tests for HttpClientManager."""
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import MagicMock, AsyncMock, patch
|
||||||
|
|
||||||
|
|
||||||
|
class TestHttpClientManager:
|
||||||
|
"""Tests for HTTP client pool management."""
|
||||||
|
|
||||||
|
@pytest.mark.unit
|
||||||
|
def test_manager_initializes_with_settings(self):
|
||||||
|
"""Manager should initialize with config settings."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings(
|
||||||
|
http_max_connections=50,
|
||||||
|
http_connect_timeout=15.0,
|
||||||
|
)
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
|
||||||
|
assert manager.max_connections == 50
|
||||||
|
assert manager.connect_timeout == 15.0
|
||||||
|
assert manager._default_client is None # Not started yet
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_startup_creates_client(self):
|
||||||
|
"""Startup should create the default async client."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
|
||||||
|
await manager.startup()
|
||||||
|
|
||||||
|
assert manager._default_client is not None
|
||||||
|
|
||||||
|
await manager.shutdown()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_shutdown_closes_client(self):
|
||||||
|
"""Shutdown should close all clients gracefully."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
|
||||||
|
await manager.startup()
|
||||||
|
client = manager._default_client
|
||||||
|
|
||||||
|
await manager.shutdown()
|
||||||
|
|
||||||
|
assert manager._default_client is None
|
||||||
|
assert client.is_closed
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_get_client_returns_default(self):
|
||||||
|
"""get_client() should return the default client."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
await manager.startup()
|
||||||
|
|
||||||
|
client = manager.get_client()
|
||||||
|
|
||||||
|
assert client is manager._default_client
|
||||||
|
|
||||||
|
await manager.shutdown()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_get_client_raises_if_not_started(self):
|
||||||
|
"""get_client() should raise RuntimeError if manager not started."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError, match="not started"):
|
||||||
|
manager.get_client()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_run_blocking_executes_in_thread_pool(self):
|
||||||
|
"""run_blocking should execute sync functions in thread pool."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
import threading
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
await manager.startup()
|
||||||
|
|
||||||
|
main_thread = threading.current_thread()
|
||||||
|
execution_thread = None
|
||||||
|
|
||||||
|
def blocking_func():
|
||||||
|
nonlocal execution_thread
|
||||||
|
execution_thread = threading.current_thread()
|
||||||
|
return "result"
|
||||||
|
|
||||||
|
result = await manager.run_blocking(blocking_func)
|
||||||
|
|
||||||
|
assert result == "result"
|
||||||
|
assert execution_thread is not main_thread
|
||||||
|
|
||||||
|
await manager.shutdown()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_run_blocking_raises_if_not_started(self):
|
||||||
|
"""run_blocking should raise RuntimeError if manager not started."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
|
||||||
|
with pytest.raises(RuntimeError, match="not started"):
|
||||||
|
await manager.run_blocking(lambda: None)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_startup_idempotent(self):
|
||||||
|
"""Calling startup multiple times should be safe."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
|
||||||
|
await manager.startup()
|
||||||
|
client1 = manager._default_client
|
||||||
|
|
||||||
|
await manager.startup() # Should not create a new client
|
||||||
|
client2 = manager._default_client
|
||||||
|
|
||||||
|
assert client1 is client2 # Same client instance
|
||||||
|
|
||||||
|
await manager.shutdown()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_shutdown_idempotent(self):
|
||||||
|
"""Calling shutdown multiple times should be safe."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
|
||||||
|
await manager.startup()
|
||||||
|
await manager.shutdown()
|
||||||
|
await manager.shutdown() # Should not raise
|
||||||
|
|
||||||
|
assert manager._default_client is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_properties_return_configured_values(self):
|
||||||
|
"""Properties should return configured values."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings(
|
||||||
|
http_max_connections=75,
|
||||||
|
http_worker_threads=16,
|
||||||
|
)
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
await manager.startup()
|
||||||
|
|
||||||
|
assert manager.pool_size == 75
|
||||||
|
assert manager.executor_max == 16
|
||||||
|
|
||||||
|
await manager.shutdown()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_active_connections_when_not_started(self):
|
||||||
|
"""active_connections should return 0 when not started."""
|
||||||
|
from app.http_client import HttpClientManager
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
manager = HttpClientManager(settings)
|
||||||
|
|
||||||
|
assert manager.active_connections == 0
|
||||||
Reference in New Issue
Block a user