Add HttpClientManager class for managing httpx.AsyncClient pools with FastAPI lifespan integration.

Features include:
- Default shared connection pool for general requests
- Configurable max connections, keep-alive, and timeouts
- Dedicated thread pool for blocking I/O operations
- Graceful startup/shutdown lifecycle management
- Per-upstream client isolation support (for future use)

Includes comprehensive unit tests covering initialization, startup, shutdown, client retrieval, blocking operations, idempotency, and error handling.
180 lines
5.5 KiB
Python
180 lines
5.5 KiB
Python
"""
|
|
HTTP client manager with connection pooling and lifecycle management.
|
|
|
|
Provides:
|
|
- Shared connection pools for upstream requests
|
|
- Per-upstream client isolation when needed
|
|
- Thread pool for blocking I/O operations
|
|
- FastAPI lifespan integration
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from typing import Any, Callable, Optional
|
|
|
|
import httpx
|
|
|
|
from .config import Settings
|
|
|
|
# One logger per module, named after this module's import path.
logger = logging.getLogger(name=__name__)
|
|
|
|
|
|
class HttpClientManager:
    """
    Manages httpx.AsyncClient pools with FastAPI lifespan integration.

    Features:
    - Default shared pool for general requests
    - Per-upstream pools for sources needing specific config/auth
      (the registry exists and is honoured by get_client/shutdown, but
      nothing in this class populates it yet)
    - Dedicated thread pool for blocking operations
    - Graceful shutdown

    Lifecycle: construct with a ``Settings`` object, ``await startup()``
    before use, and ``await shutdown()`` when done. Calling ``startup()``
    while started, or ``shutdown()`` while stopped, is a no-op.
    """

    def __init__(self, settings: Settings):
        """Copy pool, timeout and thread-pool configuration from *settings*.

        No clients or threads are created here; that happens in
        :meth:`startup`.
        """
        self.max_connections = settings.http_max_connections
        self.max_keepalive = settings.http_max_keepalive
        self.connect_timeout = settings.http_connect_timeout
        self.read_timeout = settings.http_read_timeout
        self.worker_threads = settings.http_worker_threads

        self._default_client: Optional[httpx.AsyncClient] = None
        # Named clients returned by get_client(upstream_name); closed on shutdown.
        self._upstream_clients: dict[str, httpx.AsyncClient] = {}
        self._executor: Optional[ThreadPoolExecutor] = None
        self._started = False

    async def startup(self) -> None:
        """Initialize clients and thread pool. Called by FastAPI lifespan.

        Returns immediately if the manager is already started.

        Raises:
            ValueError: If ``worker_threads`` is not positive (raised by
                ``ThreadPoolExecutor``). No HTTP client is left open in
                that case.
        """
        if self._started:
            return

        logger.info(
            "Starting HttpClientManager: max_connections=%s, worker_threads=%s",
            self.max_connections,
            self.worker_threads,
        )

        # Build the thread pool first. Its constructor validates max_workers
        # and does not spawn threads, so a bad setting fails here, before an
        # AsyncClient exists. Such a client would otherwise leak, because
        # _started stays False and shutdown() would skip it.
        executor = ThreadPoolExecutor(
            max_workers=self.worker_threads,
            thread_name_prefix="orchard-blocking-",
        )

        limits = httpx.Limits(
            max_connections=self.max_connections,
            max_keepalive_connections=self.max_keepalive,
        )

        # Writes reuse the read timeout; waiting for a free pooled connection
        # reuses the connect timeout.
        timeout = httpx.Timeout(
            connect=self.connect_timeout,
            read=self.read_timeout,
            write=self.read_timeout,
            pool=self.connect_timeout,
        )

        self._default_client = httpx.AsyncClient(
            limits=limits,
            timeout=timeout,
            follow_redirects=False,  # Handle redirects manually for auth
        )
        self._executor = executor

        self._started = True
        logger.info("HttpClientManager started")

    async def shutdown(self) -> None:
        """Close all clients and thread pool. Called by FastAPI lifespan.

        Every client is closed even if closing an earlier one fails. The
        thread pool is always shut down and the manager marked stopped, even
        on cancellation. If any client failed to close, the first such
        exception is re-raised after cleanup.

        ``ThreadPoolExecutor.shutdown(wait=True)`` is synchronous, so this
        blocks the event loop until pending :meth:`run_blocking` work
        finishes. Returns immediately if the manager is not started.
        """
        if not self._started:
            return

        logger.info("Shutting down HttpClientManager")

        # Detach everything up front, for two reasons:
        # - get_client() can no longer hand out a client that is being closed.
        # - We iterate a private snapshot instead of a dict that another
        #   coroutine could mutate while we await.
        default_client, self._default_client = self._default_client, None
        upstream_clients = list(self._upstream_clients.items())
        self._upstream_clients.clear()

        first_error: Optional[Exception] = None
        try:
            if default_client is not None:
                first_error = await self._close_client(default_client)

            for name, client in upstream_clients:
                logger.debug("Closing upstream client: %s", name)
                error = await self._close_client(client)
                if first_error is None:
                    first_error = error
        finally:
            # Always release worker threads and reset state, even if a close
            # failed or the shutdown task was cancelled.
            if self._executor is not None:
                self._executor.shutdown(wait=True)
                self._executor = None
            self._started = False

        if first_error is not None:
            raise first_error

        logger.info("HttpClientManager shutdown complete")

    @staticmethod
    async def _close_client(client: httpx.AsyncClient) -> Optional[Exception]:
        """Close *client*; log and return (rather than raise) any error.

        Lets shutdown() keep closing the remaining clients after a failure.
        """
        try:
            await client.aclose()
        except Exception as exc:
            logger.exception("Error closing HTTP client")
            return exc
        return None

    def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
        """
        Get HTTP client for making requests.

        Args:
            upstream_name: Optional upstream source name for dedicated pool.
                If None, empty, or not registered, returns the default
                shared client.

        Returns:
            httpx.AsyncClient configured for the request.

        Raises:
            RuntimeError: If manager not started (or already shut down).
        """
        if not self._started or self._default_client is None:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        if upstream_name:
            upstream_client = self._upstream_clients.get(upstream_name)
            if upstream_client is not None:
                return upstream_client

        return self._default_client

    async def run_blocking(
        self, func: Callable[..., Any], *args: Any, **kwargs: Any
    ) -> Any:
        """
        Run a blocking function in the thread pool.

        Use this for:
        - File I/O operations
        - Archive extraction (zipfile, tarfile)
        - Hash computation on large data

        Args:
            func: Synchronous function to execute.
            *args: Positional arguments to pass to the function.
            **kwargs: Keyword arguments to pass to the function.

        Returns:
            The function's return value. Exceptions raised by *func*
            propagate to the awaiting caller.

        Raises:
            RuntimeError: If manager not started (or already shut down).
        """
        if self._executor is None:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        # We are inside a coroutine, so ask for the running loop directly;
        # asyncio prefers this over get_event_loop() here.
        loop = asyncio.get_running_loop()
        if kwargs:
            # run_in_executor only forwards positional arguments, so bind
            # the keyword arguments in a closure.
            return await loop.run_in_executor(
                self._executor, lambda: func(*args, **kwargs)
            )
        return await loop.run_in_executor(self._executor, func, *args)

    @property
    def active_connections(self) -> int:
        """Approximate number of active connections (for health checks).

        httpx does not expose a live connection count. While a client exists,
        this therefore reports the configured ``max_connections`` (an upper
        bound, not a measurement). It returns 0 before startup and after
        shutdown.
        """
        if self._default_client is None:
            return 0
        return self.max_connections

    @property
    def pool_size(self) -> int:
        """Configured maximum number of pooled connections."""
        return self.max_connections

    @property
    def executor_active(self) -> int:
        """Number of worker threads the thread pool has spawned so far.

        This reads ``ThreadPoolExecutor``'s private ``_threads`` set (a CPython
        implementation detail). It counts threads that have been started, not
        threads currently running a task. Returns 0 when no pool exists or
        the attribute is unavailable.
        """
        if self._executor is None:
            return 0
        return len(getattr(self._executor, "_threads", ()))

    @property
    def executor_max(self) -> int:
        """Configured maximum number of thread pool workers."""
        return self.worker_threads
|