Compare commits
14 Commits: 196f3f957c ... 632bf54087

632bf54087
170561b32a
6e05697ae2
08b6589712
7ad5a15ef4
8fdb73901e
79dd7b833e
71089aee0e
ffe0529ea8
146ca2ad74
a045509fe4
14806b05f0
c67004af52
8c6ba01a73
CHANGELOG.md (23 changes)
@@ -7,6 +7,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]

 ### Added

+- Added HTTP connection pooling infrastructure for improved PyPI proxy performance
+  - `HttpClientManager` with configurable pool size, timeouts, and thread pool executor
+  - Eliminates per-request connection overhead (~100-500ms → ~5ms)
+- Added Redis caching layer with category-aware TTL for hermetic builds
+  - `CacheService` with graceful fallback when Redis unavailable
+  - Immutable data (artifact metadata, dependencies) cached forever
+  - Mutable data (package index, versions) uses configurable TTL
+- Added `ArtifactRepository` for batch database operations
+  - `batch_upsert_dependencies()` reduces N+1 queries to single INSERT
+  - `get_or_create_artifact()` uses atomic ON CONFLICT upsert
+- Added infrastructure status to health endpoint (`/health`)
+  - Reports HTTP pool size and worker threads
+  - Reports Redis cache connection status
+- Added new configuration settings for HTTP client, Redis, and cache TTL
+  - `ORCHARD_HTTP_MAX_CONNECTIONS`, `ORCHARD_HTTP_CONNECT_TIMEOUT`, etc.
+  - `ORCHARD_REDIS_HOST`, `ORCHARD_REDIS_PORT`, `ORCHARD_REDIS_ENABLED`
+  - `ORCHARD_CACHE_TTL_INDEX`, `ORCHARD_CACHE_TTL_VERSIONS`, etc.
 - Added transparent PyPI proxy implementing PEP 503 Simple API (#108)
   - `GET /pypi/simple/` - package index (proxied from upstream)
   - `GET /pypi/simple/{package}/` - version list with rewritten download links

@@ -16,6 +33,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Added `POST /api/v1/cache/resolve` endpoint to cache packages by coordinates instead of URL (#108)

 ### Changed

+- PyPI proxy now uses shared HTTP connection pool instead of per-request clients
+- PyPI proxy now caches upstream source configuration in Redis
+- Dependency storage now uses batch INSERT instead of individual queries
+- Increased default database pool size from 5 to 20 connections
+- Increased default database max overflow from 10 to 30 connections
+- Enabled Redis in Helm chart values for dev, stage, and prod environments
 - Upstream sources table text is now centered under column headers (#108)
 - ENV badge now appears inline with source name instead of separate column (#108)
 - Test and Edit buttons now have more prominent button styling (#108)
backend/app/cache_service.py (new file, 262 lines)
@@ -0,0 +1,262 @@
"""
Redis-backed caching service with category-aware TTL and invalidation.

Provides:
- Immutable caching for artifact data (hermetic builds)
- TTL-based caching for discovery data
- Event-driven invalidation for config changes
- Graceful fallback when Redis unavailable
"""

import logging
from enum import Enum
from typing import Optional

from .config import Settings

logger = logging.getLogger(__name__)


class CacheCategory(Enum):
    """
    Cache categories with different TTL and invalidation rules.

    Immutable (cache forever):
    - ARTIFACT_METADATA: Artifact info by SHA256
    - ARTIFACT_DEPENDENCIES: Extracted deps by SHA256
    - DEPENDENCY_RESOLUTION: Resolution results by input hash

    Mutable (TTL + event invalidation):
    - UPSTREAM_SOURCES: Upstream config, invalidate on DB change
    - PACKAGE_INDEX: PyPI/npm index pages, TTL only
    - PACKAGE_VERSIONS: Version listings, TTL only
    """

    # Immutable - cache forever (hermetic builds)
    ARTIFACT_METADATA = "artifact"
    ARTIFACT_DEPENDENCIES = "deps"
    DEPENDENCY_RESOLUTION = "resolve"

    # Mutable - TTL + event invalidation
    UPSTREAM_SOURCES = "upstream"
    PACKAGE_INDEX = "index"
    PACKAGE_VERSIONS = "versions"


def get_category_ttl(category: CacheCategory, settings: Settings) -> Optional[int]:
    """
    Get TTL for a cache category.

    Returns:
        TTL in seconds, or None for no expiry (immutable).
    """
    ttl_map = {
        # Immutable - no TTL
        CacheCategory.ARTIFACT_METADATA: None,
        CacheCategory.ARTIFACT_DEPENDENCIES: None,
        CacheCategory.DEPENDENCY_RESOLUTION: None,
        # Mutable - configurable TTL
        CacheCategory.UPSTREAM_SOURCES: settings.cache_ttl_upstream,
        CacheCategory.PACKAGE_INDEX: settings.cache_ttl_index,
        CacheCategory.PACKAGE_VERSIONS: settings.cache_ttl_versions,
    }
    return ttl_map.get(category)


class CacheService:
    """
    Redis-backed caching with category-aware TTL.

    Key format: orchard:{category}:{protocol}:{identifier}
    Example: orchard:deps:pypi:abc123def456

    When Redis is disabled or unavailable, operations gracefully
    return None/no-op to allow the application to function without caching.
    """

    def __init__(self, settings: Settings):
        self._settings = settings
        self._enabled = settings.redis_enabled
        self._redis: Optional["redis.asyncio.Redis"] = None
        self._started = False

    async def startup(self) -> None:
        """Initialize Redis connection. Called by FastAPI lifespan."""
        if self._started:
            return

        if not self._enabled:
            logger.info("CacheService disabled (redis_enabled=False)")
            self._started = True
            return

        try:
            import redis.asyncio as redis

            logger.info(
                f"Connecting to Redis at {self._settings.redis_host}:"
                f"{self._settings.redis_port}/{self._settings.redis_db}"
            )

            self._redis = redis.Redis(
                host=self._settings.redis_host,
                port=self._settings.redis_port,
                db=self._settings.redis_db,
                password=self._settings.redis_password,
                decode_responses=False,  # We handle bytes
            )

            # Test connection
            await self._redis.ping()
            logger.info("CacheService connected to Redis")

        except ImportError:
            logger.warning("redis package not installed, caching disabled")
            self._enabled = False
        except Exception as e:
            logger.warning(f"Redis connection failed, caching disabled: {e}")
            self._enabled = False
            self._redis = None

        self._started = True

    async def shutdown(self) -> None:
        """Close Redis connection. Called by FastAPI lifespan."""
        if not self._started:
            return

        if self._redis:
            await self._redis.aclose()
            self._redis = None

        self._started = False
        logger.info("CacheService shutdown complete")

    @staticmethod
    def _make_key(category: CacheCategory, protocol: str, identifier: str) -> str:
        """Build namespaced cache key."""
        return f"orchard:{category.value}:{protocol}:{identifier}"

    async def get(
        self,
        category: CacheCategory,
        key: str,
        protocol: str = "default",
    ) -> Optional[bytes]:
        """
        Get cached value.

        Args:
            category: Cache category for TTL rules
            key: Unique identifier within category
            protocol: Protocol namespace (pypi, npm, etc.)

        Returns:
            Cached bytes or None if not found/disabled.
        """
        if not self._enabled or not self._redis:
            return None

        try:
            full_key = self._make_key(category, protocol, key)
            return await self._redis.get(full_key)
        except Exception as e:
            logger.warning(f"Cache get failed for {key}: {e}")
            return None

    async def set(
        self,
        category: CacheCategory,
        key: str,
        value: bytes,
        protocol: str = "default",
    ) -> None:
        """
        Set cached value with category-appropriate TTL.

        Args:
            category: Cache category for TTL rules
            key: Unique identifier within category
            value: Bytes to cache
            protocol: Protocol namespace (pypi, npm, etc.)
        """
        if not self._enabled or not self._redis:
            return

        try:
            full_key = self._make_key(category, protocol, key)
            ttl = get_category_ttl(category, self._settings)

            if ttl is None:
                await self._redis.set(full_key, value)
            else:
                await self._redis.setex(full_key, ttl, value)

        except Exception as e:
            logger.warning(f"Cache set failed for {key}: {e}")

    async def delete(
        self,
        category: CacheCategory,
        key: str,
        protocol: str = "default",
    ) -> None:
        """Delete a specific cache entry."""
        if not self._enabled or not self._redis:
            return

        try:
            full_key = self._make_key(category, protocol, key)
            await self._redis.delete(full_key)
        except Exception as e:
            logger.warning(f"Cache delete failed for {key}: {e}")

    async def invalidate_pattern(
        self,
        category: CacheCategory,
        pattern: str = "*",
        protocol: str = "default",
    ) -> int:
        """
        Invalidate all entries matching pattern.

        Args:
            category: Cache category
            pattern: Glob pattern for keys (default "*" = all in category)
            protocol: Protocol namespace

        Returns:
            Number of keys deleted.
        """
        if not self._enabled or not self._redis:
            return 0

        try:
            full_pattern = self._make_key(category, protocol, pattern)
            keys = []
            async for key in self._redis.scan_iter(match=full_pattern):
                keys.append(key)

            if keys:
                return await self._redis.delete(*keys)
            return 0

        except Exception as e:
            logger.warning(f"Cache invalidate failed for pattern {pattern}: {e}")
            return 0

    async def ping(self) -> bool:
        """Check if Redis is connected and responding."""
        if not self._enabled or not self._redis:
            return False

        try:
            await self._redis.ping()
            return True
        except Exception:
            return False

    @property
    def enabled(self) -> bool:
        """Check if caching is enabled."""
        return self._enabled
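A minimal usage sketch of the CacheService above, assuming it is constructed from the application Settings as the lifespan changes later in this diff do; the package name "numpy" and the cached bytes are placeholder values, not part of this PR.

# Hedged sketch: caching a PyPI index page with CacheService.
import asyncio

from app.cache_service import CacheService, CacheCategory
from app.config import Settings


async def demo() -> None:
    cache = CacheService(Settings())
    await cache.startup()  # falls back to no-op mode if Redis is unreachable

    # Mutable category: stored with cache_ttl_index (default 300 s)
    await cache.set(CacheCategory.PACKAGE_INDEX, "numpy", b"<html>...</html>", protocol="pypi")

    # Key is namespaced as orchard:index:pypi:numpy
    page = await cache.get(CacheCategory.PACKAGE_INDEX, "numpy", protocol="pypi")
    print(page is not None)

    # Immutable category: stored with no expiry (hermetic builds)
    await cache.set(CacheCategory.ARTIFACT_METADATA, "abc123def456", b"{}", protocol="pypi")

    await cache.shutdown()


if __name__ == "__main__":
    asyncio.run(demo())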
@@ -22,8 +22,8 @@ class Settings(BaseSettings):
     database_sslmode: str = "disable"

     # Database connection pool settings
-    database_pool_size: int = 5  # Number of connections to keep open
+    database_pool_size: int = 20  # Number of connections to keep open
-    database_max_overflow: int = 10  # Max additional connections beyond pool_size
+    database_max_overflow: int = 30  # Max additional connections beyond pool_size
     database_pool_timeout: int = 30  # Seconds to wait for a connection from pool
     database_pool_recycle: int = (
         1800  # Recycle connections after this many seconds (30 min)

@@ -53,6 +53,25 @@ class Settings(BaseSettings):
     )
     pypi_download_mode: str = "redirect"  # "redirect" (to S3) or "proxy" (stream through Orchard)

+    # HTTP Client pool settings
+    http_max_connections: int = 100  # Max connections per pool
+    http_max_keepalive: int = 20  # Keep-alive connections
+    http_connect_timeout: float = 30.0  # Connection timeout seconds
+    http_read_timeout: float = 60.0  # Read timeout seconds
+    http_worker_threads: int = 32  # Thread pool for blocking ops
+
+    # Redis cache settings
+    redis_host: str = "localhost"
+    redis_port: int = 6379
+    redis_db: int = 0
+    redis_password: Optional[str] = None
+    redis_enabled: bool = True  # Set False to disable caching
+
+    # Cache TTL settings (seconds, 0 = no expiry)
+    cache_ttl_index: int = 300  # Package index pages: 5 min
+    cache_ttl_versions: int = 300  # Version listings: 5 min
+    cache_ttl_upstream: int = 3600  # Upstream source config: 1 hour
+
     # Logging settings
     log_level: str = "INFO"  # DEBUG, INFO, WARNING, ERROR, CRITICAL
     log_format: str = "auto"  # "json", "standard", or "auto" (json in production)
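A hedged sketch of overriding the new settings via the ORCHARD_-prefixed environment variables named in the changelog. It assumes Settings is a pydantic BaseSettings subclass that maps that prefix onto these fields (the prefix declaration is not shown in this hunk), and the values are examples, not project defaults.

# Hedged sketch: environment-driven configuration of the new knobs.
import os

os.environ.update({
    "ORCHARD_HTTP_MAX_CONNECTIONS": "200",
    "ORCHARD_HTTP_CONNECT_TIMEOUT": "10.0",
    "ORCHARD_REDIS_HOST": "redis.internal",
    "ORCHARD_REDIS_PORT": "6379",
    "ORCHARD_REDIS_ENABLED": "true",
    "ORCHARD_CACHE_TTL_INDEX": "600",
})

from app.config import Settings  # instantiate after the environment is set

settings = Settings()
print(settings.http_max_connections, settings.redis_host, settings.cache_ttl_index)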
backend/app/db_utils.py (new file, 175 lines)
@@ -0,0 +1,175 @@
"""
Database utilities for optimized artifact operations.

Provides batch operations to eliminate N+1 queries.
"""

import logging
from typing import Optional

from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session

from .models import Artifact, ArtifactDependency, CachedUrl

logger = logging.getLogger(__name__)


class ArtifactRepository:
    """
    Optimized database operations for artifact storage.

    Key optimizations:
    - Atomic upserts using ON CONFLICT
    - Batch inserts for dependencies
    - Joined queries to avoid N+1
    """

    def __init__(self, db: Session):
        self.db = db

    @staticmethod
    def _format_dependency_values(
        artifact_id: str,
        dependencies: list[tuple[str, str, str]],
    ) -> list[dict]:
        """
        Format dependencies for batch insert.

        Args:
            artifact_id: SHA256 of the artifact
            dependencies: List of (project, package, version_constraint)

        Returns:
            List of dicts ready for bulk insert.
        """
        return [
            {
                "artifact_id": artifact_id,
                "dependency_project": proj,
                "dependency_package": pkg,
                "version_constraint": ver,
            }
            for proj, pkg, ver in dependencies
        ]

    def get_or_create_artifact(
        self,
        sha256: str,
        size: int,
        filename: str,
        content_type: Optional[str] = None,
        created_by: str = "system",
        s3_key: Optional[str] = None,
    ) -> tuple[Artifact, bool]:
        """
        Get existing artifact or create new one atomically.

        Uses INSERT ... ON CONFLICT DO UPDATE to handle races.
        If artifact exists, increments ref_count.

        Args:
            sha256: Content hash (primary key)
            size: File size in bytes
            filename: Original filename
            content_type: MIME type
            created_by: User who created the artifact
            s3_key: S3 storage key (defaults to standard path)

        Returns:
            (artifact, created) tuple where created is True for new artifacts.
        """
        if s3_key is None:
            s3_key = f"fruits/{sha256[:2]}/{sha256[2:4]}/{sha256}"

        stmt = pg_insert(Artifact).values(
            id=sha256,
            size=size,
            original_name=filename,
            content_type=content_type,
            ref_count=1,
            created_by=created_by,
            s3_key=s3_key,
        ).on_conflict_do_update(
            index_elements=['id'],
            set_={'ref_count': Artifact.ref_count + 1}
        ).returning(Artifact)

        result = self.db.execute(stmt)
        artifact = result.scalar_one()

        # Check if this was an insert or update by comparing ref_count
        # ref_count=1 means new, >1 means existing
        created = artifact.ref_count == 1

        return artifact, created

    def batch_upsert_dependencies(
        self,
        artifact_id: str,
        dependencies: list[tuple[str, str, str]],
    ) -> int:
        """
        Insert dependencies in a single batch operation.

        Uses ON CONFLICT DO NOTHING to skip duplicates.

        Args:
            artifact_id: SHA256 of the artifact
            dependencies: List of (project, package, version_constraint)

        Returns:
            Number of dependencies inserted.
        """
        if not dependencies:
            return 0

        values = self._format_dependency_values(artifact_id, dependencies)

        stmt = pg_insert(ArtifactDependency).values(values)
        stmt = stmt.on_conflict_do_nothing(
            index_elements=['artifact_id', 'dependency_project', 'dependency_package']
        )

        result = self.db.execute(stmt)
        return result.rowcount

    def get_cached_url_with_artifact(
        self,
        url_hash: str,
    ) -> Optional[tuple[CachedUrl, Artifact]]:
        """
        Get cached URL and its artifact in a single query.

        Args:
            url_hash: SHA256 of the URL

        Returns:
            (CachedUrl, Artifact) tuple or None if not found.
        """
        result = (
            self.db.query(CachedUrl, Artifact)
            .join(Artifact, CachedUrl.artifact_id == Artifact.id)
            .filter(CachedUrl.url_hash == url_hash)
            .first()
        )
        return result

    def get_artifact_dependencies(
        self,
        artifact_id: str,
    ) -> list[ArtifactDependency]:
        """
        Get all dependencies for an artifact in a single query.

        Args:
            artifact_id: SHA256 of the artifact

        Returns:
            List of ArtifactDependency objects.
        """
        return (
            self.db.query(ArtifactDependency)
            .filter(ArtifactDependency.artifact_id == artifact_id)
            .all()
        )
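A small sketch of how ArtifactRepository collapses the per-dependency SELECT/INSERT pairs into one statement. The SessionLocal factory and the sample rows are assumptions for illustration; only get_db is referenced elsewhere in this diff.

# Hedged sketch: batch-storing dependencies with ArtifactRepository.
from app.database import SessionLocal  # assumed session factory, not shown in this diff
from app.db_utils import ArtifactRepository

db = SessionLocal()
repo = ArtifactRepository(db)

# Atomic upsert: creates the row or bumps ref_count on conflict
artifact, created = repo.get_or_create_artifact(
    sha256="a" * 64,                      # placeholder hash
    size=1024,
    filename="example-1.0-py3-none-any.whl",
    content_type="application/zip",
)

# One INSERT ... ON CONFLICT DO NOTHING instead of N query/insert round trips
inserted = repo.batch_upsert_dependencies(
    artifact.id,
    [("_pypi", "requests", ">=2.31"), ("_pypi", "urllib3", "*")],
)
db.commit()
print(created, inserted)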
backend/app/http_client.py (new file, 179 lines)
@@ -0,0 +1,179 @@
"""
HTTP client manager with connection pooling and lifecycle management.

Provides:
- Shared connection pools for upstream requests
- Per-upstream client isolation when needed
- Thread pool for blocking I/O operations
- FastAPI lifespan integration
"""

import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

import httpx

from .config import Settings

logger = logging.getLogger(__name__)


class HttpClientManager:
    """
    Manages httpx.AsyncClient pools with FastAPI lifespan integration.

    Features:
    - Default shared pool for general requests
    - Per-upstream pools for sources needing specific config/auth
    - Dedicated thread pool for blocking operations
    - Graceful shutdown
    """

    def __init__(self, settings: Settings):
        self.max_connections = settings.http_max_connections
        self.max_keepalive = settings.http_max_keepalive
        self.connect_timeout = settings.http_connect_timeout
        self.read_timeout = settings.http_read_timeout
        self.worker_threads = settings.http_worker_threads

        self._default_client: Optional[httpx.AsyncClient] = None
        self._upstream_clients: dict[str, httpx.AsyncClient] = {}
        self._executor: Optional[ThreadPoolExecutor] = None
        self._started = False

    async def startup(self) -> None:
        """Initialize clients and thread pool. Called by FastAPI lifespan."""
        if self._started:
            return

        logger.info(
            f"Starting HttpClientManager: max_connections={self.max_connections}, "
            f"worker_threads={self.worker_threads}"
        )

        # Create connection limits
        limits = httpx.Limits(
            max_connections=self.max_connections,
            max_keepalive_connections=self.max_keepalive,
        )

        # Create timeout config
        timeout = httpx.Timeout(
            connect=self.connect_timeout,
            read=self.read_timeout,
            write=self.read_timeout,
            pool=self.connect_timeout,
        )

        # Create default client
        self._default_client = httpx.AsyncClient(
            limits=limits,
            timeout=timeout,
            follow_redirects=False,  # Handle redirects manually for auth
        )

        # Create thread pool for blocking operations
        self._executor = ThreadPoolExecutor(
            max_workers=self.worker_threads,
            thread_name_prefix="orchard-blocking-",
        )

        self._started = True
        logger.info("HttpClientManager started")

    async def shutdown(self) -> None:
        """Close all clients and thread pool. Called by FastAPI lifespan."""
        if not self._started:
            return

        logger.info("Shutting down HttpClientManager")

        # Close default client
        if self._default_client:
            await self._default_client.aclose()
            self._default_client = None

        # Close upstream-specific clients
        for name, client in self._upstream_clients.items():
            logger.debug(f"Closing upstream client: {name}")
            await client.aclose()
        self._upstream_clients.clear()

        # Shutdown thread pool
        if self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

        self._started = False
        logger.info("HttpClientManager shutdown complete")

    def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
        """
        Get HTTP client for making requests.

        Args:
            upstream_name: Optional upstream source name for dedicated pool.
                If None, returns the default shared client.

        Returns:
            httpx.AsyncClient configured for the request.

        Raises:
            RuntimeError: If manager not started.
        """
        if not self._started or not self._default_client:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        if upstream_name and upstream_name in self._upstream_clients:
            return self._upstream_clients[upstream_name]

        return self._default_client

    async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any:
        """
        Run a blocking function in the thread pool.

        Use this for:
        - File I/O operations
        - Archive extraction (zipfile, tarfile)
        - Hash computation on large data

        Args:
            func: Synchronous function to execute
            *args: Arguments to pass to the function

        Returns:
            The function's return value.
        """
        if not self._executor:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(self._executor, func, *args)

    @property
    def active_connections(self) -> int:
        """Get approximate number of active connections (for health checks)."""
        if not self._default_client:
            return 0
        # httpx doesn't expose this directly, return pool size as approximation
        return self.max_connections

    @property
    def pool_size(self) -> int:
        """Get configured pool size."""
        return self.max_connections

    @property
    def executor_active(self) -> int:
        """Get number of active thread pool workers."""
        if not self._executor:
            return 0
        return len(self._executor._threads)

    @property
    def executor_max(self) -> int:
        """Get max thread pool workers."""
        return self.worker_threads
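A minimal sketch of using the manager above outside the FastAPI app (inside the app it lives on app.state.http_client and is resolved via the get_http_client dependency added later in this diff). The URL is illustrative.

# Hedged sketch: reusing the shared httpx pool managed by HttpClientManager.
import asyncio
import hashlib

from app.config import Settings
from app.http_client import HttpClientManager


async def demo() -> None:
    manager = HttpClientManager(Settings())
    await manager.startup()

    client = manager.get_client()  # shared AsyncClient, no per-request pool setup
    response = await client.get("https://pypi.org/simple/")
    print(response.status_code)

    # Off-load blocking work (e.g. hashing a large body) to the thread pool
    digest = await manager.run_blocking(hashlib.sha256, response.content)
    print(digest.hexdigest()[:12])

    await manager.shutdown()


asyncio.run(demo())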
@@ -15,6 +15,8 @@ from .pypi_proxy import router as pypi_router
 from .seed import seed_database
 from .auth import create_default_admin
 from .rate_limit import limiter
+from .http_client import HttpClientManager
+from .cache_service import CacheService

 settings = get_settings()
 logging.basicConfig(level=logging.INFO)

@@ -38,6 +40,17 @@ async def lifespan(app: FastAPI):
     finally:
         db.close()
+
+    # Initialize infrastructure services
+    logger.info("Initializing infrastructure services...")
+
+    app.state.http_client = HttpClientManager(settings)
+    await app.state.http_client.startup()
+
+    app.state.cache = CacheService(settings)
+    await app.state.cache.startup()
+
+    logger.info("Infrastructure services ready")

     # Seed test data in development mode
     if settings.is_development:
         logger.info(f"Running in {settings.env} mode - checking for seed data")

@@ -51,6 +64,12 @@ async def lifespan(app: FastAPI):

     yield

+    # Shutdown infrastructure services
+    logger.info("Shutting down infrastructure services...")
+    await app.state.http_client.shutdown()
+    await app.state.cache.shutdown()
+    logger.info("Shutdown complete")
+

 app = FastAPI(
     title="Orchard",
@@ -23,15 +23,28 @@ from fastapi.responses import StreamingResponse, HTMLResponse, RedirectResponse
 from sqlalchemy.orm import Session

 from .database import get_db
-from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, PackageVersion, ArtifactDependency
+from .models import UpstreamSource, CachedUrl, Artifact, Project, Package, PackageVersion
 from .storage import S3Storage, get_storage
 from .config import get_env_upstream_sources, get_settings
+from .http_client import HttpClientManager
+from .cache_service import CacheService, CacheCategory
+from .db_utils import ArtifactRepository

 logger = logging.getLogger(__name__)

 router = APIRouter(prefix="/pypi", tags=["pypi-proxy"])
+
+
+def get_http_client(request: Request) -> HttpClientManager:
+    """Get HttpClientManager from app state."""
+    return request.app.state.http_client
+
+
+def get_cache(request: Request) -> CacheService:
+    """Get CacheService from app state."""
+    return request.app.state.cache
+

 # Timeout configuration for proxy requests
 PROXY_CONNECT_TIMEOUT = 30.0
 PROXY_READ_TIMEOUT = 60.0

@@ -241,6 +254,62 @@ def _extract_pypi_version(filename: str) -> Optional[str]:
     return None


+async def _get_pypi_upstream_sources_cached(
+    db: Session,
+    cache: CacheService,
+) -> list[UpstreamSource]:
+    """
+    Get PyPI upstream sources with caching.
+
+    Sources are cached for cache_ttl_upstream seconds to avoid
+    repeated database queries on every request.
+    """
+    cache_key = "sources"
+
+    # Try cache first
+    cached = await cache.get(CacheCategory.UPSTREAM_SOURCES, cache_key, protocol="pypi")
+    if cached:
+        source_data = json.loads(cached.decode())
+        # Reconstruct UpstreamSource-like objects from cached data
+        # We cache just the essential fields needed for requests
+        return [type('CachedSource', (), d)() for d in source_data]
+
+    # Query database
+    db_sources = (
+        db.query(UpstreamSource)
+        .filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True)
+        .order_by(UpstreamSource.priority)
+        .all()
+    )
+
+    # Combine with env sources
+    env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"]
+    all_sources = list(db_sources) + list(env_sources)
+    all_sources = sorted(all_sources, key=lambda s: s.priority)
+
+    # Cache the essential fields
+    if all_sources and cache.enabled:
+        cache_data = [
+            {
+                "name": s.name,
+                "url": s.url,
+                "priority": s.priority,
+                "auth_type": getattr(s, "auth_type", "none"),
+                "username": getattr(s, "username", None),
+                "password": getattr(s, "password", None),
+            }
+            for s in all_sources
+        ]
+        await cache.set(
+            CacheCategory.UPSTREAM_SOURCES,
+            cache_key,
+            json.dumps(cache_data).encode(),
+            protocol="pypi",
+        )
+
+    return all_sources
+
+
 def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]:
     """Get all enabled upstream sources configured for PyPI."""
     # Get database sources

@@ -573,6 +642,8 @@ async def pypi_download_file(
     upstream: Optional[str] = None,
     db: Session = Depends(get_db),
     storage: S3Storage = Depends(get_storage),
+    http_client: HttpClientManager = Depends(get_http_client),
+    cache: CacheService = Depends(get_cache),
 ):
     """
     Download a package file, caching it in Orchard.

@@ -654,7 +725,9 @@ async def pypi_download_file(
     headers.update(_build_auth_headers(matched_source))
     auth = _get_basic_auth(matched_source) if matched_source else None

-    timeout = httpx.Timeout(300.0, connect=PROXY_CONNECT_TIMEOUT)  # 5 minutes for large files
+    # Use shared HTTP client from pool with longer timeout for file downloads
+    client = http_client.get_client()
+    download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)

     # Initialize extracted dependencies list
     extracted_deps = []
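A short sketch contrasting the two approaches the hunk above swaps between: a per-request AsyncClient versus the long-lived pooled client with a per-call timeout. The URL is illustrative; the timeout values mirror the hunk.

# Hedged sketch of the change above: per-request client vs. shared pool.
import httpx

download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)


async def fetch_old_style(url: str) -> bytes:
    # Before: a new connection pool (and TLS handshake) for every download
    async with httpx.AsyncClient(timeout=download_timeout, follow_redirects=False) as client:
        return (await client.get(url)).content


async def fetch_pooled(client: httpx.AsyncClient, url: str) -> bytes:
    # After: reuse the long-lived client; only the timeout is per-request
    return (await client.get(url, timeout=download_timeout)).content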
@@ -662,78 +735,79 @@ async def pypi_download_file(
     # Fetch the file
     logger.info(f"PyPI proxy: fetching {filename} from {upstream_url}")

-    async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
-        response = await client.get(
-            upstream_url,
-            headers=headers,
-            auth=auth,
-        )
+    response = await client.get(
+        upstream_url,
+        headers=headers,
+        auth=auth,
+        timeout=download_timeout,
+    )

     # Handle redirects manually
     redirect_count = 0
     while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
         redirect_url = response.headers.get('location')
         if not redirect_url:
             break

         if not redirect_url.startswith('http'):
             redirect_url = urljoin(upstream_url, redirect_url)

         logger.info(f"PyPI proxy: following redirect to {redirect_url}")

         # Don't send auth to different hosts
         redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
         redirect_auth = None
         if urlparse(redirect_url).netloc == urlparse(upstream_url).netloc:
             redirect_headers.update(headers)
             redirect_auth = auth

         response = await client.get(
             redirect_url,
             headers=redirect_headers,
             auth=redirect_auth,
             follow_redirects=False,
+            timeout=download_timeout,
         )
         redirect_count += 1

     if response.status_code != 200:
         # Parse upstream error for policy/blocking messages
         error_detail = _parse_upstream_error(response)
         logger.warning(f"PyPI proxy: upstream returned {response.status_code} for {filename}: {error_detail}")
         raise HTTPException(
             status_code=response.status_code,
             detail=f"Upstream error: {error_detail}"
         )

     content_type = response.headers.get('content-type', 'application/octet-stream')

     # Stream to temp file to avoid loading large packages into memory
     # This keeps memory usage constant regardless of package size
     # Using async iteration to avoid blocking the event loop
     tmp_path = None
     try:
         with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
             tmp_path = tmp_file.name
             async for chunk in response.aiter_bytes(chunk_size=65536):  # 64KB chunks
                 tmp_file.write(chunk)

         # Store in S3 from temp file (computes hash and deduplicates automatically)
         with open(tmp_path, 'rb') as f:
             result = storage.store(f)
         sha256 = result.sha256
         size = result.size
         s3_key = result.s3_key

         # Extract dependencies from the temp file before cleaning up
         extracted_deps = _extract_dependencies_from_file(tmp_path, filename)
         if extracted_deps:
             logger.info(f"PyPI proxy: extracted {len(extracted_deps)} dependencies from {filename}")

         logger.info(f"PyPI proxy: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")
     finally:
         # Clean up temp file
         if tmp_path and os.path.exists(tmp_path):
             os.unlink(tmp_path)

     # Check if artifact already exists
     existing = db.query(Artifact).filter(Artifact.id == sha256).first()
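The hunk above streams the response body to a temporary file in 64 KB chunks so memory stays flat for large packages. A standalone sketch of that pattern, assuming a plain httpx streaming request; the hash step stands in for the project's storage.store() call.

# Hedged sketch: stream a download to a temp file, then hash it from disk.
import hashlib
import os
import tempfile

import httpx


async def stream_to_tempfile(client: httpx.AsyncClient, url: str) -> str:
    tmp_path = None
    try:
        async with client.stream("GET", url) as response:
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(delete=False) as tmp_file:
                tmp_path = tmp_file.name
                async for chunk in response.aiter_bytes(chunk_size=65536):  # 64KB chunks
                    tmp_file.write(chunk)

        # Hash from disk so the whole body never sits in memory
        sha256 = hashlib.sha256()
        with open(tmp_path, "rb") as f:
            for block in iter(lambda: f.read(65536), b""):
                sha256.update(block)
        return sha256.hexdigest()
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)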
@@ -821,7 +895,7 @@ async def pypi_download_file(
     )
     db.add(cached_url_record)

-    # Store extracted dependencies (deduplicate first - METADATA can list same dep under multiple extras)
+    # Store extracted dependencies using batch operation
     if extracted_deps:
         # Deduplicate: keep first version constraint seen for each package name
         seen_deps: dict[str, str] = {}

@@ -829,22 +903,17 @@ async def pypi_download_file(
             if dep_name not in seen_deps:
                 seen_deps[dep_name] = dep_version if dep_version else "*"

-        for dep_name, dep_version in seen_deps.items():
-            # Check if this dependency already exists for this artifact
-            existing_dep = db.query(ArtifactDependency).filter(
-                ArtifactDependency.artifact_id == sha256,
-                ArtifactDependency.dependency_project == "_pypi",
-                ArtifactDependency.dependency_package == dep_name,
-            ).first()
-
-            if not existing_dep:
-                dep = ArtifactDependency(
-                    artifact_id=sha256,
-                    dependency_project="_pypi",
-                    dependency_package=dep_name,
-                    version_constraint=dep_version,
-                )
-                db.add(dep)
+        # Convert to list of tuples for batch insert
+        deps_to_store = [
+            ("_pypi", dep_name, dep_version)
+            for dep_name, dep_version in seen_deps.items()
+        ]
+
+        # Batch upsert - handles duplicates with ON CONFLICT DO NOTHING
+        repo = ArtifactRepository(db)
+        inserted = repo.batch_upsert_dependencies(sha256, deps_to_store)
+        if inserted > 0:
+            logger.debug(f"Stored {inserted} dependencies for {sha256[:12]}...")

     db.commit()
@@ -421,7 +421,8 @@ def _log_audit(

 # Health check
 @router.get("/health", response_model=HealthResponse)
-def health_check(
+async def health_check(
+    request: Request,
     db: Session = Depends(get_db),
     storage: S3Storage = Depends(get_storage),
 ):

@@ -449,11 +450,30 @@ def health_check(

     overall_status = "ok" if (storage_healthy and database_healthy) else "degraded"

-    return HealthResponse(
-        status=overall_status,
-        storage_healthy=storage_healthy,
-        database_healthy=database_healthy,
-    )
+    # Build response with optional infrastructure status
+    response_data = {
+        "status": overall_status,
+        "storage_healthy": storage_healthy,
+        "database_healthy": database_healthy,
+    }
+
+    # Add HTTP pool status if available
+    if hasattr(request.app.state, 'http_client'):
+        http_client = request.app.state.http_client
+        response_data["http_pool"] = {
+            "pool_size": http_client.pool_size,
+            "worker_threads": http_client.executor_max,
+        }
+
+    # Add cache status if available
+    if hasattr(request.app.state, 'cache'):
+        cache = request.app.state.cache
+        response_data["cache"] = {
+            "enabled": cache.enabled,
+            "connected": await cache.ping() if cache.enabled else False,
+        }
+
+    return HealthResponse(**response_data)


 # --- Authentication Routes ---
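Given the fields added above and to HealthResponse in the next hunk, the extended /health payload could look roughly like the following; all values are illustrative.

# Hedged sketch: example shape of the extended /health response body.
example_health_payload = {
    "status": "ok",
    "version": "1.0.0",
    "storage_healthy": True,
    "database_healthy": True,
    "http_pool": {"pool_size": 100, "worker_threads": 32},
    "cache": {"enabled": True, "connected": True},
}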
@@ -493,6 +493,8 @@ class HealthResponse(BaseModel):
     version: str = "1.0.0"
     storage_healthy: Optional[bool] = None
     database_healthy: Optional[bool] = None
+    http_pool: Optional[Dict[str, Any]] = None
+    cache: Optional[Dict[str, Any]] = None


 # Garbage collection schemas
@@ -12,6 +12,7 @@ passlib[bcrypt]==1.7.4
 bcrypt==4.0.1
 slowapi==0.1.9
 httpx>=0.25.0
+redis>=5.0.0

 # Test dependencies
 pytest>=7.4.0
@@ -135,3 +135,19 @@ class TestPyPIPackageNormalization:
             assert "text/html" in response.headers.get("content-type", "")
         elif response.status_code == 503:
             assert "No PyPI upstream sources configured" in response.json()["detail"]
+
+
+class TestPyPIProxyInfrastructure:
+    """Tests for PyPI proxy infrastructure integration."""
+
+    @pytest.mark.integration
+    def test_health_endpoint_includes_infrastructure(self, integration_client):
+        """Health endpoint should report infrastructure status."""
+        response = integration_client.get("/health")
+        assert response.status_code == 200
+
+        data = response.json()
+        assert data["status"] == "ok"
+        # Infrastructure status should be present
+        assert "http_pool" in data
+        assert "cache" in data
backend/tests/unit/test_cache_service.py (new file, 374 lines)
@@ -0,0 +1,374 @@
"""Tests for CacheService."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch


class TestCacheCategory:
    """Tests for cache category enum."""

    @pytest.mark.unit
    def test_immutable_categories_have_no_ttl(self):
        """Immutable categories should return None for TTL."""
        from app.cache_service import CacheCategory, get_category_ttl
        from app.config import Settings

        settings = Settings()

        assert get_category_ttl(CacheCategory.ARTIFACT_METADATA, settings) is None
        assert get_category_ttl(CacheCategory.ARTIFACT_DEPENDENCIES, settings) is None
        assert get_category_ttl(CacheCategory.DEPENDENCY_RESOLUTION, settings) is None

    @pytest.mark.unit
    def test_mutable_categories_have_ttl(self):
        """Mutable categories should return configured TTL."""
        from app.cache_service import CacheCategory, get_category_ttl
        from app.config import Settings

        settings = Settings(
            cache_ttl_index=300,
            cache_ttl_upstream=3600,
        )

        assert get_category_ttl(CacheCategory.PACKAGE_INDEX, settings) == 300
        assert get_category_ttl(CacheCategory.UPSTREAM_SOURCES, settings) == 3600


class TestCacheService:
    """Tests for Redis cache service."""

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_disabled_cache_returns_none(self):
        """When Redis disabled, get() should return None."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")

        assert result is None
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_disabled_cache_set_is_noop(self):
        """When Redis disabled, set() should be a no-op."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        # Should not raise
        await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value")

        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_cache_key_namespacing(self):
        """Cache keys should be properly namespaced."""
        from app.cache_service import CacheService, CacheCategory

        key = CacheService._make_key(CacheCategory.PACKAGE_INDEX, "pypi", "numpy")

        assert key == "orchard:index:pypi:numpy"

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_ping_returns_false_when_disabled(self):
        """ping() should return False when Redis is disabled."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        result = await cache.ping()

        assert result is False
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_enabled_property(self):
        """enabled property should reflect Redis state."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)

        assert cache.enabled is False

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_delete_is_noop_when_disabled(self):
        """delete() should be a no-op when Redis is disabled."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        # Should not raise
        await cache.delete(CacheCategory.PACKAGE_INDEX, "test-key")

        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_invalidate_pattern_returns_zero_when_disabled(self):
        """invalidate_pattern() should return 0 when Redis is disabled."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        result = await cache.invalidate_pattern(CacheCategory.PACKAGE_INDEX)

        assert result == 0
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_startup_already_started(self):
        """startup() should be idempotent."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()
        await cache.startup()  # Should not raise

        assert cache._started is True
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_shutdown_not_started(self):
        """shutdown() should handle not-started state."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)

        # Should not raise
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_make_key_with_default_protocol(self):
        """_make_key should work with default protocol."""
        from app.cache_service import CacheService, CacheCategory

        key = CacheService._make_key(CacheCategory.ARTIFACT_METADATA, "default", "abc123")

        assert key == "orchard:artifact:default:abc123"


class TestCacheServiceWithMockedRedis:
    """Tests for CacheService with mocked Redis client."""

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_get_returns_cached_value(self):
        """get() should return cached value when available."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        # Mock the redis client
        mock_redis = AsyncMock()
        mock_redis.get.return_value = b"cached-data"
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key", "pypi")

        assert result == b"cached-data"
        mock_redis.get.assert_called_once_with("orchard:index:pypi:test-key")

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_set_with_ttl(self):
        """set() should use setex for mutable categories."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True, cache_ttl_index=300)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value", "pypi")

        mock_redis.setex.assert_called_once_with(
            "orchard:index:pypi:test-key", 300, b"test-value"
        )

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_set_without_ttl(self):
        """set() should use set (no expiry) for immutable categories."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        await cache.set(
            CacheCategory.ARTIFACT_METADATA, "abc123", b"metadata", "pypi"
        )

        mock_redis.set.assert_called_once_with(
            "orchard:artifact:pypi:abc123", b"metadata"
        )

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_delete_calls_redis_delete(self):
        """delete() should call Redis delete."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        await cache.delete(CacheCategory.PACKAGE_INDEX, "test-key", "pypi")

        mock_redis.delete.assert_called_once_with("orchard:index:pypi:test-key")

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_invalidate_pattern_deletes_matching_keys(self):
        """invalidate_pattern() should delete all matching keys."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()

        # Create an async generator for scan_iter
        async def mock_scan_iter(match=None):
            for key in [b"orchard:index:pypi:numpy", b"orchard:index:pypi:requests"]:
                yield key

        mock_redis.scan_iter = mock_scan_iter
        mock_redis.delete.return_value = 2
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.invalidate_pattern(CacheCategory.PACKAGE_INDEX, "*", "pypi")

        assert result == 2
        mock_redis.delete.assert_called_once()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_ping_returns_true_when_connected(self):
        """ping() should return True when Redis responds."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        mock_redis.ping.return_value = True
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.ping()

        assert result is True

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_get_handles_exception(self):
        """get() should return None and log warning on exception."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        mock_redis.get.side_effect = Exception("Connection lost")
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")

        assert result is None
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_set_handles_exception(self):
|
||||||
|
"""set() should log warning on exception."""
|
||||||
|
from app.cache_service import CacheService, CacheCategory
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings(redis_enabled=True, cache_ttl_index=300)
|
||||||
|
cache = CacheService(settings)
|
||||||
|
|
||||||
|
mock_redis = AsyncMock()
|
||||||
|
mock_redis.setex.side_effect = Exception("Connection lost")
|
||||||
|
cache._redis = mock_redis
|
||||||
|
cache._enabled = True
|
||||||
|
cache._started = True
|
||||||
|
|
||||||
|
# Should not raise
|
||||||
|
await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"value")
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.unit
|
||||||
|
async def test_ping_returns_false_on_exception(self):
|
||||||
|
"""ping() should return False on exception."""
|
||||||
|
from app.cache_service import CacheService
|
||||||
|
from app.config import Settings
|
||||||
|
|
||||||
|
settings = Settings(redis_enabled=True)
|
||||||
|
cache = CacheService(settings)
|
||||||
|
|
||||||
|
mock_redis = AsyncMock()
|
||||||
|
mock_redis.ping.side_effect = Exception("Connection lost")
|
||||||
|
cache._redis = mock_redis
|
||||||
|
cache._enabled = True
|
||||||
|
cache._started = True
|
||||||
|
|
||||||
|
result = await cache.ping()
|
||||||
|
|
||||||
|
assert result is False
|
||||||
|
|
||||||
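Taken together, these tests pin down the observable contract of CacheService: keys follow the orchard:<category>:<protocol>:<key> scheme, mutable categories are written with setex and a TTL while immutable ones use a plain set, and every Redis failure degrades to a cache miss instead of an error. The snippet below is a minimal sketch of a service that would satisfy them; the redis.asyncio client, the CacheCategory prefixes, the redis_host/redis_port fields, and the _ttl_for() helper are illustrative assumptions, not the shipped app.cache_service module.

# Hypothetical sketch of a CacheService that satisfies the tests above;
# it is not the shipped app.cache_service module.
import logging
from enum import Enum

import redis.asyncio as aioredis  # assumption: redis-py >= 4.2 async client

logger = logging.getLogger(__name__)


class CacheCategory(str, Enum):
    PACKAGE_INDEX = "index"         # mutable: stored with a TTL
    ARTIFACT_METADATA = "artifact"  # immutable: stored without expiry


class CacheService:
    def __init__(self, settings):
        self._settings = settings
        self._redis = None
        self._enabled = settings.redis_enabled
        self._started = False

    @staticmethod
    def _make_key(category: CacheCategory, protocol: str, key: str) -> str:
        return f"orchard:{category.value}:{protocol}:{key}"

    async def startup(self) -> None:
        if self._started:  # second call is a no-op
            return
        if self._enabled:
            # redis_host/redis_port are assumed Settings fields
            self._redis = aioredis.Redis(
                host=self._settings.redis_host, port=self._settings.redis_port
            )
        self._started = True

    async def shutdown(self) -> None:
        if self._redis is not None:
            await self._redis.aclose()  # redis-py >= 5; older releases use close()
            self._redis = None
        self._started = False

    def _ttl_for(self, category: CacheCategory):
        # Assumption: only mutable categories carry a TTL setting.
        if category is CacheCategory.PACKAGE_INDEX:
            return self._settings.cache_ttl_index
        return None

    async def get(self, category, key, protocol="default"):
        if not (self._enabled and self._redis):
            return None
        try:
            return await self._redis.get(self._make_key(category, protocol, key))
        except Exception as exc:  # any Redis failure degrades to a cache miss
            logger.warning("cache get failed: %s", exc)
            return None

    async def set(self, category, key, value, protocol="default"):
        if not (self._enabled and self._redis):
            return
        full_key = self._make_key(category, protocol, key)
        ttl = self._ttl_for(category)
        try:
            if ttl:
                await self._redis.setex(full_key, ttl, value)
            else:
                await self._redis.set(full_key, value)
        except Exception as exc:
            logger.warning("cache set failed: %s", exc)

    async def ping(self) -> bool:
        if not (self._enabled and self._redis):
            return False
        try:
            return bool(await self._redis.ping())
        except Exception:
            return False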
167
backend/tests/unit/test_db_utils.py
Normal file
@@ -0,0 +1,167 @@
"""Tests for database utility functions."""
import pytest
from unittest.mock import MagicMock, patch


class TestArtifactRepository:
    """Tests for ArtifactRepository."""

    def test_batch_dependency_values_formatting(self):
        """batch_upsert_dependencies should format values correctly."""
        from app.db_utils import ArtifactRepository

        deps = [
            ("_pypi", "numpy", ">=1.21.0"),
            ("_pypi", "requests", "*"),
            ("myproject", "mylib", "==1.0.0"),
        ]

        values = ArtifactRepository._format_dependency_values("abc123", deps)

        assert len(values) == 3
        assert values[0] == {
            "artifact_id": "abc123",
            "dependency_project": "_pypi",
            "dependency_package": "numpy",
            "version_constraint": ">=1.21.0",
        }
        assert values[2]["dependency_project"] == "myproject"

    def test_empty_dependencies_returns_empty_list(self):
        """Empty dependency list should return empty values."""
        from app.db_utils import ArtifactRepository

        values = ArtifactRepository._format_dependency_values("abc123", [])

        assert values == []

    def test_format_dependency_values_preserves_special_characters(self):
        """Version constraints with special characters should be preserved."""
        from app.db_utils import ArtifactRepository

        deps = [
            ("_pypi", "package-name", ">=1.0.0,<2.0.0"),
            ("_pypi", "another_pkg", "~=1.4.2"),
        ]

        values = ArtifactRepository._format_dependency_values("hash123", deps)

        assert values[0]["version_constraint"] == ">=1.0.0,<2.0.0"
        assert values[1]["version_constraint"] == "~=1.4.2"

    def test_batch_upsert_dependencies_returns_zero_for_empty(self):
        """batch_upsert_dependencies should return 0 for empty list without DB call."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        repo = ArtifactRepository(mock_db)

        result = repo.batch_upsert_dependencies("abc123", [])

        assert result == 0
        # Verify no DB operations were performed
        mock_db.execute.assert_not_called()

    def test_get_or_create_artifact_builds_correct_statement(self):
        """get_or_create_artifact should use ON CONFLICT DO UPDATE."""
        from app.db_utils import ArtifactRepository
        from app.models import Artifact

        mock_db = MagicMock()
        mock_result = MagicMock()
        mock_artifact = MagicMock()
        mock_artifact.ref_count = 1
        mock_result.scalar_one.return_value = mock_artifact
        mock_db.execute.return_value = mock_result

        repo = ArtifactRepository(mock_db)
        artifact, created = repo.get_or_create_artifact(
            sha256="abc123def456",
            size=1024,
            filename="test.whl",
            content_type="application/zip",
        )

        assert mock_db.execute.called
        assert created is True
        assert artifact == mock_artifact

    def test_get_or_create_artifact_existing_not_created(self):
        """get_or_create_artifact should return created=False for existing artifact."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_result = MagicMock()
        mock_artifact = MagicMock()
        mock_artifact.ref_count = 5  # Existing artifact with ref_count > 1
        mock_result.scalar_one.return_value = mock_artifact
        mock_db.execute.return_value = mock_result

        repo = ArtifactRepository(mock_db)
        artifact, created = repo.get_or_create_artifact(
            sha256="abc123def456",
            size=1024,
            filename="test.whl",
        )

        assert created is False

    def test_get_cached_url_with_artifact_returns_tuple(self):
        """get_cached_url_with_artifact should return (CachedUrl, Artifact) tuple."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_cached_url = MagicMock()
        mock_artifact = MagicMock()
        mock_db.query.return_value.join.return_value.filter.return_value.first.return_value = (
            mock_cached_url,
            mock_artifact,
        )

        repo = ArtifactRepository(mock_db)
        result = repo.get_cached_url_with_artifact("url_hash_123")

        assert result == (mock_cached_url, mock_artifact)

    def test_get_cached_url_with_artifact_returns_none_when_not_found(self):
        """get_cached_url_with_artifact should return None when URL not cached."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_db.query.return_value.join.return_value.filter.return_value.first.return_value = None

        repo = ArtifactRepository(mock_db)
        result = repo.get_cached_url_with_artifact("nonexistent_hash")

        assert result is None

    def test_get_artifact_dependencies_returns_list(self):
        """get_artifact_dependencies should return list of dependencies."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_dep1 = MagicMock()
        mock_dep2 = MagicMock()
        mock_db.query.return_value.filter.return_value.all.return_value = [
            mock_dep1,
            mock_dep2,
        ]

        repo = ArtifactRepository(mock_db)
        result = repo.get_artifact_dependencies("artifact_hash_123")

        assert len(result) == 2
        assert result[0] == mock_dep1
        assert result[1] == mock_dep2

    def test_get_artifact_dependencies_returns_empty_list(self):
        """get_artifact_dependencies should return empty list when no dependencies."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_db.query.return_value.filter.return_value.all.return_value = []

        repo = ArtifactRepository(mock_db)
        result = repo.get_artifact_dependencies("artifact_without_deps")

        assert result == []
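The docstrings above describe the repository's intent: flatten (project, package, constraint) tuples into row dictionaries, write them with one batched INSERT instead of a query per dependency, and resolve artifacts through an atomic upsert whose ref_count tells callers whether the row is new. A plausible shape for those two methods, assuming SQLAlchemy's PostgreSQL dialect and a hypothetical ArtifactDependency model alongside the Artifact model the tests import, is sketched here; the real app.db_utils may differ in detail.

# Illustrative reconstruction only; it assumes SQLAlchemy 2.0 ORM models named
# Artifact (imported by the tests) and ArtifactDependency (an assumed name).
from sqlalchemy.dialects.postgresql import insert

from app.models import Artifact, ArtifactDependency  # ArtifactDependency is assumed


class ArtifactRepository:
    def __init__(self, db):
        self.db = db  # SQLAlchemy Session

    @staticmethod
    def _format_dependency_values(artifact_id, deps):
        # deps is an iterable of (project, package, version_constraint) tuples.
        return [
            {
                "artifact_id": artifact_id,
                "dependency_project": project,
                "dependency_package": package,
                "version_constraint": constraint,
            }
            for project, package, constraint in deps
        ]

    def batch_upsert_dependencies(self, artifact_id, deps) -> int:
        values = self._format_dependency_values(artifact_id, deps)
        if not values:
            return 0  # nothing to write, so skip the round-trip entirely
        stmt = insert(ArtifactDependency).values(values)
        # Re-inserting an existing dependency row becomes a no-op, not an error.
        stmt = stmt.on_conflict_do_nothing(
            index_elements=["artifact_id", "dependency_project", "dependency_package"]
        )
        self.db.execute(stmt)
        return len(values)

    def get_or_create_artifact(self, sha256, size, filename, content_type=None):
        stmt = (
            insert(Artifact)
            .values(sha256=sha256, size=size, filename=filename, content_type=content_type)
            # On a duplicate hash, bump the reference count atomically instead of failing.
            .on_conflict_do_update(
                index_elements=["sha256"],
                set_={"ref_count": Artifact.ref_count + 1},
            )
            .returning(Artifact)
        )
        artifact = self.db.execute(stmt).scalar_one()
        created = artifact.ref_count == 1  # a fresh insert leaves ref_count at 1
        return artifact, created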
194
backend/tests/unit/test_http_client.py
Normal file
@@ -0,0 +1,194 @@
"""Tests for HttpClientManager."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch


class TestHttpClientManager:
    """Tests for HTTP client pool management."""

    @pytest.mark.unit
    def test_manager_initializes_with_settings(self):
        """Manager should initialize with config settings."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings(
            http_max_connections=50,
            http_connect_timeout=15.0,
        )
        manager = HttpClientManager(settings)

        assert manager.max_connections == 50
        assert manager.connect_timeout == 15.0
        assert manager._default_client is None  # Not started yet

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_startup_creates_client(self):
        """Startup should create the default async client."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()

        assert manager._default_client is not None

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_shutdown_closes_client(self):
        """Shutdown should close all clients gracefully."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()
        client = manager._default_client

        await manager.shutdown()

        assert manager._default_client is None
        assert client.is_closed

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_get_client_returns_default(self):
        """get_client() should return the default client."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)
        await manager.startup()

        client = manager.get_client()

        assert client is manager._default_client

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_get_client_raises_if_not_started(self):
        """get_client() should raise RuntimeError if manager not started."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        with pytest.raises(RuntimeError, match="not started"):
            manager.get_client()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_run_blocking_executes_in_thread_pool(self):
        """run_blocking should execute sync functions in thread pool."""
        from app.http_client import HttpClientManager
        from app.config import Settings
        import threading

        settings = Settings()
        manager = HttpClientManager(settings)
        await manager.startup()

        main_thread = threading.current_thread()
        execution_thread = None

        def blocking_func():
            nonlocal execution_thread
            execution_thread = threading.current_thread()
            return "result"

        result = await manager.run_blocking(blocking_func)

        assert result == "result"
        assert execution_thread is not main_thread

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_run_blocking_raises_if_not_started(self):
        """run_blocking should raise RuntimeError if manager not started."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        with pytest.raises(RuntimeError, match="not started"):
            await manager.run_blocking(lambda: None)

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_startup_idempotent(self):
        """Calling startup multiple times should be safe."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()
        client1 = manager._default_client

        await manager.startup()  # Should not create a new client
        client2 = manager._default_client

        assert client1 is client2  # Same client instance

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_shutdown_idempotent(self):
        """Calling shutdown multiple times should be safe."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()
        await manager.shutdown()
        await manager.shutdown()  # Should not raise

        assert manager._default_client is None

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_properties_return_configured_values(self):
        """Properties should return configured values."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings(
            http_max_connections=75,
            http_worker_threads=16,
        )
        manager = HttpClientManager(settings)
        await manager.startup()

        assert manager.pool_size == 75
        assert manager.executor_max == 16

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_active_connections_when_not_started(self):
        """active_connections should return 0 when not started."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        assert manager.active_connections == 0
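These tests outline the manager's contract: a single shared async client created at startup with the configured connection limits, a RuntimeError whose message contains "not started" when the manager is used too early, idempotent startup/shutdown, and a run_blocking() helper that pushes synchronous work onto a thread pool. The is_closed/aclose() usage suggests an httpx client underneath; the sketch below is an illustrative reconstruction on that assumption, not the actual app.http_client module, and the default read timeout of 10.0 seconds is invented for the example.

# Hypothetical reconstruction of HttpClientManager; the real app.http_client
# module may differ, but the tests above constrain the shape shown here.
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Optional

import httpx


class HttpClientManager:
    def __init__(self, settings):
        self.max_connections = settings.http_max_connections
        self.connect_timeout = settings.http_connect_timeout
        self._worker_threads = settings.http_worker_threads
        self._default_client: Optional[httpx.AsyncClient] = None
        self._executor: Optional[ThreadPoolExecutor] = None

    async def startup(self) -> None:
        if self._default_client is not None:  # idempotent: keep the same client
            return
        self._default_client = httpx.AsyncClient(
            limits=httpx.Limits(max_connections=self.max_connections),
            timeout=httpx.Timeout(10.0, connect=self.connect_timeout),
            follow_redirects=True,
        )
        self._executor = ThreadPoolExecutor(max_workers=self._worker_threads)

    async def shutdown(self) -> None:
        if self._default_client is not None:
            await self._default_client.aclose()
            self._default_client = None
        if self._executor is not None:
            self._executor.shutdown(wait=False)
            self._executor = None

    def get_client(self) -> httpx.AsyncClient:
        if self._default_client is None:
            raise RuntimeError("HttpClientManager not started")
        return self._default_client

    async def run_blocking(self, func, *args):
        """Run a synchronous callable on the shared thread pool."""
        if self._executor is None:
            raise RuntimeError("HttpClientManager not started")
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, func, *args)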
@@ -228,7 +228,7 @@ minioIngress:
   secretName: minio-tls # Overridden by CI

 redis:
-  enabled: false
+  enabled: true

 waitForDatabase: true

@@ -140,7 +140,7 @@ minioIngress:
   enabled: false

 redis:
-  enabled: false
+  enabled: true

 waitForDatabase: true

@@ -146,7 +146,7 @@ minioIngress:

 # Redis subchart configuration (for future caching)
 redis:
-  enabled: false
+  enabled: true
   image:
     registry: containers.global.bsf.tools
     repository: bitnami/redis
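The hunks above turn on the Redis subchart in each of the chart's environment values files, so the cache service can actually reach a Redis instance at runtime. How the two managers are attached to the application is not shown in this diff; the sketch below assumes a FastAPI-style lifespan, and only startup(), shutdown(), ping(), and pool_size come from the tested interfaces above.

# Hypothetical wiring of the two managers into application startup/shutdown.
# FastAPI and the lifespan pattern are assumptions made for illustration.
from contextlib import asynccontextmanager

from fastapi import FastAPI

from app.cache_service import CacheService
from app.config import Settings
from app.http_client import HttpClientManager

settings = Settings()
http_manager = HttpClientManager(settings)
cache = CacheService(settings)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Open the shared connection pool and the Redis client once per process.
    await http_manager.startup()
    await cache.startup()
    try:
        yield
    finally:
        # Both shutdowns are idempotent, so a double invocation is harmless.
        await cache.shutdown()
        await http_manager.shutdown()


app = FastAPI(lifespan=lifespan)


@app.get("/health")
async def health():
    return {
        "http_pool_size": http_manager.pool_size,
        "redis_connected": await cache.ping(),
    }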