14 Commits

Author SHA1 Message Date
Mondo Diaz
632bf54087 fix: correct test imports and health endpoint assertions
- Fix import in test_db_utils.py: use app.models instead of backend.app.models
- Update health endpoint test to expect 'ok' status and infrastructure keys
- Add CHANGELOG entries for PyPI proxy performance improvements
2026-02-04 10:37:12 -06:00
Mondo Diaz
170561b32a feat: add infrastructure status to health endpoint 2026-02-04 09:54:45 -06:00
Mondo Diaz
6e05697ae2 infra: enable Redis in Helm chart values for all environments 2026-02-04 09:53:38 -06:00
Mondo Diaz
08b6589712 test: add infrastructure integration tests for pypi_proxy 2026-02-04 09:53:02 -06:00
Mondo Diaz
7ad5a15ef4 perf: use batch dependency storage in pypi_proxy 2026-02-04 09:52:16 -06:00
Mondo Diaz
8fdb73901e perf: use shared HTTP client pool in pypi_download_file 2026-02-04 09:51:05 -06:00
Mondo Diaz
79dd7b833e perf: cache upstream sources lookup in pypi_proxy 2026-02-04 09:49:59 -06:00
Mondo Diaz
71089aee0e refactor: add infrastructure dependency injection to pypi_proxy
Add dependency injection helper functions for HttpClientManager
and CacheService, along with imports for the new infrastructure
modules (http_client, cache_service, db_utils).
2026-02-04 09:49:04 -06:00
Mondo Diaz
ffe0529ea8 feat: add ArtifactRepository with batch DB operations
Add optimized database operations for artifact storage:
- Atomic upserts using ON CONFLICT for artifact creation
- Batch inserts for dependencies to eliminate N+1 queries
- Joined queries for cached URL lookups
- All methods include comprehensive unit tests
2026-02-04 09:48:08 -06:00
Mondo Diaz
146ca2ad74 feat: integrate HttpClientManager and CacheService into lifespan 2026-02-04 09:45:09 -06:00
Mondo Diaz
a045509fe4 feat: add CacheService with Redis caching and graceful fallback
Implements Redis-backed caching with category-aware TTL management:
- Immutable categories (artifact metadata, dependencies) cached forever
- Mutable categories (index pages, upstream sources) use configurable TTL
- Graceful fallback when Redis unavailable or disabled
- Pattern-based invalidation for bulk cache clearing
2026-02-04 09:44:12 -06:00
Mondo Diaz
14806b05f0 feat: add HttpClientManager with connection pooling
Add HttpClientManager class for managing httpx.AsyncClient pools with
FastAPI lifespan integration. Features include:
- Default shared connection pool for general requests
- Configurable max connections, keep-alive, and timeouts
- Dedicated thread pool for blocking I/O operations
- Graceful startup/shutdown lifecycle management
- Per-upstream client isolation support (for future use)

Includes comprehensive unit tests covering initialization, startup,
shutdown, client retrieval, blocking operations, idempotency, and
error handling.
2026-02-04 09:16:27 -06:00
Mondo Diaz
c67004af52 config: add HTTP pool, Redis, and updated DB pool settings 2026-02-04 09:12:01 -06:00
Mondo Diaz
8c6ba01a73 deps: add redis-py for caching layer 2026-02-04 09:11:12 -06:00
17 changed files with 1614 additions and 94 deletions

View File

