- Remove unused _get_pypi_upstream_sources_cached function (never called)
- Remove unused CacheService import and get_cache helper
- Remove unused cache parameter from pypi_download_file
- Fix asyncio.get_event_loop() deprecation: use get_running_loop() instead (see the sketch below)
- Note: the caching implementation was incomplete, but the other performance improvements (connection pooling, batch DB ops) remain
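For context, a minimal sketch of the get_running_loop() pattern this change adopts (illustrative only, not this project's code; the blocking_work helper is made up):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def blocking_work(path: str) -> int:
    # Stand-in for blocking file I/O or hashing work.
    with open(path, "rb") as f:
        return len(f.read())


async def main() -> None:
    executor = ThreadPoolExecutor(max_workers=2)
    # Inside a coroutine, get_running_loop() is the documented way to reach the
    # event loop; get_event_loop() is deprecated when no loop is running and is
    # discouraged in async code.
    loop = asyncio.get_running_loop()
    size = await loop.run_in_executor(executor, blocking_work, __file__)
    print(f"read {size} bytes off the event loop")
    executor.shutdown(wait=True)


asyncio.run(main())
```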
180 lines · 5.5 KiB · Python
"""
|
|
HTTP client manager with connection pooling and lifecycle management.
|
|
|
|
Provides:
|
|
- Shared connection pools for upstream requests
|
|
- Per-upstream client isolation when needed
|
|
- Thread pool for blocking I/O operations
|
|
- FastAPI lifespan integration
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
from typing import Any, Callable, Optional
|
|
|
|
import httpx
|
|
|
|
from .config import Settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|


class HttpClientManager:
    """
    Manages httpx.AsyncClient pools with FastAPI lifespan integration.

    Features:
    - Default shared pool for general requests
    - Per-upstream pools for sources needing specific config/auth
    - Dedicated thread pool for blocking operations
    - Graceful shutdown
    """

    def __init__(self, settings: Settings):
        self.max_connections = settings.http_max_connections
        self.max_keepalive = settings.http_max_keepalive
        self.connect_timeout = settings.http_connect_timeout
        self.read_timeout = settings.http_read_timeout
        self.worker_threads = settings.http_worker_threads

        self._default_client: Optional[httpx.AsyncClient] = None
        self._upstream_clients: dict[str, httpx.AsyncClient] = {}
        self._executor: Optional[ThreadPoolExecutor] = None
        self._started = False

    async def startup(self) -> None:
        """Initialize clients and thread pool. Called by FastAPI lifespan."""
        if self._started:
            return

        logger.info(
            f"Starting HttpClientManager: max_connections={self.max_connections}, "
            f"worker_threads={self.worker_threads}"
        )

        # Create connection limits
        limits = httpx.Limits(
            max_connections=self.max_connections,
            max_keepalive_connections=self.max_keepalive,
        )

        # Create timeout config
        timeout = httpx.Timeout(
            connect=self.connect_timeout,
            read=self.read_timeout,
            write=self.read_timeout,
            pool=self.connect_timeout,
        )

        # Create default client
        self._default_client = httpx.AsyncClient(
            limits=limits,
            timeout=timeout,
            follow_redirects=False,  # Handle redirects manually for auth
        )

        # Create thread pool for blocking operations
        self._executor = ThreadPoolExecutor(
            max_workers=self.worker_threads,
            thread_name_prefix="orchard-blocking-",
        )

        self._started = True
        logger.info("HttpClientManager started")

    async def shutdown(self) -> None:
        """Close all clients and thread pool. Called by FastAPI lifespan."""
        if not self._started:
            return

        logger.info("Shutting down HttpClientManager")

        # Close default client
        if self._default_client:
            await self._default_client.aclose()
            self._default_client = None

        # Close upstream-specific clients
        for name, client in self._upstream_clients.items():
            logger.debug(f"Closing upstream client: {name}")
            await client.aclose()
        self._upstream_clients.clear()

        # Shutdown thread pool
        if self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

        self._started = False
        logger.info("HttpClientManager shutdown complete")

    def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
        """
        Get HTTP client for making requests.

        Args:
            upstream_name: Optional upstream source name for dedicated pool.
                If None, returns the default shared client.

        Returns:
            httpx.AsyncClient configured for the request.

        Raises:
            RuntimeError: If manager not started.
        """
        if not self._started or not self._default_client:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        if upstream_name and upstream_name in self._upstream_clients:
            return self._upstream_clients[upstream_name]

        return self._default_client
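
    # Illustrative usage sketch, not part of this module: assumes `manager` is a
    # started HttpClientManager, and "pypi" is an upstream name that may or may
    # not have a dedicated pool registered in _upstream_clients.
    #
    #     client = manager.get_client()        # shared default pool
    #     client = manager.get_client("pypi")  # dedicated pool if registered,
    #                                          # otherwise the default client
    #     response = await client.get(url)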

    async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any:
        """
        Run a blocking function in the thread pool.

        Use this for:
        - File I/O operations
        - Archive extraction (zipfile, tarfile)
        - Hash computation on large data

        Args:
            func: Synchronous function to execute
            *args: Arguments to pass to the function

        Returns:
            The function's return value.
        """
        if not self._executor:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, func, *args)
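
    # Illustrative usage sketch, not part of this module: sha256_file is a
    # hypothetical synchronous helper supplied by the caller.
    #
    #     digest = await manager.run_blocking(sha256_file, "/tmp/pkg.tar.gz")
    #
    # This keeps blocking file and hash work on the dedicated ThreadPoolExecutor
    # instead of stalling the event loop.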

    @property
    def active_connections(self) -> int:
        """Get approximate number of active connections (for health checks)."""
        if not self._default_client:
            return 0
        # httpx doesn't expose this directly, return pool size as approximation
        return self.max_connections

    @property
    def pool_size(self) -> int:
        """Get configured pool size."""
        return self.max_connections

    @property
    def executor_active(self) -> int:
        """Get number of active thread pool workers."""
        if not self._executor:
            return 0
        return len(self._executor._threads)

    @property
    def executor_max(self) -> int:
        """Get max thread pool workers."""
        return self.worker_threads
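

# Example wiring (illustrative sketch, not part of this module's public API;
# the FastAPI app, lifespan hook, and bare Settings() construction below are
# assumptions about how a caller would integrate the manager):
#
#     from contextlib import asynccontextmanager
#     from fastapi import FastAPI
#
#     @asynccontextmanager
#     async def lifespan(app: FastAPI):
#         manager = HttpClientManager(Settings())
#         await manager.startup()
#         app.state.http_client_manager = manager
#         try:
#             yield
#         finally:
#             await manager.shutdown()
#
#     app = FastAPI(lifespan=lifespan)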