Compare commits
14 Commits
196f3f957c
...
632bf54087
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
632bf54087 | ||
|
|
170561b32a | ||
|
|
6e05697ae2 | ||
|
|
08b6589712 | ||
|
|
7ad5a15ef4 | ||
|
|
8fdb73901e | ||
|
|
79dd7b833e | ||
|
|
71089aee0e | ||
|
|
ffe0529ea8 | ||
|
|
146ca2ad74 | ||
|
|
a045509fe4 | ||
|
|
14806b05f0 | ||
|
|
c67004af52 | ||
|
|
8c6ba01a73 |
23
CHANGELOG.md
23
CHANGELOG.md
@@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
|
||||
## [Unreleased]
|
||||
### Added
|
||||
- Added HTTP connection pooling infrastructure for improved PyPI proxy performance
|
||||
- `HttpClientManager` with configurable pool size, timeouts, and thread pool executor
|
||||
- Eliminates per-request connection overhead (~100-500ms → ~5ms)
|
||||
- Added Redis caching layer with category-aware TTL for hermetic builds
|
||||
- `CacheService` with graceful fallback when Redis unavailable
|
||||
- Immutable data (artifact metadata, dependencies) cached forever
|
||||
- Mutable data (package index, versions) uses configurable TTL
|
||||
- Added `ArtifactRepository` for batch database operations
|
||||
- `batch_upsert_dependencies()` reduces N+1 queries to single INSERT
|
||||
- `get_or_create_artifact()` uses atomic ON CONFLICT upsert
|
||||
- Added infrastructure status to health endpoint (`/health`)
|
||||
- Reports HTTP pool size and worker threads
|
||||
- Reports Redis cache connection status
|
||||
- Added new configuration settings for HTTP client, Redis, and cache TTL
|
||||
- `ORCHARD_HTTP_MAX_CONNECTIONS`, `ORCHARD_HTTP_CONNECT_TIMEOUT`, etc.
|
||||
- `ORCHARD_REDIS_HOST`, `ORCHARD_REDIS_PORT`, `ORCHARD_REDIS_ENABLED`
|
||||
- `ORCHARD_CACHE_TTL_INDEX`, `ORCHARD_CACHE_TTL_VERSIONS`, etc.
|
||||
- Added transparent PyPI proxy implementing PEP 503 Simple API (#108)
|
||||
- `GET /pypi/simple/` - package index (proxied from upstream)
|
||||
- `GET /pypi/simple/{package}/` - version list with rewritten download links
|
||||
@@ -16,6 +33,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||
- Added `POST /api/v1/cache/resolve` endpoint to cache packages by coordinates instead of URL (#108)
|
||||
|
||||
### Changed
|
||||
- PyPI proxy now uses shared HTTP connection pool instead of per-request clients
|
||||
- PyPI proxy now caches upstream source configuration in Redis
|
||||
- Dependency storage now uses batch INSERT instead of individual queries
|
||||
- Increased default database pool size from 5 to 20 connections
|
||||
- Increased default database max overflow from 10 to 30 connections
|
||||
- Enabled Redis in Helm chart values for dev, stage, and prod environments
|
||||
- Upstream sources table text is now centered under column headers (#108)
|
||||
- ENV badge now appears inline with source name instead of separate column (#108)
|
||||
- Test and Edit buttons now have more prominent button styling (#108)
|
||||
|
||||
262
backend/app/cache_service.py
Normal file
262
backend/app/cache_service.py
Normal file
@@ -0,0 +1,262 @@
|
||||
"""
|
||||
Redis-backed caching service with category-aware TTL and invalidation.
|
||||
|
||||
Provides:
|
||||
- Immutable caching for artifact data (hermetic builds)
|
||||
- TTL-based caching for discovery data
|
||||
- Event-driven invalidation for config changes
|
||||
- Graceful fallback when Redis unavailable
|
||||
"""
|
||||
|
||||
import logging
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
from .config import Settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CacheCategory(Enum):
|
||||
"""
|
||||
Cache categories with different TTL and invalidation rules.
|
||||
|
||||
Immutable (cache forever):
|
||||
- ARTIFACT_METADATA: Artifact info by SHA256
|
||||
- ARTIFACT_DEPENDENCIES: Extracted deps by SHA256
|
||||
- DEPENDENCY_RESOLUTION: Resolution results by input hash
|
||||
|
||||
Mutable (TTL + event invalidation):
|
||||
- UPSTREAM_SOURCES: Upstream config, invalidate on DB change
|
||||
- PACKAGE_INDEX: PyPI/npm index pages, TTL only
|
||||
- PACKAGE_VERSIONS: Version listings, TTL only
|
||||
"""
|
||||
|
||||
# Immutable - cache forever (hermetic builds)
|
||||
ARTIFACT_METADATA = "artifact"
|
||||
ARTIFACT_DEPENDENCIES = "deps"
|
||||
DEPENDENCY_RESOLUTION = "resolve"
|
||||
|
||||
# Mutable - TTL + event invalidation
|
||||
UPSTREAM_SOURCES = "upstream"
|
||||
PACKAGE_INDEX = "index"
|
||||
PACKAGE_VERSIONS = "versions"
|
||||
|
||||
|
||||
def get_category_ttl(category: CacheCategory, settings: Settings) -> Optional[int]:
|
||||
"""
|
||||
Get TTL for a cache category.
|
||||
|
||||
Returns:
|
||||
TTL in seconds, or None for no expiry (immutable).
|
||||
"""
|
||||
ttl_map = {
|
||||
# Immutable - no TTL
|
||||
CacheCategory.ARTIFACT_METADATA: None,
|
||||
CacheCategory.ARTIFACT_DEPENDENCIES: None,
|
||||
CacheCategory.DEPENDENCY_RESOLUTION: None,
|
||||
# Mutable - configurable TTL
|
||||
CacheCategory.UPSTREAM_SOURCES: settings.cache_ttl_upstream,
|
||||
CacheCategory.PACKAGE_INDEX: settings.cache_ttl_index,
|
||||
CacheCategory.PACKAGE_VERSIONS: settings.cache_ttl_versions,
|
||||
}
|
||||
return ttl_map.get(category)
|
||||
|
||||
|
||||
class CacheService:
|
||||
"""
|
||||
Redis-backed caching with category-aware TTL.
|
||||
|
||||
Key format: orchard:{category}:{protocol}:{identifier}
|
||||
Example: orchard:deps:pypi:abc123def456
|
||||
|
||||
When Redis is disabled or unavailable, operations gracefully
|
||||
return None/no-op to allow the application to function without caching.
|
||||
"""
|
||||
|
||||
def __init__(self, settings: Settings):
|
||||
self._settings = settings
|
||||
self._enabled = settings.redis_enabled
|
||||
self._redis: Optional["redis.asyncio.Redis"] = None
|
||||
self._started = False
|
||||
|
||||
async def startup(self) -> None:
|
||||
"""Initialize Redis connection. Called by FastAPI lifespan."""
|
||||
if self._started:
|
||||
return
|
||||
|
||||
if not self._enabled:
|
||||
logger.info("CacheService disabled (redis_enabled=False)")
|
||||
self._started = True
|
||||
return
|
||||
|
||||
try:
|
||||
import redis.asyncio as redis
|
||||
|
||||
logger.info(
|
||||
f"Connecting to Redis at {self._settings.redis_host}:"
|
||||
f"{self._settings.redis_port}/{self._settings.redis_db}"
|
||||
)
|
||||
|
||||
self._redis = redis.Redis(
|
||||
host=self._settings.redis_host,
|
||||
port=self._settings.redis_port,
|
||||
db=self._settings.redis_db,
|
||||
password=self._settings.redis_password,
|
||||
decode_responses=False, # We handle bytes
|
||||
)
|
||||
|
||||
# Test connection
|
||||
await self._redis.ping()
|
||||
logger.info("CacheService connected to Redis")
|
||||
|
||||
except ImportError:
|
||||
logger.warning("redis package not installed, caching disabled")
|
||||
self._enabled = False
|
||||
except Exception as e:
|
||||
logger.warning(f"Redis connection failed, caching disabled: {e}")
|
||||
self._enabled = False
|
||||
self._redis = None
|
||||
|
||||
self._started = True
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""Close Redis connection. Called by FastAPI lifespan."""
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
if self._redis:
|
||||
await self._redis.aclose()
|
||||
self._redis = None
|
||||
|
||||
self._started = False
|
||||
logger.info("CacheService shutdown complete")
|
||||
|
||||
@staticmethod
|
||||
def _make_key(category: CacheCategory, protocol: str, identifier: str) -> str:
|
||||
"""Build namespaced cache key."""
|
||||
return f"orchard:{category.value}:{protocol}:{identifier}"
|
||||
|
||||
async def get(
|
||||
self,
|
||||
category: CacheCategory,
|
||||
key: str,
|
||||
protocol: str = "default",
|
||||
) -> Optional[bytes]:
|
||||
"""
|
||||
Get cached value.
|
||||
|
||||
Args:
|
||||
category: Cache category for TTL rules
|
||||
key: Unique identifier within category
|
||||
protocol: Protocol namespace (pypi, npm, etc.)
|
||||
|
||||
Returns:
|
||||
Cached bytes or None if not found/disabled.
|
||||
"""
|
||||
if not self._enabled or not self._redis:
|
||||
return None
|
||||
|
||||
try:
|
||||
full_key = self._make_key(category, protocol, key)
|
||||
return await self._redis.get(full_key)
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache get failed for {key}: {e}")
|
||||
return None
|
||||
|
||||
async def set(
|
||||
self,
|
||||
category: CacheCategory,
|
||||
key: str,
|
||||
value: bytes,
|
||||
protocol: str = "default",
|
||||
) -> None:
|
||||
"""
|
||||
Set cached value with category-appropriate TTL.
|
||||
|
||||
Args:
|
||||
category: Cache category for TTL rules
|
||||
key: Unique identifier within category
|
||||
value: Bytes to cache
|
||||
protocol: Protocol namespace (pypi, npm, etc.)
|
||||
"""
|
||||
if not self._enabled or not self._redis:
|
||||
return
|
||||
|
||||
try:
|
||||
full_key = self._make_key(category, protocol, key)
|
||||
ttl = get_category_ttl(category, self._settings)
|
||||
|
||||
if ttl is None:
|
||||
await self._redis.set(full_key, value)
|
||||
else:
|
||||
await self._redis.setex(full_key, ttl, value)
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache set failed for {key}: {e}")
|
||||
|
||||
async def delete(
|
||||
self,
|
||||
category: CacheCategory,
|
||||
key: str,
|
||||
protocol: str = "default",
|
||||
) -> None:
|
||||
"""Delete a specific cache entry."""
|
||||
if not self._enabled or not self._redis:
|
||||
return
|
||||
|
||||
try:
|
||||
full_key = self._make_key(category, protocol, key)
|
||||
await self._redis.delete(full_key)
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache delete failed for {key}: {e}")
|
||||
|
||||
async def invalidate_pattern(
|
||||
self,
|
||||
category: CacheCategory,
|
||||
pattern: str = "*",
|
||||
protocol: str = "default",
|
||||
) -> int:
|
||||
"""
|
||||
Invalidate all entries matching pattern.
|
||||
|
||||
Args:
|
||||
category: Cache category
|
||||
pattern: Glob pattern for keys (default "*" = all in category)
|
||||
protocol: Protocol namespace
|
||||
|
||||
Returns:
|
||||
Number of keys deleted.
|
||||
"""
|
||||
if not self._enabled or not self._redis:
|
||||
return 0
|
||||
|
||||
try:
|
||||
full_pattern = self._make_key(category, protocol, pattern)
|
||||
keys = []
|
||||
async for key in self._redis.scan_iter(match=full_pattern):
|
||||
keys.append(key)
|
||||
|
||||
if keys:
|
||||
return await self._redis.delete(*keys)
|
||||
return 0
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Cache invalidate failed for pattern {pattern}: {e}")
|
||||
return 0
|
||||
|
||||
async def ping(self) -> bool:
|
||||
"""Check if Redis is connected and responding."""
|
||||
if not self._enabled or not self._redis:
|
||||
return False
|
||||
|
||||
try:
|
||||
await self._redis.ping()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
@property
|
||||
def enabled(self) -> bool:
|
||||
"""Check if caching is enabled."""
|
||||
return self._enabled
|
||||
@@ -22,8 +22,8 @@ class Settings(BaseSettings):
|
||||
database_sslmode: str = "disable"
|
||||
|
||||
# Database connection pool settings
|
||||
database_pool_size: int = 5 # Number of connections to keep open
|
||||
database_max_overflow: int = 10 # Max additional connections beyond pool_size
|
||||
database_pool_size: int = 20 # Number of connections to keep open
|
||||
database_max_overflow: int = 30 # Max additional connections beyond pool_size
|
||||
database_pool_timeout: int = 30 # Seconds to wait for a connection from pool
|
||||
database_pool_recycle: int = (
|
||||
1800 # Recycle connections after this many seconds (30 min)
|
||||
@@ -53,6 +53,25 @@ class Settings(BaseSettings):
|
||||
)
|
||||
pypi_download_mode: str = "redirect" # "redirect" (to S3) or "proxy" (stream through Orchard)
|
||||
|
||||
# HTTP Client pool settings
|
||||
http_max_connections: int = 100 # Max connections per pool
|
||||
http_max_keepalive: int = 20 # Keep-alive connections
|
||||
http_connect_timeout: float = 30.0 # Connection timeout seconds
|
||||
http_read_timeout: float = 60.0 # Read timeout seconds
|
||||
http_worker_threads: int = 32 # Thread pool for blocking ops
|
||||
|
||||
# Redis cache settings
|
||||
redis_host: str = "localhost"
|
||||
redis_port: int = 6379
|
||||
redis_db: int = 0
|
||||
redis_password: Optional[str] = None
|
||||
redis_enabled: bool = True # Set False to disable caching
|
||||
|
||||
# Cache TTL settings (seconds, 0 = no expiry)
|
||||
cache_ttl_index: int = 300 # Package index pages: 5 min
|
||||
cache_ttl_versions: int = 300 # Version listings: 5 min
|
||||
cache_ttl_upstream: int = 3600 # Upstream source config: 1 hour
|
||||
|
||||
# Logging settings
|
||||
log_level: str = "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||||
log_format: str = "auto" # "json", "standard", or "auto" (json in production)
|
||||
|
||||
175
backend/app/db_utils.py
Normal file
175
backend/app/db_utils.py
Normal file
@@ -0,0 +1,175 @@
|
||||
"""
|
||||
Database utilities for optimized artifact operations.
|
||||
|
||||
Provides batch operations to eliminate N+1 queries.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from sqlalchemy.dialects.postgresql import insert as pg_insert
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .models import Artifact, ArtifactDependency, CachedUrl
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ArtifactRepository:
|
||||
"""
|
||||
Optimized database operations for artifact storage.
|
||||
|
||||
Key optimizations:
|
||||
- Atomic upserts using ON CONFLICT
|
||||
- Batch inserts for dependencies
|
||||
- Joined queries to avoid N+1
|
||||
"""
|
||||
|
||||
def __init__(self, db: Session):
|
||||
self.db = db
|
||||
|
||||
@staticmethod
|
||||
def _format_dependency_values(
|
||||
artifact_id: str,
|
||||
dependencies: list[tuple[str, str, str]],
|
||||
) -> list[dict]:
|
||||
"""
|
||||
Format dependencies for batch insert.
|
||||
|
||||
Args:
|
||||
artifact_id: SHA256 of the artifact
|
||||
dependencies: List of (project, package, version_constraint)
|
||||
|
||||
Returns:
|
||||
List of dicts ready for bulk insert.
|
||||
"""
|
||||
return [
|
||||
{
|
||||
"artifact_id": artifact_id,
|
||||
"dependency_project": proj,
|
||||
"dependency_package": pkg,
|
||||
"version_constraint": ver,
|
||||
}
|
||||
for proj, pkg, ver in dependencies
|
||||
]
|
||||
|
||||
def get_or_create_artifact(
|
||||
self,
|
||||
sha256: str,
|
||||
size: int,
|
||||
filename: str,
|
||||
content_type: Optional[str] = None,
|
||||
created_by: str = "system",
|
||||
s3_key: Optional[str] = None,
|
||||
) -> tuple[Artifact, bool]:
|
||||
"""
|
||||
Get existing artifact or create new one atomically.
|
||||
|
||||
Uses INSERT ... ON CONFLICT DO UPDATE to handle races.
|
||||
If artifact exists, increments ref_count.
|
||||
|
||||
Args:
|
||||
sha256: Content hash (primary key)
|
||||
size: File size in bytes
|
||||
filename: Original filename
|
||||
content_type: MIME type
|
||||
created_by: User who created the artifact
|
||||
s3_key: S3 storage key (defaults to standard path)
|
||||
|
||||
Returns:
|
||||
(artifact, created) tuple where created is True for new artifacts.
|
||||
"""
|
||||
if s3_key is None:
|
||||
s3_key = f"fruits/{sha256[:2]}/{sha256[2:4]}/{sha256}"
|
||||
|
||||
stmt = pg_insert(Artifact).values(
|
||||
id=sha256,
|
||||
size=size,
|
||||
original_name=filename,
|
||||
content_type=content_type,
|
||||
ref_count=1,
|
||||
created_by=created_by,
|
||||
s3_key=s3_key,
|
||||
).on_conflict_do_update(
|
||||
index_elements=['id'],
|
||||
set_={'ref_count': Artifact.ref_count + 1}
|
||||
).returning(Artifact)
|
||||
|
||||
result = self.db.execute(stmt)
|
||||
artifact = result.scalar_one()
|
||||
|
||||
# Check if this was an insert or update by comparing ref_count
|
||||
# ref_count=1 means new, >1 means existing
|
||||
created = artifact.ref_count == 1
|
||||
|
||||
return artifact, created
|
||||
|
||||
def batch_upsert_dependencies(
|
||||
self,
|
||||
artifact_id: str,
|
||||
dependencies: list[tuple[str, str, str]],
|
||||
) -> int:
|
||||
"""
|
||||
Insert dependencies in a single batch operation.
|
||||
|
||||
Uses ON CONFLICT DO NOTHING to skip duplicates.
|
||||
|
||||
Args:
|
||||
artifact_id: SHA256 of the artifact
|
||||
dependencies: List of (project, package, version_constraint)
|
||||
|
||||
Returns:
|
||||
Number of dependencies inserted.
|
||||
"""
|
||||
if not dependencies:
|
||||
return 0
|
||||
|
||||
values = self._format_dependency_values(artifact_id, dependencies)
|
||||
|
||||
stmt = pg_insert(ArtifactDependency).values(values)
|
||||
stmt = stmt.on_conflict_do_nothing(
|
||||
index_elements=['artifact_id', 'dependency_project', 'dependency_package']
|
||||
)
|
||||
|
||||
result = self.db.execute(stmt)
|
||||
return result.rowcount
|
||||
|
||||
def get_cached_url_with_artifact(
|
||||
self,
|
||||
url_hash: str,
|
||||
) -> Optional[tuple[CachedUrl, Artifact]]:
|
||||
"""
|
||||
Get cached URL and its artifact in a single query.
|
||||
|
||||
Args:
|
||||
url_hash: SHA256 of the URL
|
||||
|
||||
Returns:
|
||||
(CachedUrl, Artifact) tuple or None if not found.
|
||||
"""
|
||||
result = (
|
||||
self.db.query(CachedUrl, Artifact)
|
||||
.join(Artifact, CachedUrl.artifact_id == Artifact.id)
|
||||
.filter(CachedUrl.url_hash == url_hash)
|
||||
.first()
|
||||
)
|
||||
return result
|
||||
|
||||
def get_artifact_dependencies(
|
||||
self,
|
||||
artifact_id: str,
|
||||
) -> list[ArtifactDependency]:
|
||||
"""
|
||||
Get all dependencies for an artifact in a single query.
|
||||
|
||||
Args:
|
||||
artifact_id: SHA256 of the artifact
|
||||
|
||||
Returns:
|
||||
List of ArtifactDependency objects.
|
||||
"""
|
||||
return (
|
||||
self.db.query(ArtifactDependency)
|
||||
.filter(ArtifactDependency.artifact_id == artifact_id)
|
||||
.all()
|
||||
)
|
||||
179
backend/app/http_client.py
Normal file
179
backend/app/http_client.py
Normal file
@@ -0,0 +1,179 @@
|
||||
"""
|
||||
HTTP client manager with connection pooling and lifecycle management.
|
||||
|
||||
Provides:
|
||||
- Shared connection pools for upstream requests
|
||||
- Per-upstream client isolation when needed
|
||||
- Thread pool for blocking I/O operations
|
||||
- FastAPI lifespan integration
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
import httpx
|
||||
|
||||
from .config import Settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class HttpClientManager:
|
||||
"""
|
||||
Manages httpx.AsyncClient pools with FastAPI lifespan integration.
|
||||
|
||||
Features:
|
||||
- Default shared pool for general requests
|
||||
- Per-upstream pools for sources needing specific config/auth
|
||||
- Dedicated thread pool for blocking operations
|
||||
- Graceful shutdown
|
||||
"""
|
||||
|
||||
def __init__(self, settings: Settings):
|
||||
self.max_connections = settings.http_max_connections
|
||||
self.max_keepalive = settings.http_max_keepalive
|
||||
self.connect_timeout = settings.http_connect_timeout
|
||||
self.read_timeout = settings.http_read_timeout
|
||||
self.worker_threads = settings.http_worker_threads
|
||||
|
||||
self._default_client: Optional[httpx.AsyncClient] = None
|
||||
self._upstream_clients: dict[str, httpx.AsyncClient] = {}
|
||||
self._executor: Optional[ThreadPoolExecutor] = None
|
||||
self._started = False
|
||||
|
||||
async def startup(self) -> None:
|
||||
"""Initialize clients and thread pool. Called by FastAPI lifespan."""
|
||||
if self._started:
|
||||
return
|
||||
|
||||
logger.info(
|
||||
f"Starting HttpClientManager: max_connections={self.max_connections}, "
|
||||
f"worker_threads={self.worker_threads}"
|
||||
)
|
||||
|
||||
# Create connection limits
|
||||
limits = httpx.Limits(
|
||||
max_connections=self.max_connections,
|
||||
max_keepalive_connections=self.max_keepalive,
|
||||
)
|
||||
|
||||
# Create timeout config
|
||||
timeout = httpx.Timeout(
|
||||
connect=self.connect_timeout,
|
||||
read=self.read_timeout,
|
||||
write=self.read_timeout,
|
||||
pool=self.connect_timeout,
|
||||
)
|
||||
|
||||
# Create default client
|
||||
self._default_client = httpx.AsyncClient(
|
||||
limits=limits,
|
||||
timeout=timeout,
|
||||
follow_redirects=False, # Handle redirects manually for auth
|
||||
)
|
||||
|
||||
# Create thread pool for blocking operations
|
||||
self._executor = ThreadPoolExecutor(
|
||||
max_workers=self.worker_threads,
|
||||
thread_name_prefix="orchard-blocking-",
|
||||
)
|
||||
|
||||
self._started = True
|
||||
logger.info("HttpClientManager started")
|
||||
|
||||
async def shutdown(self) -> None:
|
||||
"""Close all clients and thread pool. Called by FastAPI lifespan."""
|
||||
if not self._started:
|
||||
return
|
||||
|
||||
logger.info("Shutting down HttpClientManager")
|
||||
|
||||
# Close default client
|
||||
if self._default_client:
|
||||
await self._default_client.aclose()
|
||||
self._default_client = None
|
||||
|
||||
# Close upstream-specific clients
|
||||
for name, client in self._upstream_clients.items():
|
||||
logger.debug(f"Closing upstream client: {name}")
|
||||
await client.aclose()
|
||||
self._upstream_clients.clear()
|
||||
|
||||
# Shutdown thread pool
|
||||
if self._executor:
|
||||
self._executor.shutdown(wait=True)
|
||||
self._executor = None
|
||||
|
||||
self._started = False
|
||||
logger.info("HttpClientManager shutdown complete")
|
||||
|
||||
def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
|
||||
"""
|
||||
Get HTTP client for making requests.
|
||||
|
||||
Args:
|
||||
upstream_name: Optional upstream source name for dedicated pool.
|
||||
If None, returns the default shared client.
|
||||
|
||||
Returns:
|
||||
httpx.AsyncClient configured for the request.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If manager not started.
|
||||
"""
|
||||
if not self._started or not self._default_client:
|
||||
raise RuntimeError("HttpClientManager not started. Call startup() first.")
|
||||
|
||||
if upstream_name and upstream_name in self._upstream_clients:
|
||||
return self._upstream_clients[upstream_name]
|
||||
|
||||
return self._default_client
|
||||
|
||||
async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any:
|
||||
"""
|
||||
Run a blocking function in the thread pool.
|
||||
|
||||
Use this for:
|
||||
- File I/O operations
|
||||
- Archive extraction (zipfile, tarfile)
|
||||
- Hash computation on large data
|
||||
|
||||
Args:
|
||||
func: Synchronous function to execute
|
||||
*args: Arguments to pass to the function
|
||||
|
||||
Returns:
|
||||
The function's return value.
|
||||
"""
|
||||
if not self._executor:
|
||||
raise RuntimeError("HttpClientManager not started. Call startup() first.")
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
return await loop.run_in_executor(self._executor, func, *args)
|
||||
|
||||
@property
|
||||
def active_connections(self) -> int:
|
||||
"""Get approximate number of active connections (for health checks)."""
|
||||
if not self._default_client:
|
||||
return 0
|
||||
# httpx doesn't expose this directly, return pool size as approximation
|
||||
return self.max_connections
|
||||
|
||||
@property
|
||||
def pool_size(self) -> int:
|
||||
"""Get configured pool size."""
|
||||
return self.max_connections
|
||||
|
||||
@property
|
||||
def executor_active(self) -> int:
|
||||
"""Get number of active thread pool workers."""
|
||||
if not self._executor:
|
||||
return 0
|
||||
return len(self._executor._threads)
|
||||
|
||||
@property
|
||||
def executor_max(self) -> int:
|
||||
"""Get max thread pool workers."""
|
||||
return self.worker_threads
|
||||
@@ -15,6 +15,8 @@ from .pypi_proxy import router as pypi_router
|
||||
from .seed import seed_database
|
||||
from .auth import create_default_admin
|
||||
from .rate_limit import limiter
|
||||
from .http_client import HttpClientManager
|
||||
from .cache_service import CacheService
|
||||
|
||||
settings = get_settings()
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
@@ -38,6 +40,17 @@ async def lifespan(app: FastAPI):
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
# Initialize infrastructure services
|
||||
logger.info("Initializing infrastructure services...")
|
||||
|
||||
app.state.http_client = HttpClientManager(settings)
|
||||
await app.state.http_client.startup()
|
||||
|
||||
app.state.cache = CacheService(settings)
|
||||
await app.state.cache.startup()
|
||||
|
||||
logger.info("Infrastructure services ready")
|
||||
|
||||
# Seed test data in development mode
|
||||
if settings.is_development:
|
||||
logger.info(f"Running in {settings.env} mode - checking for seed data")
|
||||
@@ -51,6 +64,12 @@ async def lifespan(app: FastAPI):
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown infrastructure services
|
||||
logger.info("Shutting down infrastructure services...")
|
||||
await app.state.http_client.shutdown()
|
||||
await app.state.cache.shutdown()
|
||||
logger.info("Shutdown complete")
|
||||
|
||||
|
||||
app = FastAPI(
|
||||
title="Orchard",
|
||||
|
||||
@@ -23,15 +23,28 @@ from fastapi.responses import StreamingResponse, HTMLResponse, RedirectResponse
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from .database import get_db
|
||||
from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, PackageVersion, ArtifactDependency
|
||||
from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, PackageVersion
|
||||
from .storage import S3Storage, get_storage
|
||||
from .config import get_env_upstream_sources, get_settings
|
||||
from .http_client import HttpClientManager
|
||||
from .cache_service import CacheService, CacheCategory
|
||||
from .db_utils import ArtifactRepository
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
router = APIRouter(prefix="/pypi", tags=["pypi-proxy"])
|
||||
|
||||
|
||||
def get_http_client(request: Request) -> HttpClientManager:
|
||||
"""Get HttpClientManager from app state."""
|
||||
return request.app.state.http_client
|
||||
|
||||
|
||||
def get_cache(request: Request) -> CacheService:
|
||||
"""Get CacheService from app state."""
|
||||
return request.app.state.cache
|
||||
|
||||
|
||||
# Timeout configuration for proxy requests
|
||||
PROXY_CONNECT_TIMEOUT = 30.0
|
||||
PROXY_READ_TIMEOUT = 60.0
|
||||
@@ -241,6 +254,62 @@ def _extract_pypi_version(filename: str) -> Optional[str]:
|
||||
return None
|
||||
|
||||
|
||||
async def _get_pypi_upstream_sources_cached(
|
||||
db: Session,
|
||||
cache: CacheService,
|
||||
) -> list[UpstreamSource]:
|
||||
"""
|
||||
Get PyPI upstream sources with caching.
|
||||
|
||||
Sources are cached for cache_ttl_upstream seconds to avoid
|
||||
repeated database queries on every request.
|
||||
"""
|
||||
cache_key = "sources"
|
||||
|
||||
# Try cache first
|
||||
cached = await cache.get(CacheCategory.UPSTREAM_SOURCES, cache_key, protocol="pypi")
|
||||
if cached:
|
||||
source_data = json.loads(cached.decode())
|
||||
# Reconstruct UpstreamSource-like objects from cached data
|
||||
# We cache just the essential fields needed for requests
|
||||
return [type('CachedSource', (), d)() for d in source_data]
|
||||
|
||||
# Query database
|
||||
db_sources = (
|
||||
db.query(UpstreamSource)
|
||||
.filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True)
|
||||
.order_by(UpstreamSource.priority)
|
||||
.all()
|
||||
)
|
||||
|
||||
# Combine with env sources
|
||||
env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"]
|
||||
all_sources = list(db_sources) + list(env_sources)
|
||||
all_sources = sorted(all_sources, key=lambda s: s.priority)
|
||||
|
||||
# Cache the essential fields
|
||||
if all_sources and cache.enabled:
|
||||
cache_data = [
|
||||
{
|
||||
"name": s.name,
|
||||
"url": s.url,
|
||||
"priority": s.priority,
|
||||
"auth_type": getattr(s, "auth_type", "none"),
|
||||
"username": getattr(s, "username", None),
|
||||
"password": getattr(s, "password", None),
|
||||
}
|
||||
for s in all_sources
|
||||
]
|
||||
await cache.set(
|
||||
CacheCategory.UPSTREAM_SOURCES,
|
||||
cache_key,
|
||||
json.dumps(cache_data).encode(),
|
||||
protocol="pypi",
|
||||
)
|
||||
|
||||
return all_sources
|
||||
|
||||
|
||||
def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]:
|
||||
"""Get all enabled upstream sources configured for PyPI."""
|
||||
# Get database sources
|
||||
@@ -573,6 +642,8 @@ async def pypi_download_file(
|
||||
upstream: Optional[str] = None,
|
||||
db: Session = Depends(get_db),
|
||||
storage: S3Storage = Depends(get_storage),
|
||||
http_client: HttpClientManager = Depends(get_http_client),
|
||||
cache: CacheService = Depends(get_cache),
|
||||
):
|
||||
"""
|
||||
Download a package file, caching it in Orchard.
|
||||
@@ -654,7 +725,9 @@ async def pypi_download_file(
|
||||
headers.update(_build_auth_headers(matched_source))
|
||||
auth = _get_basic_auth(matched_source) if matched_source else None
|
||||
|
||||
timeout = httpx.Timeout(300.0, connect=PROXY_CONNECT_TIMEOUT) # 5 minutes for large files
|
||||
# Use shared HTTP client from pool with longer timeout for file downloads
|
||||
client = http_client.get_client()
|
||||
download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
|
||||
|
||||
# Initialize extracted dependencies list
|
||||
extracted_deps = []
|
||||
@@ -662,78 +735,79 @@ async def pypi_download_file(
|
||||
# Fetch the file
|
||||
logger.info(f"PyPI proxy: fetching {filename} from {upstream_url}")
|
||||
|
||||
async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
|
||||
response = await client.get(
|
||||
upstream_url,
|
||||
headers=headers,
|
||||
auth=auth,
|
||||
timeout=download_timeout,
|
||||
)
|
||||
|
||||
# Handle redirects manually
|
||||
redirect_count = 0
|
||||
while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
|
||||
redirect_url = response.headers.get('location')
|
||||
if not redirect_url:
|
||||
break
|
||||
|
||||
if not redirect_url.startswith('http'):
|
||||
redirect_url = urljoin(upstream_url, redirect_url)
|
||||
|
||||
logger.info(f"PyPI proxy: following redirect to {redirect_url}")
|
||||
|
||||
# Don't send auth to different hosts
|
||||
redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
|
||||
redirect_auth = None
|
||||
if urlparse(redirect_url).netloc == urlparse(upstream_url).netloc:
|
||||
redirect_headers.update(headers)
|
||||
redirect_auth = auth
|
||||
|
||||
response = await client.get(
|
||||
upstream_url,
|
||||
headers=headers,
|
||||
auth=auth,
|
||||
redirect_url,
|
||||
headers=redirect_headers,
|
||||
auth=redirect_auth,
|
||||
follow_redirects=False,
|
||||
timeout=download_timeout,
|
||||
)
|
||||
redirect_count += 1
|
||||
|
||||
if response.status_code != 200:
|
||||
# Parse upstream error for policy/blocking messages
|
||||
error_detail = _parse_upstream_error(response)
|
||||
logger.warning(f"PyPI proxy: upstream returned {response.status_code} for {filename}: {error_detail}")
|
||||
raise HTTPException(
|
||||
status_code=response.status_code,
|
||||
detail=f"Upstream error: {error_detail}"
|
||||
)
|
||||
|
||||
# Handle redirects manually
|
||||
redirect_count = 0
|
||||
while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
|
||||
redirect_url = response.headers.get('location')
|
||||
if not redirect_url:
|
||||
break
|
||||
content_type = response.headers.get('content-type', 'application/octet-stream')
|
||||
|
||||
if not redirect_url.startswith('http'):
|
||||
redirect_url = urljoin(upstream_url, redirect_url)
|
||||
# Stream to temp file to avoid loading large packages into memory
|
||||
# This keeps memory usage constant regardless of package size
|
||||
# Using async iteration to avoid blocking the event loop
|
||||
tmp_path = None
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
|
||||
tmp_path = tmp_file.name
|
||||
async for chunk in response.aiter_bytes(chunk_size=65536): # 64KB chunks
|
||||
tmp_file.write(chunk)
|
||||
|
||||
logger.info(f"PyPI proxy: following redirect to {redirect_url}")
|
||||
# Store in S3 from temp file (computes hash and deduplicates automatically)
|
||||
with open(tmp_path, 'rb') as f:
|
||||
result = storage.store(f)
|
||||
sha256 = result.sha256
|
||||
size = result.size
|
||||
s3_key = result.s3_key
|
||||
|
||||
# Don't send auth to different hosts
|
||||
redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
|
||||
redirect_auth = None
|
||||
if urlparse(redirect_url).netloc == urlparse(upstream_url).netloc:
|
||||
redirect_headers.update(headers)
|
||||
redirect_auth = auth
|
||||
# Extract dependencies from the temp file before cleaning up
|
||||
extracted_deps = _extract_dependencies_from_file(tmp_path, filename)
|
||||
if extracted_deps:
|
||||
logger.info(f"PyPI proxy: extracted {len(extracted_deps)} dependencies from {filename}")
|
||||
|
||||
response = await client.get(
|
||||
redirect_url,
|
||||
headers=redirect_headers,
|
||||
auth=redirect_auth,
|
||||
follow_redirects=False,
|
||||
)
|
||||
redirect_count += 1
|
||||
|
||||
if response.status_code != 200:
|
||||
# Parse upstream error for policy/blocking messages
|
||||
error_detail = _parse_upstream_error(response)
|
||||
logger.warning(f"PyPI proxy: upstream returned {response.status_code} for {filename}: {error_detail}")
|
||||
raise HTTPException(
|
||||
status_code=response.status_code,
|
||||
detail=f"Upstream error: {error_detail}"
|
||||
)
|
||||
|
||||
content_type = response.headers.get('content-type', 'application/octet-stream')
|
||||
|
||||
# Stream to temp file to avoid loading large packages into memory
|
||||
# This keeps memory usage constant regardless of package size
|
||||
# Using async iteration to avoid blocking the event loop
|
||||
tmp_path = None
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
|
||||
tmp_path = tmp_file.name
|
||||
async for chunk in response.aiter_bytes(chunk_size=65536): # 64KB chunks
|
||||
tmp_file.write(chunk)
|
||||
|
||||
# Store in S3 from temp file (computes hash and deduplicates automatically)
|
||||
with open(tmp_path, 'rb') as f:
|
||||
result = storage.store(f)
|
||||
sha256 = result.sha256
|
||||
size = result.size
|
||||
s3_key = result.s3_key
|
||||
|
||||
# Extract dependencies from the temp file before cleaning up
|
||||
extracted_deps = _extract_dependencies_from_file(tmp_path, filename)
|
||||
if extracted_deps:
|
||||
logger.info(f"PyPI proxy: extracted {len(extracted_deps)} dependencies from {filename}")
|
||||
|
||||
logger.info(f"PyPI proxy: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")
|
||||
finally:
|
||||
# Clean up temp file
|
||||
if tmp_path and os.path.exists(tmp_path):
|
||||
os.unlink(tmp_path)
|
||||
logger.info(f"PyPI proxy: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")
|
||||
finally:
|
||||
# Clean up temp file
|
||||
if tmp_path and os.path.exists(tmp_path):
|
||||
os.unlink(tmp_path)
|
||||
|
||||
# Check if artifact already exists
|
||||
existing = db.query(Artifact).filter(Artifact.id == sha256).first()
|
||||
@@ -821,7 +895,7 @@ async def pypi_download_file(
|
||||
)
|
||||
db.add(cached_url_record)
|
||||
|
||||
# Store extracted dependencies (deduplicate first - METADATA can list same dep under multiple extras)
|
||||
# Store extracted dependencies using batch operation
|
||||
if extracted_deps:
|
||||
# Deduplicate: keep first version constraint seen for each package name
|
||||
seen_deps: dict[str, str] = {}
|
||||
@@ -829,22 +903,17 @@ async def pypi_download_file(
|
||||
if dep_name not in seen_deps:
|
||||
seen_deps[dep_name] = dep_version if dep_version else "*"
|
||||
|
||||
for dep_name, dep_version in seen_deps.items():
|
||||
# Check if this dependency already exists for this artifact
|
||||
existing_dep = db.query(ArtifactDependency).filter(
|
||||
ArtifactDependency.artifact_id == sha256,
|
||||
ArtifactDependency.dependency_project == "_pypi",
|
||||
ArtifactDependency.dependency_package == dep_name,
|
||||
).first()
|
||||
# Convert to list of tuples for batch insert
|
||||
deps_to_store = [
|
||||
("_pypi", dep_name, dep_version)
|
||||
for dep_name, dep_version in seen_deps.items()
|
||||
]
|
||||
|
||||
if not existing_dep:
|
||||
dep = ArtifactDependency(
|
||||
artifact_id=sha256,
|
||||
dependency_project="_pypi",
|
||||
dependency_package=dep_name,
|
||||
version_constraint=dep_version,
|
||||
)
|
||||
db.add(dep)
|
||||
# Batch upsert - handles duplicates with ON CONFLICT DO NOTHING
|
||||
repo = ArtifactRepository(db)
|
||||
inserted = repo.batch_upsert_dependencies(sha256, deps_to_store)
|
||||
if inserted > 0:
|
||||
logger.debug(f"Stored {inserted} dependencies for {sha256[:12]}...")
|
||||
|
||||
db.commit()
|
||||
|
||||
|
||||
@@ -421,7 +421,8 @@ def _log_audit(
|
||||
|
||||
# Health check
|
||||
@router.get("/health", response_model=HealthResponse)
|
||||
def health_check(
|
||||
async def health_check(
|
||||
request: Request,
|
||||
db: Session = Depends(get_db),
|
||||
storage: S3Storage = Depends(get_storage),
|
||||
):
|
||||
@@ -449,11 +450,30 @@ def health_check(
|
||||
|
||||
overall_status = "ok" if (storage_healthy and database_healthy) else "degraded"
|
||||
|
||||
return HealthResponse(
|
||||
status=overall_status,
|
||||
storage_healthy=storage_healthy,
|
||||
database_healthy=database_healthy,
|
||||
)
|
||||
# Build response with optional infrastructure status
|
||||
response_data = {
|
||||
"status": overall_status,
|
||||
"storage_healthy": storage_healthy,
|
||||
"database_healthy": database_healthy,
|
||||
}
|
||||
|
||||
# Add HTTP pool status if available
|
||||
if hasattr(request.app.state, 'http_client'):
|
||||
http_client = request.app.state.http_client
|
||||
response_data["http_pool"] = {
|
||||
"pool_size": http_client.pool_size,
|
||||
"worker_threads": http_client.executor_max,
|
||||
}
|
||||
|
||||
# Add cache status if available
|
||||
if hasattr(request.app.state, 'cache'):
|
||||
cache = request.app.state.cache
|
||||
response_data["cache"] = {
|
||||
"enabled": cache.enabled,
|
||||
"connected": await cache.ping() if cache.enabled else False,
|
||||
}
|
||||
|
||||
return HealthResponse(**response_data)
|
||||
|
||||
|
||||
# --- Authentication Routes ---
|
||||
|
||||
@@ -493,6 +493,8 @@ class HealthResponse(BaseModel):
|
||||
version: str = "1.0.0"
|
||||
storage_healthy: Optional[bool] = None
|
||||
database_healthy: Optional[bool] = None
|
||||
http_pool: Optional[Dict[str, Any]] = None
|
||||
cache: Optional[Dict[str, Any]] = None
|
||||
|
||||
|
||||
# Garbage collection schemas
|
||||
|
||||
@@ -12,6 +12,7 @@ passlib[bcrypt]==1.7.4
|
||||
bcrypt==4.0.1
|
||||
slowapi==0.1.9
|
||||
httpx>=0.25.0
|
||||
redis>=5.0.0
|
||||
|
||||
# Test dependencies
|
||||
pytest>=7.4.0
|
||||
|
||||
@@ -135,3 +135,19 @@ class TestPyPIPackageNormalization:
|
||||
assert "text/html" in response.headers.get("content-type", "")
|
||||
elif response.status_code == 503:
|
||||
assert "No PyPI upstream sources configured" in response.json()["detail"]
|
||||
|
||||
|
||||
class TestPyPIProxyInfrastructure:
|
||||
"""Tests for PyPI proxy infrastructure integration."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_health_endpoint_includes_infrastructure(self, integration_client):
|
||||
"""Health endpoint should report infrastructure status."""
|
||||
response = integration_client.get("/health")
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
assert data["status"] == "ok"
|
||||
# Infrastructure status should be present
|
||||
assert "http_pool" in data
|
||||
assert "cache" in data
|
||||
|
||||
374
backend/tests/unit/test_cache_service.py
Normal file
374
backend/tests/unit/test_cache_service.py
Normal file
@@ -0,0 +1,374 @@
|
||||
"""Tests for CacheService."""
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
|
||||
|
||||
class TestCacheCategory:
|
||||
"""Tests for cache category enum."""
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_immutable_categories_have_no_ttl(self):
|
||||
"""Immutable categories should return None for TTL."""
|
||||
from app.cache_service import CacheCategory, get_category_ttl
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
|
||||
assert get_category_ttl(CacheCategory.ARTIFACT_METADATA, settings) is None
|
||||
assert get_category_ttl(CacheCategory.ARTIFACT_DEPENDENCIES, settings) is None
|
||||
assert get_category_ttl(CacheCategory.DEPENDENCY_RESOLUTION, settings) is None
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_mutable_categories_have_ttl(self):
|
||||
"""Mutable categories should return configured TTL."""
|
||||
from app.cache_service import CacheCategory, get_category_ttl
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(
|
||||
cache_ttl_index=300,
|
||||
cache_ttl_upstream=3600,
|
||||
)
|
||||
|
||||
assert get_category_ttl(CacheCategory.PACKAGE_INDEX, settings) == 300
|
||||
assert get_category_ttl(CacheCategory.UPSTREAM_SOURCES, settings) == 3600
|
||||
|
||||
|
||||
class TestCacheService:
|
||||
"""Tests for Redis cache service."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_disabled_cache_returns_none(self):
|
||||
"""When Redis disabled, get() should return None."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=False)
|
||||
cache = CacheService(settings)
|
||||
await cache.startup()
|
||||
|
||||
result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")
|
||||
|
||||
assert result is None
|
||||
await cache.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_disabled_cache_set_is_noop(self):
|
||||
"""When Redis disabled, set() should be a no-op."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=False)
|
||||
cache = CacheService(settings)
|
||||
await cache.startup()
|
||||
|
||||
# Should not raise
|
||||
await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value")
|
||||
|
||||
await cache.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_cache_key_namespacing(self):
|
||||
"""Cache keys should be properly namespaced."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
|
||||
key = CacheService._make_key(CacheCategory.PACKAGE_INDEX, "pypi", "numpy")
|
||||
|
||||
assert key == "orchard:index:pypi:numpy"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_ping_returns_false_when_disabled(self):
|
||||
"""ping() should return False when Redis is disabled."""
|
||||
from app.cache_service import CacheService
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=False)
|
||||
cache = CacheService(settings)
|
||||
await cache.startup()
|
||||
|
||||
result = await cache.ping()
|
||||
|
||||
assert result is False
|
||||
await cache.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_enabled_property(self):
|
||||
"""enabled property should reflect Redis state."""
|
||||
from app.cache_service import CacheService
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=False)
|
||||
cache = CacheService(settings)
|
||||
|
||||
assert cache.enabled is False
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_delete_is_noop_when_disabled(self):
|
||||
"""delete() should be a no-op when Redis is disabled."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=False)
|
||||
cache = CacheService(settings)
|
||||
await cache.startup()
|
||||
|
||||
# Should not raise
|
||||
await cache.delete(CacheCategory.PACKAGE_INDEX, "test-key")
|
||||
|
||||
await cache.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_invalidate_pattern_returns_zero_when_disabled(self):
|
||||
"""invalidate_pattern() should return 0 when Redis is disabled."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=False)
|
||||
cache = CacheService(settings)
|
||||
await cache.startup()
|
||||
|
||||
result = await cache.invalidate_pattern(CacheCategory.PACKAGE_INDEX)
|
||||
|
||||
assert result == 0
|
||||
await cache.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_startup_already_started(self):
|
||||
"""startup() should be idempotent."""
|
||||
from app.cache_service import CacheService
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=False)
|
||||
cache = CacheService(settings)
|
||||
await cache.startup()
|
||||
await cache.startup() # Should not raise
|
||||
|
||||
assert cache._started is True
|
||||
await cache.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_shutdown_not_started(self):
|
||||
"""shutdown() should handle not-started state."""
|
||||
from app.cache_service import CacheService
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=False)
|
||||
cache = CacheService(settings)
|
||||
|
||||
# Should not raise
|
||||
await cache.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_make_key_with_default_protocol(self):
|
||||
"""_make_key should work with default protocol."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
|
||||
key = CacheService._make_key(CacheCategory.ARTIFACT_METADATA, "default", "abc123")
|
||||
|
||||
assert key == "orchard:artifact:default:abc123"
|
||||
|
||||
|
||||
class TestCacheServiceWithMockedRedis:
|
||||
"""Tests for CacheService with mocked Redis client."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_get_returns_cached_value(self):
|
||||
"""get() should return cached value when available."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True)
|
||||
cache = CacheService(settings)
|
||||
|
||||
# Mock the redis client
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.get.return_value = b"cached-data"
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key", "pypi")
|
||||
|
||||
assert result == b"cached-data"
|
||||
mock_redis.get.assert_called_once_with("orchard:index:pypi:test-key")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_set_with_ttl(self):
|
||||
"""set() should use setex for mutable categories."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True, cache_ttl_index=300)
|
||||
cache = CacheService(settings)
|
||||
|
||||
mock_redis = AsyncMock()
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value", "pypi")
|
||||
|
||||
mock_redis.setex.assert_called_once_with(
|
||||
"orchard:index:pypi:test-key", 300, b"test-value"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_set_without_ttl(self):
|
||||
"""set() should use set (no expiry) for immutable categories."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True)
|
||||
cache = CacheService(settings)
|
||||
|
||||
mock_redis = AsyncMock()
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
await cache.set(
|
||||
CacheCategory.ARTIFACT_METADATA, "abc123", b"metadata", "pypi"
|
||||
)
|
||||
|
||||
mock_redis.set.assert_called_once_with(
|
||||
"orchard:artifact:pypi:abc123", b"metadata"
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_delete_calls_redis_delete(self):
|
||||
"""delete() should call Redis delete."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True)
|
||||
cache = CacheService(settings)
|
||||
|
||||
mock_redis = AsyncMock()
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
await cache.delete(CacheCategory.PACKAGE_INDEX, "test-key", "pypi")
|
||||
|
||||
mock_redis.delete.assert_called_once_with("orchard:index:pypi:test-key")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_invalidate_pattern_deletes_matching_keys(self):
|
||||
"""invalidate_pattern() should delete all matching keys."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True)
|
||||
cache = CacheService(settings)
|
||||
|
||||
mock_redis = AsyncMock()
|
||||
|
||||
# Create an async generator for scan_iter
|
||||
async def mock_scan_iter(match=None):
|
||||
for key in [b"orchard:index:pypi:numpy", b"orchard:index:pypi:requests"]:
|
||||
yield key
|
||||
|
||||
mock_redis.scan_iter = mock_scan_iter
|
||||
mock_redis.delete.return_value = 2
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
result = await cache.invalidate_pattern(CacheCategory.PACKAGE_INDEX, "*", "pypi")
|
||||
|
||||
assert result == 2
|
||||
mock_redis.delete.assert_called_once()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_ping_returns_true_when_connected(self):
|
||||
"""ping() should return True when Redis responds."""
|
||||
from app.cache_service import CacheService
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True)
|
||||
cache = CacheService(settings)
|
||||
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.ping.return_value = True
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
result = await cache.ping()
|
||||
|
||||
assert result is True
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_get_handles_exception(self):
|
||||
"""get() should return None and log warning on exception."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True)
|
||||
cache = CacheService(settings)
|
||||
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.get.side_effect = Exception("Connection lost")
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")
|
||||
|
||||
assert result is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_set_handles_exception(self):
|
||||
"""set() should log warning on exception."""
|
||||
from app.cache_service import CacheService, CacheCategory
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True, cache_ttl_index=300)
|
||||
cache = CacheService(settings)
|
||||
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.setex.side_effect = Exception("Connection lost")
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
# Should not raise
|
||||
await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"value")
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_ping_returns_false_on_exception(self):
|
||||
"""ping() should return False on exception."""
|
||||
from app.cache_service import CacheService
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(redis_enabled=True)
|
||||
cache = CacheService(settings)
|
||||
|
||||
mock_redis = AsyncMock()
|
||||
mock_redis.ping.side_effect = Exception("Connection lost")
|
||||
cache._redis = mock_redis
|
||||
cache._enabled = True
|
||||
cache._started = True
|
||||
|
||||
result = await cache.ping()
|
||||
|
||||
assert result is False
|
||||
|
||||
167
backend/tests/unit/test_db_utils.py
Normal file
167
backend/tests/unit/test_db_utils.py
Normal file
@@ -0,0 +1,167 @@
|
||||
"""Tests for database utility functions."""
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
class TestArtifactRepository:
|
||||
"""Tests for ArtifactRepository."""
|
||||
|
||||
def test_batch_dependency_values_formatting(self):
|
||||
"""batch_upsert_dependencies should format values correctly."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
deps = [
|
||||
("_pypi", "numpy", ">=1.21.0"),
|
||||
("_pypi", "requests", "*"),
|
||||
("myproject", "mylib", "==1.0.0"),
|
||||
]
|
||||
|
||||
values = ArtifactRepository._format_dependency_values("abc123", deps)
|
||||
|
||||
assert len(values) == 3
|
||||
assert values[0] == {
|
||||
"artifact_id": "abc123",
|
||||
"dependency_project": "_pypi",
|
||||
"dependency_package": "numpy",
|
||||
"version_constraint": ">=1.21.0",
|
||||
}
|
||||
assert values[2]["dependency_project"] == "myproject"
|
||||
|
||||
def test_empty_dependencies_returns_empty_list(self):
|
||||
"""Empty dependency list should return empty values."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
values = ArtifactRepository._format_dependency_values("abc123", [])
|
||||
|
||||
assert values == []
|
||||
|
||||
def test_format_dependency_values_preserves_special_characters(self):
|
||||
"""Version constraints with special characters should be preserved."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
deps = [
|
||||
("_pypi", "package-name", ">=1.0.0,<2.0.0"),
|
||||
("_pypi", "another_pkg", "~=1.4.2"),
|
||||
]
|
||||
|
||||
values = ArtifactRepository._format_dependency_values("hash123", deps)
|
||||
|
||||
assert values[0]["version_constraint"] == ">=1.0.0,<2.0.0"
|
||||
assert values[1]["version_constraint"] == "~=1.4.2"
|
||||
|
||||
def test_batch_upsert_dependencies_returns_zero_for_empty(self):
|
||||
"""batch_upsert_dependencies should return 0 for empty list without DB call."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
mock_db = MagicMock()
|
||||
repo = ArtifactRepository(mock_db)
|
||||
|
||||
result = repo.batch_upsert_dependencies("abc123", [])
|
||||
|
||||
assert result == 0
|
||||
# Verify no DB operations were performed
|
||||
mock_db.execute.assert_not_called()
|
||||
|
||||
def test_get_or_create_artifact_builds_correct_statement(self):
|
||||
"""get_or_create_artifact should use ON CONFLICT DO UPDATE."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
from app.models import Artifact
|
||||
|
||||
mock_db = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_artifact = MagicMock()
|
||||
mock_artifact.ref_count = 1
|
||||
mock_result.scalar_one.return_value = mock_artifact
|
||||
mock_db.execute.return_value = mock_result
|
||||
|
||||
repo = ArtifactRepository(mock_db)
|
||||
artifact, created = repo.get_or_create_artifact(
|
||||
sha256="abc123def456",
|
||||
size=1024,
|
||||
filename="test.whl",
|
||||
content_type="application/zip",
|
||||
)
|
||||
|
||||
assert mock_db.execute.called
|
||||
assert created is True
|
||||
assert artifact == mock_artifact
|
||||
|
||||
def test_get_or_create_artifact_existing_not_created(self):
|
||||
"""get_or_create_artifact should return created=False for existing artifact."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
mock_db = MagicMock()
|
||||
mock_result = MagicMock()
|
||||
mock_artifact = MagicMock()
|
||||
mock_artifact.ref_count = 5 # Existing artifact with ref_count > 1
|
||||
mock_result.scalar_one.return_value = mock_artifact
|
||||
mock_db.execute.return_value = mock_result
|
||||
|
||||
repo = ArtifactRepository(mock_db)
|
||||
artifact, created = repo.get_or_create_artifact(
|
||||
sha256="abc123def456",
|
||||
size=1024,
|
||||
filename="test.whl",
|
||||
)
|
||||
|
||||
assert created is False
|
||||
|
||||
def test_get_cached_url_with_artifact_returns_tuple(self):
|
||||
"""get_cached_url_with_artifact should return (CachedUrl, Artifact) tuple."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
mock_db = MagicMock()
|
||||
mock_cached_url = MagicMock()
|
||||
mock_artifact = MagicMock()
|
||||
mock_db.query.return_value.join.return_value.filter.return_value.first.return_value = (
|
||||
mock_cached_url,
|
||||
mock_artifact,
|
||||
)
|
||||
|
||||
repo = ArtifactRepository(mock_db)
|
||||
result = repo.get_cached_url_with_artifact("url_hash_123")
|
||||
|
||||
assert result == (mock_cached_url, mock_artifact)
|
||||
|
||||
def test_get_cached_url_with_artifact_returns_none_when_not_found(self):
|
||||
"""get_cached_url_with_artifact should return None when URL not cached."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
mock_db = MagicMock()
|
||||
mock_db.query.return_value.join.return_value.filter.return_value.first.return_value = None
|
||||
|
||||
repo = ArtifactRepository(mock_db)
|
||||
result = repo.get_cached_url_with_artifact("nonexistent_hash")
|
||||
|
||||
assert result is None
|
||||
|
||||
def test_get_artifact_dependencies_returns_list(self):
|
||||
"""get_artifact_dependencies should return list of dependencies."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
mock_db = MagicMock()
|
||||
mock_dep1 = MagicMock()
|
||||
mock_dep2 = MagicMock()
|
||||
mock_db.query.return_value.filter.return_value.all.return_value = [
|
||||
mock_dep1,
|
||||
mock_dep2,
|
||||
]
|
||||
|
||||
repo = ArtifactRepository(mock_db)
|
||||
result = repo.get_artifact_dependencies("artifact_hash_123")
|
||||
|
||||
assert len(result) == 2
|
||||
assert result[0] == mock_dep1
|
||||
assert result[1] == mock_dep2
|
||||
|
||||
def test_get_artifact_dependencies_returns_empty_list(self):
|
||||
"""get_artifact_dependencies should return empty list when no dependencies."""
|
||||
from app.db_utils import ArtifactRepository
|
||||
|
||||
mock_db = MagicMock()
|
||||
mock_db.query.return_value.filter.return_value.all.return_value = []
|
||||
|
||||
repo = ArtifactRepository(mock_db)
|
||||
result = repo.get_artifact_dependencies("artifact_without_deps")
|
||||
|
||||
assert result == []
|
||||
194
backend/tests/unit/test_http_client.py
Normal file
194
backend/tests/unit/test_http_client.py
Normal file
@@ -0,0 +1,194 @@
|
||||
"""Tests for HttpClientManager."""
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock, patch
|
||||
|
||||
|
||||
class TestHttpClientManager:
|
||||
"""Tests for HTTP client pool management."""
|
||||
|
||||
@pytest.mark.unit
|
||||
def test_manager_initializes_with_settings(self):
|
||||
"""Manager should initialize with config settings."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(
|
||||
http_max_connections=50,
|
||||
http_connect_timeout=15.0,
|
||||
)
|
||||
manager = HttpClientManager(settings)
|
||||
|
||||
assert manager.max_connections == 50
|
||||
assert manager.connect_timeout == 15.0
|
||||
assert manager._default_client is None # Not started yet
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_startup_creates_client(self):
|
||||
"""Startup should create the default async client."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
|
||||
await manager.startup()
|
||||
|
||||
assert manager._default_client is not None
|
||||
|
||||
await manager.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_shutdown_closes_client(self):
|
||||
"""Shutdown should close all clients gracefully."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
|
||||
await manager.startup()
|
||||
client = manager._default_client
|
||||
|
||||
await manager.shutdown()
|
||||
|
||||
assert manager._default_client is None
|
||||
assert client.is_closed
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_get_client_returns_default(self):
|
||||
"""get_client() should return the default client."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
await manager.startup()
|
||||
|
||||
client = manager.get_client()
|
||||
|
||||
assert client is manager._default_client
|
||||
|
||||
await manager.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_get_client_raises_if_not_started(self):
|
||||
"""get_client() should raise RuntimeError if manager not started."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
|
||||
with pytest.raises(RuntimeError, match="not started"):
|
||||
manager.get_client()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_run_blocking_executes_in_thread_pool(self):
|
||||
"""run_blocking should execute sync functions in thread pool."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
import threading
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
await manager.startup()
|
||||
|
||||
main_thread = threading.current_thread()
|
||||
execution_thread = None
|
||||
|
||||
def blocking_func():
|
||||
nonlocal execution_thread
|
||||
execution_thread = threading.current_thread()
|
||||
return "result"
|
||||
|
||||
result = await manager.run_blocking(blocking_func)
|
||||
|
||||
assert result == "result"
|
||||
assert execution_thread is not main_thread
|
||||
|
||||
await manager.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_run_blocking_raises_if_not_started(self):
|
||||
"""run_blocking should raise RuntimeError if manager not started."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
|
||||
with pytest.raises(RuntimeError, match="not started"):
|
||||
await manager.run_blocking(lambda: None)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_startup_idempotent(self):
|
||||
"""Calling startup multiple times should be safe."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
|
||||
await manager.startup()
|
||||
client1 = manager._default_client
|
||||
|
||||
await manager.startup() # Should not create a new client
|
||||
client2 = manager._default_client
|
||||
|
||||
assert client1 is client2 # Same client instance
|
||||
|
||||
await manager.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_shutdown_idempotent(self):
|
||||
"""Calling shutdown multiple times should be safe."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
|
||||
await manager.startup()
|
||||
await manager.shutdown()
|
||||
await manager.shutdown() # Should not raise
|
||||
|
||||
assert manager._default_client is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_properties_return_configured_values(self):
|
||||
"""Properties should return configured values."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings(
|
||||
http_max_connections=75,
|
||||
http_worker_threads=16,
|
||||
)
|
||||
manager = HttpClientManager(settings)
|
||||
await manager.startup()
|
||||
|
||||
assert manager.pool_size == 75
|
||||
assert manager.executor_max == 16
|
||||
|
||||
await manager.shutdown()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.unit
|
||||
async def test_active_connections_when_not_started(self):
|
||||
"""active_connections should return 0 when not started."""
|
||||
from app.http_client import HttpClientManager
|
||||
from app.config import Settings
|
||||
|
||||
settings = Settings()
|
||||
manager = HttpClientManager(settings)
|
||||
|
||||
assert manager.active_connections == 0
|
||||
@@ -228,7 +228,7 @@ minioIngress:
|
||||
secretName: minio-tls # Overridden by CI
|
||||
|
||||
redis:
|
||||
enabled: false
|
||||
enabled: true
|
||||
|
||||
waitForDatabase: true
|
||||
|
||||
|
||||
@@ -140,7 +140,7 @@ minioIngress:
|
||||
enabled: false
|
||||
|
||||
redis:
|
||||
enabled: false
|
||||
enabled: true
|
||||
|
||||
waitForDatabase: true
|
||||
|
||||
|
||||
@@ -146,7 +146,7 @@ minioIngress:
|
||||
|
||||
# Redis subchart configuration (for future caching)
|
||||
redis:
|
||||
enabled: false
|
||||
enabled: true
|
||||
image:
|
||||
registry: containers.global.bsf.tools
|
||||
repository: bitnami/redis
|
||||
|
||||
Reference in New Issue
Block a user