@@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added ### Added
- Added HTTP connection pooling infrastructure for improved PyPI proxy performance
- `HttpClientManager` with configurable pool size, timeouts, and thread pool executor
- Eliminates per-request connection overhead (~100-500ms → ~5ms)
- Added Redis caching layer with category-aware TTL for hermetic builds
- `CacheService` with graceful fallback when Redis unavailable
- Immutable data (artifact metadata, dependencies) cached forever
- Mutable data (package index, versions) uses configurable TTL
- Added `ArtifactRepository` for batch database operations
- `batch_upsert_dependencies()` reduces N+1 queries to single INSERT
- `get_or_create_artifact()` uses atomic ON CONFLICT upsert
- Added infrastructure status to health endpoint (`/health`)
- Reports HTTP pool size and worker threads
- Reports Redis cache connection status
- Added new configuration settings for HTTP client, Redis, and cache TTL
- `ORCHARD_HTTP_MAX_CONNECTIONS`, `ORCHARD_HTTP_CONNECT_TIMEOUT`, etc.
- `ORCHARD_REDIS_HOST`, `ORCHARD_REDIS_PORT`, `ORCHARD_REDIS_ENABLED`
- `ORCHARD_CACHE_TTL_INDEX`, `ORCHARD_CACHE_TTL_VERSIONS`, etc.
- Added transparent PyPI proxy implementing PEP 503 Simple API (#108) - Added transparent PyPI proxy implementing PEP 503 Simple API (#108)
- `GET /pypi/simple/` - package index (proxied from upstream) - `GET /pypi/simple/` - package index (proxied from upstream)
- `GET /pypi/simple/{package}/` - version list with rewritten download links - `GET /pypi/simple/{package}/` - version list with rewritten download links
@@ -16,6 +33,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `POST /api/v1/cache/resolve` endpoint to cache packages by coordinates instead of URL (#108) - Added `POST /api/v1/cache/resolve` endpoint to cache packages by coordinates instead of URL (#108)
### Changed ### Changed
- PyPI proxy now uses shared HTTP connection pool instead of per-request clients
- PyPI proxy now caches upstream source configuration in Redis
- Dependency storage now uses batch INSERT instead of individual queries
- Increased default database pool size from 5 to 20 connections
- Increased default database max overflow from 10 to 30 connections
- Enabled Redis in Helm chart values for dev, stage, and prod environments
- Upstream sources table text is now centered under column headers (#108) - Upstream sources table text is now centered under column headers (#108)
- ENV badge now appears inline with source name instead of separate column (#108) - ENV badge now appears inline with source name instead of separate column (#108)
- Test and Edit buttons now have more prominent button styling (#108) - Test and Edit buttons now have more prominent button styling (#108)

View File

@@ -0,0 +1,262 @@
"""
Redis-backed caching service with category-aware TTL and invalidation.
Provides:
- Immutable caching for artifact data (hermetic builds)
- TTL-based caching for discovery data
- Event-driven invalidation for config changes
- Graceful fallback when Redis unavailable
"""
import logging
from enum import Enum
from typing import Optional
from .config import Settings
logger = logging.getLogger(__name__)
class CacheCategory(Enum):
"""
Cache categories with different TTL and invalidation rules.
Immutable (cache forever):
- ARTIFACT_METADATA: Artifact info by SHA256
- ARTIFACT_DEPENDENCIES: Extracted deps by SHA256
- DEPENDENCY_RESOLUTION: Resolution results by input hash
Mutable (TTL + event invalidation):
- UPSTREAM_SOURCES: Upstream config, invalidate on DB change
- PACKAGE_INDEX: PyPI/npm index pages, TTL only
- PACKAGE_VERSIONS: Version listings, TTL only
"""
# Immutable - cache forever (hermetic builds)
ARTIFACT_METADATA = "artifact"
ARTIFACT_DEPENDENCIES = "deps"
DEPENDENCY_RESOLUTION = "resolve"
# Mutable - TTL + event invalidation
UPSTREAM_SOURCES = "upstream"
PACKAGE_INDEX = "index"
PACKAGE_VERSIONS = "versions"
def get_category_ttl(category: CacheCategory, settings: Settings) -> Optional[int]:
"""
Get TTL for a cache category.
Returns:
TTL in seconds, or None for no expiry (immutable).
"""
ttl_map = {
# Immutable - no TTL
CacheCategory.ARTIFACT_METADATA: None,
CacheCategory.ARTIFACT_DEPENDENCIES: None,
CacheCategory.DEPENDENCY_RESOLUTION: None,
# Mutable - configurable TTL
CacheCategory.UPSTREAM_SOURCES: settings.cache_ttl_upstream,
CacheCategory.PACKAGE_INDEX: settings.cache_ttl_index,
CacheCategory.PACKAGE_VERSIONS: settings.cache_ttl_versions,
}
return ttl_map.get(category)
class CacheService:
"""
Redis-backed caching with category-aware TTL.
Key format: orchard:{category}:{protocol}:{identifier}
Example: orchard:deps:pypi:abc123def456
When Redis is disabled or unavailable, operations gracefully
return None/no-op to allow the application to function without caching.
"""
def __init__(self, settings: Settings):
self._settings = settings
self._enabled = settings.redis_enabled
self._redis: Optional["redis.asyncio.Redis"] = None
self._started = False
async def startup(self) -> None:
"""Initialize Redis connection. Called by FastAPI lifespan."""
if self._started:
return
if not self._enabled:
logger.info("CacheService disabled (redis_enabled=False)")
self._started = True
return
try:
import redis.asyncio as redis
logger.info(
f"Connecting to Redis at {self._settings.redis_host}:"
f"{self._settings.redis_port}/{self._settings.redis_db}"
)
self._redis = redis.Redis(
host=self._settings.redis_host,
port=self._settings.redis_port,
db=self._settings.redis_db,
password=self._settings.redis_password,
decode_responses=False, # We handle bytes
)
# Test connection
await self._redis.ping()
logger.info("CacheService connected to Redis")
except ImportError:
logger.warning("redis package not installed, caching disabled")
self._enabled = False
except Exception as e:
logger.warning(f"Redis connection failed, caching disabled: {e}")
self._enabled = False
self._redis = None
self._started = True
async def shutdown(self) -> None:
"""Close Redis connection. Called by FastAPI lifespan."""
if not self._started:
return
if self._redis:
await self._redis.aclose()
self._redis = None
self._started = False
logger.info("CacheService shutdown complete")
@staticmethod
def _make_key(category: CacheCategory, protocol: str, identifier: str) -> str:
"""Build namespaced cache key."""
return f"orchard:{category.value}:{protocol}:{identifier}"
async def get(
self,
category: CacheCategory,
key: str,
protocol: str = "default",
) -> Optional[bytes]:
"""
Get cached value.
Args:
category: Cache category for TTL rules
key: Unique identifier within category
protocol: Protocol namespace (pypi, npm, etc.)
Returns:
Cached bytes or None if not found/disabled.
"""
if not self._enabled or not self._redis:
return None
try:
full_key = self._make_key(category, protocol, key)
return await self._redis.get(full_key)
except Exception as e:
logger.warning(f"Cache get failed for {key}: {e}")
return None
async def set(
self,
category: CacheCategory,
key: str,
value: bytes,
protocol: str = "default",
) -> None:
"""
Set cached value with category-appropriate TTL.
Args:
category: Cache category for TTL rules
key: Unique identifier within category
value: Bytes to cache
protocol: Protocol namespace (pypi, npm, etc.)
"""
if not self._enabled or not self._redis:
return
try:
full_key = self._make_key(category, protocol, key)
ttl = get_category_ttl(category, self._settings)
if ttl is None:
await self._redis.set(full_key, value)
else:
await self._redis.setex(full_key, ttl, value)
except Exception as e:
logger.warning(f"Cache set failed for {key}: {e}")
async def delete(
self,
category: CacheCategory,
key: str,
protocol: str = "default",
) -> None:
"""Delete a specific cache entry."""
if not self._enabled or not self._redis:
return
try:
full_key = self._make_key(category, protocol, key)
await self._redis.delete(full_key)
except Exception as e:
logger.warning(f"Cache delete failed for {key}: {e}")
async def invalidate_pattern(
self,
category: CacheCategory,
pattern: str = "*",
protocol: str = "default",
) -> int:
"""
Invalidate all entries matching pattern.
Args:
category: Cache category
pattern: Glob pattern for keys (default "*" = all in category)
protocol: Protocol namespace
Returns:
Number of keys deleted.
"""
if not self._enabled or not self._redis:
return 0
try:
full_pattern = self._make_key(category, protocol, pattern)
keys = []
async for key in self._redis.scan_iter(match=full_pattern):
keys.append(key)
if keys:
return await self._redis.delete(*keys)
return 0
except Exception as e:
logger.warning(f"Cache invalidate failed for pattern {pattern}: {e}")
return 0
async def ping(self) -> bool:
"""Check if Redis is connected and responding."""
if not self._enabled or not self._redis:
return False
try:
await self._redis.ping()
return True
except Exception:
return False
@property
def enabled(self) -> bool:
"""Check if caching is enabled."""
return self._enabled

View File

@@ -22,8 +22,8 @@ class Settings(BaseSettings):
database_sslmode: str = "disable" database_sslmode: str = "disable"
# Database connection pool settings # Database connection pool settings
database_pool_size: int = 5 # Number of connections to keep open database_pool_size: int = 20 # Number of connections to keep open
database_max_overflow: int = 10 # Max additional connections beyond pool_size database_max_overflow: int = 30 # Max additional connections beyond pool_size
database_pool_timeout: int = 30 # Seconds to wait for a connection from pool database_pool_timeout: int = 30 # Seconds to wait for a connection from pool
database_pool_recycle: int = ( database_pool_recycle: int = (
1800 # Recycle connections after this many seconds (30 min) 1800 # Recycle connections after this many seconds (30 min)
@@ -53,6 +53,25 @@ class Settings(BaseSettings):
) )
pypi_download_mode: str = "redirect" # "redirect" (to S3) or "proxy" (stream through Orchard) pypi_download_mode: str = "redirect" # "redirect" (to S3) or "proxy" (stream through Orchard)
# HTTP Client pool settings
http_max_connections: int = 100 # Max connections per pool
http_max_keepalive: int = 20 # Keep-alive connections
http_connect_timeout: float = 30.0 # Connection timeout seconds
http_read_timeout: float = 60.0 # Read timeout seconds
http_worker_threads: int = 32 # Thread pool for blocking ops
# Redis cache settings
redis_host: str = "localhost"
redis_port: int = 6379
redis_db: int = 0
redis_password: Optional[str] = None
redis_enabled: bool = True # Set False to disable caching
# Cache TTL settings (seconds, 0 = no expiry)
cache_ttl_index: int = 300 # Package index pages: 5 min
cache_ttl_versions: int = 300 # Version listings: 5 min
cache_ttl_upstream: int = 3600 # Upstream source config: 1 hour
# Logging settings # Logging settings
log_level: str = "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL log_level: str = "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL
log_format: str = "auto" # "json", "standard", or "auto" (json in production) log_format: str = "auto" # "json", "standard", or "auto" (json in production)

175
backend/app/db_utils.py Normal file
View File

@@ -0,0 +1,175 @@
"""
Database utilities for optimized artifact operations.
Provides batch operations to eliminate N+1 queries.
"""
import logging
from typing import Optional
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session
from .models import Artifact, ArtifactDependency, CachedUrl
logger = logging.getLogger(__name__)
class ArtifactRepository:
"""
Optimized database operations for artifact storage.
Key optimizations:
- Atomic upserts using ON CONFLICT
- Batch inserts for dependencies
- Joined queries to avoid N+1
"""
def __init__(self, db: Session):
self.db = db
@staticmethod
def _format_dependency_values(
artifact_id: str,
dependencies: list[tuple[str, str, str]],
) -> list[dict]:
"""
Format dependencies for batch insert.
Args:
artifact_id: SHA256 of the artifact
dependencies: List of (project, package, version_constraint)
Returns:
List of dicts ready for bulk insert.
"""
return [
{
"artifact_id": artifact_id,
"dependency_project": proj,
"dependency_package": pkg,
"version_constraint": ver,
}
for proj, pkg, ver in dependencies
]
def get_or_create_artifact(
self,
sha256: str,
size: int,
filename: str,
content_type: Optional[str] = None,
created_by: str = "system",
s3_key: Optional[str] = None,
) -> tuple[Artifact, bool]:
"""
Get existing artifact or create new one atomically.
Uses INSERT ... ON CONFLICT DO UPDATE to handle races.
If artifact exists, increments ref_count.
Args:
sha256: Content hash (primary key)
size: File size in bytes
filename: Original filename
content_type: MIME type
created_by: User who created the artifact
s3_key: S3 storage key (defaults to standard path)
Returns:
(artifact, created) tuple where created is True for new artifacts.
"""
if s3_key is None:
s3_key = f"fruits/{sha256[:2]}/{sha256[2:4]}/{sha256}"
stmt = pg_insert(Artifact).values(
id=sha256,
size=size,
original_name=filename,
content_type=content_type,
ref_count=1,
created_by=created_by,
s3_key=s3_key,
).on_conflict_do_update(
index_elements=['id'],
set_={'ref_count': Artifact.ref_count + 1}
).returning(Artifact)
result = self.db.execute(stmt)
artifact = result.scalar_one()
# Check if this was an insert or update by comparing ref_count
# ref_count=1 means new, >1 means existing
created = artifact.ref_count == 1
return artifact, created
def batch_upsert_dependencies(
self,
artifact_id: str,
dependencies: list[tuple[str, str, str]],
) -> int:
"""
Insert dependencies in a single batch operation.
Uses ON CONFLICT DO NOTHING to skip duplicates.
Args:
artifact_id: SHA256 of the artifact
dependencies: List of (project, package, version_constraint)
Returns:
Number of dependencies inserted.
"""
if not dependencies:
return 0
values = self._format_dependency_values(artifact_id, dependencies)
stmt = pg_insert(ArtifactDependency).values(values)
stmt = stmt.on_conflict_do_nothing(
index_elements=['artifact_id', 'dependency_project', 'dependency_package']
)
result = self.db.execute(stmt)
return result.rowcount
def get_cached_url_with_artifact(
self,
url_hash: str,
) -> Optional[tuple[CachedUrl, Artifact]]:
"""
Get cached URL and its artifact in a single query.
Args:
url_hash: SHA256 of the URL
Returns:
(CachedUrl, Artifact) tuple or None if not found.
"""
result = (
self.db.query(CachedUrl, Artifact)
.join(Artifact, CachedUrl.artifact_id == Artifact.id)
.filter(CachedUrl.url_hash == url_hash)
.first()
)
return result
def get_artifact_dependencies(
self,
artifact_id: str,
) -> list[ArtifactDependency]:
"""
Get all dependencies for an artifact in a single query.
Args:
artifact_id: SHA256 of the artifact
Returns:
List of ArtifactDependency objects.
"""
return (
self.db.query(ArtifactDependency)
.filter(ArtifactDependency.artifact_id == artifact_id)
.all()
)

179
backend/app/http_client.py Normal file
View File

@@ -0,0 +1,179 @@
"""
HTTP client manager with connection pooling and lifecycle management.
Provides:
- Shared connection pools for upstream requests
- Per-upstream client isolation when needed
- Thread pool for blocking I/O operations
- FastAPI lifespan integration
"""
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional
import httpx
from .config import Settings
logger = logging.getLogger(__name__)
class HttpClientManager:
"""
Manages httpx.AsyncClient pools with FastAPI lifespan integration.
Features:
- Default shared pool for general requests
- Per-upstream pools for sources needing specific config/auth
- Dedicated thread pool for blocking operations
- Graceful shutdown
"""
def __init__(self, settings: Settings):
self.max_connections = settings.http_max_connections
self.max_keepalive = settings.http_max_keepalive
self.connect_timeout = settings.http_connect_timeout
self.read_timeout = settings.http_read_timeout
self.worker_threads = settings.http_worker_threads
self._default_client: Optional[httpx.AsyncClient] = None
self._upstream_clients: dict[str, httpx.AsyncClient] = {}
self._executor: Optional[ThreadPoolExecutor] = None
self._started = False
async def startup(self) -> None:
"""Initialize clients and thread pool. Called by FastAPI lifespan."""
if self._started:
return
logger.info(
f"Starting HttpClientManager: max_connections={self.max_connections}, "
f"worker_threads={self.worker_threads}"
)
# Create connection limits
limits = httpx.Limits(
max_connections=self.max_connections,
max_keepalive_connections=self.max_keepalive,
)
# Create timeout config
timeout = httpx.Timeout(
connect=self.connect_timeout,
read=self.read_timeout,
write=self.read_timeout,
pool=self.connect_timeout,
)
# Create default client
self._default_client = httpx.AsyncClient(
limits=limits,
timeout=timeout,
follow_redirects=False, # Handle redirects manually for auth
)
# Create thread pool for blocking operations
self._executor = ThreadPoolExecutor(
max_workers=self.worker_threads,
thread_name_prefix="orchard-blocking-",
)
self._started = True
logger.info("HttpClientManager started")
async def shutdown(self) -> None:
"""Close all clients and thread pool. Called by FastAPI lifespan."""
if not self._started:
return
logger.info("Shutting down HttpClientManager")
# Close default client
if self._default_client:
await self._default_client.aclose()
self._default_client = None
# Close upstream-specific clients
for name, client in self._upstream_clients.items():
logger.debug(f"Closing upstream client: {name}")
await client.aclose()
self._upstream_clients.clear()
# Shutdown thread pool
if self._executor:
self._executor.shutdown(wait=True)
self._executor = None
self._started = False
logger.info("HttpClientManager shutdown complete")
def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
"""
Get HTTP client for making requests.
Args:
upstream_name: Optional upstream source name for dedicated pool.
If None, returns the default shared client.
Returns:
httpx.AsyncClient configured for the request.
Raises:
RuntimeError: If manager not started.
"""
if not self._started or not self._default_client:
raise RuntimeError("HttpClientManager not started. Call startup() first.")
if upstream_name and upstream_name in self._upstream_clients:
return self._upstream_clients[upstream_name]
return self._default_client
async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any:
"""
Run a blocking function in the thread pool.
Use this for:
- File I/O operations
- Archive extraction (zipfile, tarfile)
- Hash computation on large data
Args:
func: Synchronous function to execute
*args: Arguments to pass to the function
Returns:
The function's return value.
"""
if not self._executor:
raise RuntimeError("HttpClientManager not started. Call startup() first.")
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._executor, func, *args)
@property
def active_connections(self) -> int:
"""Get approximate number of active connections (for health checks)."""
if not self._default_client:
return 0
# httpx doesn't expose this directly, return pool size as approximation
return self.max_connections
@property
def pool_size(self) -> int:
"""Get configured pool size."""
return self.max_connections
@property
def executor_active(self) -> int:
"""Get number of active thread pool workers."""
if not self._executor:
return 0
return len(self._executor._threads)
@property
def executor_max(self) -> int:
"""Get max thread pool workers."""
return self.worker_threads

View File

@@ -15,6 +15,8 @@ from .pypi_proxy import router as pypi_router
from .seed import seed_database from .seed import seed_database
from .auth import create_default_admin from .auth import create_default_admin
from .rate_limit import limiter from .rate_limit import limiter
from .http_client import HttpClientManager
from .cache_service import CacheService
settings = get_settings() settings = get_settings()
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
@@ -38,6 +40,17 @@ async def lifespan(app: FastAPI):
finally: finally:
db.close() db.close()
# Initialize infrastructure services
logger.info("Initializing infrastructure services...")
app.state.http_client = HttpClientManager(settings)
await app.state.http_client.startup()
app.state.cache = CacheService(settings)
await app.state.cache.startup()
logger.info("Infrastructure services ready")
# Seed test data in development mode # Seed test data in development mode
if settings.is_development: if settings.is_development:
logger.info(f"Running in {settings.env} mode - checking for seed data") logger.info(f"Running in {settings.env} mode - checking for seed data")
@@ -51,6 +64,12 @@ async def lifespan(app: FastAPI):
yield yield
# Shutdown infrastructure services
logger.info("Shutting down infrastructure services...")
await app.state.http_client.shutdown()
await app.state.cache.shutdown()
logger.info("Shutdown complete")
app = FastAPI( app = FastAPI(
title="Orchard", title="Orchard",

View File

@@ -23,15 +23,28 @@ from fastapi.responses import StreamingResponse, HTMLResponse, RedirectResponse
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from .database import get_db from .database import get_db
from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, PackageVersion, ArtifactDependency from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, PackageVersion
from .storage import S3Storage, get_storage from .storage import S3Storage, get_storage
from .config import get_env_upstream_sources, get_settings from .config import get_env_upstream_sources, get_settings
from .http_client import HttpClientManager
from .cache_service import CacheService, CacheCategory
from .db_utils import ArtifactRepository
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
router = APIRouter(prefix="/pypi", tags=["pypi-proxy"]) router = APIRouter(prefix="/pypi", tags=["pypi-proxy"])
def get_http_client(request: Request) -> HttpClientManager:
"""Get HttpClientManager from app state."""
return request.app.state.http_client
def get_cache(request: Request) -> CacheService:
"""Get CacheService from app state."""
return request.app.state.cache
# Timeout configuration for proxy requests # Timeout configuration for proxy requests
PROXY_CONNECT_TIMEOUT = 30.0 PROXY_CONNECT_TIMEOUT = 30.0
PROXY_READ_TIMEOUT = 60.0 PROXY_READ_TIMEOUT = 60.0
@@ -241,6 +254,62 @@ def _extract_pypi_version(filename: str) -> Optional[str]:
return None return None
async def _get_pypi_upstream_sources_cached(
db: Session,
cache: CacheService,
) -> list[UpstreamSource]:
"""
Get PyPI upstream sources with caching.
Sources are cached for cache_ttl_upstream seconds to avoid
repeated database queries on every request.
"""
cache_key = "sources"
# Try cache first
cached = await cache.get(CacheCategory.UPSTREAM_SOURCES, cache_key, protocol="pypi")
if cached:
source_data = json.loads(cached.decode())
# Reconstruct UpstreamSource-like objects from cached data
# We cache just the essential fields needed for requests
return [type('CachedSource', (), d)() for d in source_data]
# Query database
db_sources = (
db.query(UpstreamSource)
.filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True)
.order_by(UpstreamSource.priority)
.all()
)
# Combine with env sources
env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"]
all_sources = list(db_sources) + list(env_sources)
all_sources = sorted(all_sources, key=lambda s: s.priority)
# Cache the essential fields
if all_sources and cache.enabled:
cache_data = [
{
"name": s.name,
"url": s.url,
"priority": s.priority,
"auth_type": getattr(s, "auth_type", "none"),
"username": getattr(s, "username", None),
"password": getattr(s, "password", None),
}
for s in all_sources
]
await cache.set(
CacheCategory.UPSTREAM_SOURCES,
cache_key,
json.dumps(cache_data).encode(),
protocol="pypi",
)
return all_sources
def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]: def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]:
"""Get all enabled upstream sources configured for PyPI.""" """Get all enabled upstream sources configured for PyPI."""
# Get database sources # Get database sources
@@ -573,6 +642,8 @@ async def pypi_download_file(
upstream: Optional[str] = None, upstream: Optional[str] = None,
db: Session = Depends(get_db), db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage), storage: S3Storage = Depends(get_storage),
http_client: HttpClientManager = Depends(get_http_client),
cache: CacheService = Depends(get_cache),
): ):
""" """
Download a package file, caching it in Orchard. Download a package file, caching it in Orchard.
@@ -654,7 +725,9 @@ async def pypi_download_file(
headers.update(_build_auth_headers(matched_source)) headers.update(_build_auth_headers(matched_source))
auth = _get_basic_auth(matched_source) if matched_source else None auth = _get_basic_auth(matched_source) if matched_source else None
timeout = httpx.Timeout(300.0, connect=PROXY_CONNECT_TIMEOUT) # 5 minutes for large files # Use shared HTTP client from pool with longer timeout for file downloads
client = http_client.get_client()
download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
# Initialize extracted dependencies list # Initialize extracted dependencies list
extracted_deps = [] extracted_deps = []
@@ -662,11 +735,11 @@ async def pypi_download_file(
# Fetch the file # Fetch the file
logger.info(f"PyPI proxy: fetching {filename} from {upstream_url}") logger.info(f"PyPI proxy: fetching {filename} from {upstream_url}")
async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
response = await client.get( response = await client.get(
upstream_url, upstream_url,
headers=headers, headers=headers,
auth=auth, auth=auth,
timeout=download_timeout,
) )
# Handle redirects manually # Handle redirects manually
@@ -693,6 +766,7 @@ async def pypi_download_file(
headers=redirect_headers, headers=redirect_headers,
auth=redirect_auth, auth=redirect_auth,
follow_redirects=False, follow_redirects=False,
timeout=download_timeout,
) )
redirect_count += 1 redirect_count += 1
@@ -821,7 +895,7 @@ async def pypi_download_file(
) )
db.add(cached_url_record) db.add(cached_url_record)
# Store extracted dependencies (deduplicate first - METADATA can list same dep under multiple extras) # Store extracted dependencies using batch operation
if extracted_deps: if extracted_deps:
# Deduplicate: keep first version constraint seen for each package name # Deduplicate: keep first version constraint seen for each package name
seen_deps: dict[str, str] = {} seen_deps: dict[str, str] = {}
@@ -829,22 +903,17 @@ async def pypi_download_file(
if dep_name not in seen_deps: if dep_name not in seen_deps:
seen_deps[dep_name] = dep_version if dep_version else "*" seen_deps[dep_name] = dep_version if dep_version else "*"
for dep_name, dep_version in seen_deps.items(): # Convert to list of tuples for batch insert
# Check if this dependency already exists for this artifact deps_to_store = [
existing_dep = db.query(ArtifactDependency).filter( ("_pypi", dep_name, dep_version)
ArtifactDependency.artifact_id == sha256, for dep_name, dep_version in seen_deps.items()
ArtifactDependency.dependency_project == "_pypi", ]
ArtifactDependency.dependency_package == dep_name,
).first()
if not existing_dep: # Batch upsert - handles duplicates with ON CONFLICT DO NOTHING
dep = ArtifactDependency( repo = ArtifactRepository(db)
artifact_id=sha256, inserted = repo.batch_upsert_dependencies(sha256, deps_to_store)
dependency_project="_pypi", if inserted > 0:
dependency_package=dep_name, logger.debug(f"Stored {inserted} dependencies for {sha256[:12]}...")
version_constraint=dep_version,
)
db.add(dep)
db.commit() db.commit()

View File

@@ -421,7 +421,8 @@ def _log_audit(
# Health check # Health check
@router.get("/health", response_model=HealthResponse) @router.get("/health", response_model=HealthResponse)
def health_check( async def health_check(
request: Request,
db: Session = Depends(get_db), db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage), storage: S3Storage = Depends(get_storage),
): ):
@@ -449,11 +450,30 @@ def health_check(
overall_status = "ok" if (storage_healthy and database_healthy) else "degraded" overall_status = "ok" if (storage_healthy and database_healthy) else "degraded"
return HealthResponse( # Build response with optional infrastructure status
status=overall_status, response_data = {
storage_healthy=storage_healthy, "status": overall_status,
database_healthy=database_healthy, "storage_healthy": storage_healthy,
) "database_healthy": database_healthy,
}
# Add HTTP pool status if available
if hasattr(request.app.state, 'http_client'):
http_client = request.app.state.http_client
response_data["http_pool"] = {
"pool_size": http_client.pool_size,
"worker_threads": http_client.executor_max,
}
# Add cache status if available
if hasattr(request.app.state, 'cache'):
cache = request.app.state.cache
response_data["cache"] = {
"enabled": cache.enabled,
"connected": await cache.ping() if cache.enabled else False,
}
return HealthResponse(**response_data)
# --- Authentication Routes --- # --- Authentication Routes ---

View File

@@ -493,6 +493,8 @@ class HealthResponse(BaseModel):
version: str = "1.0.0" version: str = "1.0.0"
storage_healthy: Optional[bool] = None storage_healthy: Optional[bool] = None
database_healthy: Optional[bool] = None database_healthy: Optional[bool] = None
http_pool: Optional[Dict[str, Any]] = None
cache: Optional[Dict[str, Any]] = None
# Garbage collection schemas # Garbage collection schemas

View File

@@ -12,6 +12,7 @@ passlib[bcrypt]==1.7.4
bcrypt==4.0.1 bcrypt==4.0.1
slowapi==0.1.9 slowapi==0.1.9
httpx>=0.25.0 httpx>=0.25.0
redis>=5.0.0
# Test dependencies # Test dependencies
pytest>=7.4.0 pytest>=7.4.0

View File

@@ -135,3 +135,19 @@ class TestPyPIPackageNormalization:
assert "text/html" in response.headers.get("content-type", "") assert "text/html" in response.headers.get("content-type", "")
elif response.status_code == 503: elif response.status_code == 503:
assert "No PyPI upstream sources configured" in response.json()["detail"] assert "No PyPI upstream sources configured" in response.json()["detail"]
class TestPyPIProxyInfrastructure:
"""Tests for PyPI proxy infrastructure integration."""
@pytest.mark.integration
def test_health_endpoint_includes_infrastructure(self, integration_client):
"""Health endpoint should report infrastructure status."""
response = integration_client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "ok"
# Infrastructure status should be present
assert "http_pool" in data
assert "cache" in data

View File

@@ -0,0 +1,374 @@
"""Tests for CacheService."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
class TestCacheCategory:
"""Tests for cache category enum."""
@pytest.mark.unit
def test_immutable_categories_have_no_ttl(self):
"""Immutable categories should return None for TTL."""
from app.cache_service import CacheCategory, get_category_ttl
from app.config import Settings
settings = Settings()
assert get_category_ttl(CacheCategory.ARTIFACT_METADATA, settings) is None
assert get_category_ttl(CacheCategory.ARTIFACT_DEPENDENCIES, settings) is None
assert get_category_ttl(CacheCategory.DEPENDENCY_RESOLUTION, settings) is None
@pytest.mark.unit
def test_mutable_categories_have_ttl(self):
"""Mutable categories should return configured TTL."""
from app.cache_service import CacheCategory, get_category_ttl
from app.config import Settings
settings = Settings(
cache_ttl_index=300,
cache_ttl_upstream=3600,
)
assert get_category_ttl(CacheCategory.PACKAGE_INDEX, settings) == 300
assert get_category_ttl(CacheCategory.UPSTREAM_SOURCES, settings) == 3600
class TestCacheService:
"""Tests for Redis cache service."""
@pytest.mark.asyncio
@pytest.mark.unit
async def test_disabled_cache_returns_none(self):
"""When Redis disabled, get() should return None."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")
assert result is None
await cache.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_disabled_cache_set_is_noop(self):
"""When Redis disabled, set() should be a no-op."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
# Should not raise
await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value")
await cache.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_cache_key_namespacing(self):
"""Cache keys should be properly namespaced."""
from app.cache_service import CacheService, CacheCategory
key = CacheService._make_key(CacheCategory.PACKAGE_INDEX, "pypi", "numpy")
assert key == "orchard:index:pypi:numpy"
@pytest.mark.asyncio
@pytest.mark.unit
async def test_ping_returns_false_when_disabled(self):
"""ping() should return False when Redis is disabled."""
from app.cache_service import CacheService
from app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
result = await cache.ping()
assert result is False
await cache.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_enabled_property(self):
"""enabled property should reflect Redis state."""
from app.cache_service import CacheService
from app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
assert cache.enabled is False
@pytest.mark.asyncio
@pytest.mark.unit
async def test_delete_is_noop_when_disabled(self):
"""delete() should be a no-op when Redis is disabled."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
# Should not raise
await cache.delete(CacheCategory.PACKAGE_INDEX, "test-key")
await cache.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_invalidate_pattern_returns_zero_when_disabled(self):
"""invalidate_pattern() should return 0 when Redis is disabled."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
result = await cache.invalidate_pattern(CacheCategory.PACKAGE_INDEX)
assert result == 0
await cache.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_startup_already_started(self):
"""startup() should be idempotent."""
from app.cache_service import CacheService
from app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
await cache.startup() # Should not raise
assert cache._started is True
await cache.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_shutdown_not_started(self):
"""shutdown() should handle not-started state."""
from app.cache_service import CacheService
from app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
# Should not raise
await cache.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_make_key_with_default_protocol(self):
"""_make_key should work with default protocol."""
from app.cache_service import CacheService, CacheCategory
key = CacheService._make_key(CacheCategory.ARTIFACT_METADATA, "default", "abc123")
assert key == "orchard:artifact:default:abc123"
class TestCacheServiceWithMockedRedis:
"""Tests for CacheService with mocked Redis client."""
@pytest.mark.asyncio
@pytest.mark.unit
async def test_get_returns_cached_value(self):
"""get() should return cached value when available."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=True)
cache = CacheService(settings)
# Mock the redis client
mock_redis = AsyncMock()
mock_redis.get.return_value = b"cached-data"
cache._redis = mock_redis
cache._enabled = True
cache._started = True
result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key", "pypi")
assert result == b"cached-data"
mock_redis.get.assert_called_once_with("orchard:index:pypi:test-key")
@pytest.mark.asyncio
@pytest.mark.unit
async def test_set_with_ttl(self):
"""set() should use setex for mutable categories."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=True, cache_ttl_index=300)
cache = CacheService(settings)
mock_redis = AsyncMock()
cache._redis = mock_redis
cache._enabled = True
cache._started = True
await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value", "pypi")
mock_redis.setex.assert_called_once_with(
"orchard:index:pypi:test-key", 300, b"test-value"
)
@pytest.mark.asyncio
@pytest.mark.unit
async def test_set_without_ttl(self):
"""set() should use set (no expiry) for immutable categories."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=True)
cache = CacheService(settings)
mock_redis = AsyncMock()
cache._redis = mock_redis
cache._enabled = True
cache._started = True
await cache.set(
CacheCategory.ARTIFACT_METADATA, "abc123", b"metadata", "pypi"
)
mock_redis.set.assert_called_once_with(
"orchard:artifact:pypi:abc123", b"metadata"
)
@pytest.mark.asyncio
@pytest.mark.unit
async def test_delete_calls_redis_delete(self):
"""delete() should call Redis delete."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=True)
cache = CacheService(settings)
mock_redis = AsyncMock()
cache._redis = mock_redis
cache._enabled = True
cache._started = True
await cache.delete(CacheCategory.PACKAGE_INDEX, "test-key", "pypi")
mock_redis.delete.assert_called_once_with("orchard:index:pypi:test-key")
@pytest.mark.asyncio
@pytest.mark.unit
async def test_invalidate_pattern_deletes_matching_keys(self):
"""invalidate_pattern() should delete all matching keys."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=True)
cache = CacheService(settings)
mock_redis = AsyncMock()
# Create an async generator for scan_iter
async def mock_scan_iter(match=None):
for key in [b"orchard:index:pypi:numpy", b"orchard:index:pypi:requests"]:
yield key
mock_redis.scan_iter = mock_scan_iter
mock_redis.delete.return_value = 2
cache._redis = mock_redis
cache._enabled = True
cache._started = True
result = await cache.invalidate_pattern(CacheCategory.PACKAGE_INDEX, "*", "pypi")
assert result == 2
mock_redis.delete.assert_called_once()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_ping_returns_true_when_connected(self):
"""ping() should return True when Redis responds."""
from app.cache_service import CacheService
from app.config import Settings
settings = Settings(redis_enabled=True)
cache = CacheService(settings)
mock_redis = AsyncMock()
mock_redis.ping.return_value = True
cache._redis = mock_redis
cache._enabled = True
cache._started = True
result = await cache.ping()
assert result is True
@pytest.mark.asyncio
@pytest.mark.unit
async def test_get_handles_exception(self):
"""get() should return None and log warning on exception."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=True)
cache = CacheService(settings)
mock_redis = AsyncMock()
mock_redis.get.side_effect = Exception("Connection lost")
cache._redis = mock_redis
cache._enabled = True
cache._started = True
result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")
assert result is None
@pytest.mark.asyncio
@pytest.mark.unit
async def test_set_handles_exception(self):
"""set() should log warning on exception."""
from app.cache_service import CacheService, CacheCategory
from app.config import Settings
settings = Settings(redis_enabled=True, cache_ttl_index=300)
cache = CacheService(settings)
mock_redis = AsyncMock()
mock_redis.setex.side_effect = Exception("Connection lost")
cache._redis = mock_redis
cache._enabled = True
cache._started = True
# Should not raise
await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"value")
@pytest.mark.asyncio
@pytest.mark.unit
async def test_ping_returns_false_on_exception(self):
"""ping() should return False on exception."""
from app.cache_service import CacheService
from app.config import Settings
settings = Settings(redis_enabled=True)
cache = CacheService(settings)
mock_redis = AsyncMock()
mock_redis.ping.side_effect = Exception("Connection lost")
cache._redis = mock_redis
cache._enabled = True
cache._started = True
result = await cache.ping()
assert result is False

View File

@@ -0,0 +1,167 @@
"""Tests for database utility functions."""
import pytest
from unittest.mock import MagicMock, patch
class TestArtifactRepository:
"""Tests for ArtifactRepository."""
def test_batch_dependency_values_formatting(self):
"""batch_upsert_dependencies should format values correctly."""
from app.db_utils import ArtifactRepository
deps = [
("_pypi", "numpy", ">=1.21.0"),
("_pypi", "requests", "*"),
("myproject", "mylib", "==1.0.0"),
]
values = ArtifactRepository._format_dependency_values("abc123", deps)
assert len(values) == 3
assert values[0] == {
"artifact_id": "abc123",
"dependency_project": "_pypi",
"dependency_package": "numpy",
"version_constraint": ">=1.21.0",
}
assert values[2]["dependency_project"] == "myproject"
def test_empty_dependencies_returns_empty_list(self):
"""Empty dependency list should return empty values."""
from app.db_utils import ArtifactRepository
values = ArtifactRepository._format_dependency_values("abc123", [])
assert values == []
def test_format_dependency_values_preserves_special_characters(self):
"""Version constraints with special characters should be preserved."""
from app.db_utils import ArtifactRepository
deps = [
("_pypi", "package-name", ">=1.0.0,<2.0.0"),
("_pypi", "another_pkg", "~=1.4.2"),
]
values = ArtifactRepository._format_dependency_values("hash123", deps)
assert values[0]["version_constraint"] == ">=1.0.0,<2.0.0"
assert values[1]["version_constraint"] == "~=1.4.2"
def test_batch_upsert_dependencies_returns_zero_for_empty(self):
"""batch_upsert_dependencies should return 0 for empty list without DB call."""
from app.db_utils import ArtifactRepository
mock_db = MagicMock()
repo = ArtifactRepository(mock_db)
result = repo.batch_upsert_dependencies("abc123", [])
assert result == 0
# Verify no DB operations were performed
mock_db.execute.assert_not_called()
def test_get_or_create_artifact_builds_correct_statement(self):
"""get_or_create_artifact should use ON CONFLICT DO UPDATE."""
from app.db_utils import ArtifactRepository
from app.models import Artifact
mock_db = MagicMock()
mock_result = MagicMock()
mock_artifact = MagicMock()
mock_artifact.ref_count = 1
mock_result.scalar_one.return_value = mock_artifact
mock_db.execute.return_value = mock_result
repo = ArtifactRepository(mock_db)
artifact, created = repo.get_or_create_artifact(
sha256="abc123def456",
size=1024,
filename="test.whl",
content_type="application/zip",
)
assert mock_db.execute.called
assert created is True
assert artifact == mock_artifact
def test_get_or_create_artifact_existing_not_created(self):
"""get_or_create_artifact should return created=False for existing artifact."""
from app.db_utils import ArtifactRepository
mock_db = MagicMock()
mock_result = MagicMock()
mock_artifact = MagicMock()
mock_artifact.ref_count = 5 # Existing artifact with ref_count > 1
mock_result.scalar_one.return_value = mock_artifact
mock_db.execute.return_value = mock_result
repo = ArtifactRepository(mock_db)
artifact, created = repo.get_or_create_artifact(
sha256="abc123def456",
size=1024,
filename="test.whl",
)
assert created is False
def test_get_cached_url_with_artifact_returns_tuple(self):
"""get_cached_url_with_artifact should return (CachedUrl, Artifact) tuple."""
from app.db_utils import ArtifactRepository
mock_db = MagicMock()
mock_cached_url = MagicMock()
mock_artifact = MagicMock()
mock_db.query.return_value.join.return_value.filter.return_value.first.return_value = (
mock_cached_url,
mock_artifact,
)
repo = ArtifactRepository(mock_db)
result = repo.get_cached_url_with_artifact("url_hash_123")
assert result == (mock_cached_url, mock_artifact)
def test_get_cached_url_with_artifact_returns_none_when_not_found(self):
"""get_cached_url_with_artifact should return None when URL not cached."""
from app.db_utils import ArtifactRepository
mock_db = MagicMock()
mock_db.query.return_value.join.return_value.filter.return_value.first.return_value = None
repo = ArtifactRepository(mock_db)
result = repo.get_cached_url_with_artifact("nonexistent_hash")
assert result is None
def test_get_artifact_dependencies_returns_list(self):
"""get_artifact_dependencies should return list of dependencies."""
from app.db_utils import ArtifactRepository
mock_db = MagicMock()
mock_dep1 = MagicMock()
mock_dep2 = MagicMock()
mock_db.query.return_value.filter.return_value.all.return_value = [
mock_dep1,
mock_dep2,
]
repo = ArtifactRepository(mock_db)
result = repo.get_artifact_dependencies("artifact_hash_123")
assert len(result) == 2
assert result[0] == mock_dep1
assert result[1] == mock_dep2
def test_get_artifact_dependencies_returns_empty_list(self):
"""get_artifact_dependencies should return empty list when no dependencies."""
from app.db_utils import ArtifactRepository
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.all.return_value = []
repo = ArtifactRepository(mock_db)
result = repo.get_artifact_dependencies("artifact_without_deps")
assert result == []

View File

@@ -0,0 +1,194 @@
"""Tests for HttpClientManager."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
class TestHttpClientManager:
"""Tests for HTTP client pool management."""
@pytest.mark.unit
def test_manager_initializes_with_settings(self):
"""Manager should initialize with config settings."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings(
http_max_connections=50,
http_connect_timeout=15.0,
)
manager = HttpClientManager(settings)
assert manager.max_connections == 50
assert manager.connect_timeout == 15.0
assert manager._default_client is None # Not started yet
@pytest.mark.asyncio
@pytest.mark.unit
async def test_startup_creates_client(self):
"""Startup should create the default async client."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
assert manager._default_client is not None
await manager.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_shutdown_closes_client(self):
"""Shutdown should close all clients gracefully."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
client = manager._default_client
await manager.shutdown()
assert manager._default_client is None
assert client.is_closed
@pytest.mark.asyncio
@pytest.mark.unit
async def test_get_client_returns_default(self):
"""get_client() should return the default client."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
client = manager.get_client()
assert client is manager._default_client
await manager.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_get_client_raises_if_not_started(self):
"""get_client() should raise RuntimeError if manager not started."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
with pytest.raises(RuntimeError, match="not started"):
manager.get_client()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_run_blocking_executes_in_thread_pool(self):
"""run_blocking should execute sync functions in thread pool."""
from app.http_client import HttpClientManager
from app.config import Settings
import threading
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
main_thread = threading.current_thread()
execution_thread = None
def blocking_func():
nonlocal execution_thread
execution_thread = threading.current_thread()
return "result"
result = await manager.run_blocking(blocking_func)
assert result == "result"
assert execution_thread is not main_thread
await manager.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_run_blocking_raises_if_not_started(self):
"""run_blocking should raise RuntimeError if manager not started."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
with pytest.raises(RuntimeError, match="not started"):
await manager.run_blocking(lambda: None)
@pytest.mark.asyncio
@pytest.mark.unit
async def test_startup_idempotent(self):
"""Calling startup multiple times should be safe."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
client1 = manager._default_client
await manager.startup() # Should not create a new client
client2 = manager._default_client
assert client1 is client2 # Same client instance
await manager.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_shutdown_idempotent(self):
"""Calling shutdown multiple times should be safe."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
await manager.shutdown()
await manager.shutdown() # Should not raise
assert manager._default_client is None
@pytest.mark.asyncio
@pytest.mark.unit
async def test_properties_return_configured_values(self):
"""Properties should return configured values."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings(
http_max_connections=75,
http_worker_threads=16,
)
manager = HttpClientManager(settings)
await manager.startup()
assert manager.pool_size == 75
assert manager.executor_max == 16
await manager.shutdown()
@pytest.mark.asyncio
@pytest.mark.unit
async def test_active_connections_when_not_started(self):
"""active_connections should return 0 when not started."""
from app.http_client import HttpClientManager
from app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
assert manager.active_connections == 0

View File

@@ -228,7 +228,7 @@ minioIngress:
secretName: minio-tls # Overridden by CI secretName: minio-tls # Overridden by CI
redis: redis:
enabled: false enabled: true
waitForDatabase: true waitForDatabase: true

View File

@@ -140,7 +140,7 @@ minioIngress:
enabled: false enabled: false
redis: redis:
enabled: false enabled: true
waitForDatabase: true waitForDatabase: true

View File

@@ -146,7 +146,7 @@ minioIngress:
# Redis subchart configuration (for future caching) # Redis subchart configuration (for future caching)
redis: redis:
enabled: false enabled: true
image: image:
registry: containers.global.bsf.tools registry: containers.global.bsf.tools
repository: bitnami/redis repository: bitnami/redis