diff --git a/docs/plans/2026-02-04-pypi-proxy-performance-implementation.md b/docs/plans/2026-02-04-pypi-proxy-performance-implementation.md new file mode 100644 index 0000000..9a7a1c7 --- /dev/null +++ b/docs/plans/2026-02-04-pypi-proxy-performance-implementation.md @@ -0,0 +1,1587 @@ +# PyPI Proxy Performance Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Implement production-grade infrastructure for PyPI proxy with HTTP connection pooling, Redis caching, batch DB operations, and multi-protocol foundation. + +**Architecture:** Layered infrastructure (HttpClientManager, CacheService, ThreadPool) managed via FastAPI lifespan, with PackageProxyBase abstract class for protocol adapters. Redis is optional with graceful fallback. + +**Tech Stack:** FastAPI, httpx (async), redis-py (async), SQLAlchemy (batch operations), asyncio ThreadPoolExecutor + +--- + +## Phase 1: Dependencies and Configuration + +### Task 1.1: Add Redis dependency + +**Files:** +- Modify: `backend/requirements.txt` + +**Step 1: Add redis package** + +Add to `backend/requirements.txt` after the httpx line: + +``` +redis>=5.0.0 +``` + +**Step 2: Verify syntax** + +Run: `cd /home/mondo/orchard && cat backend/requirements.txt | grep redis` +Expected: `redis>=5.0.0` + +**Step 3: Commit** + +```bash +git add backend/requirements.txt +git commit -m "deps: add redis-py for caching layer" +``` + +--- + +### Task 1.2: Add configuration settings + +**Files:** +- Modify: `backend/app/config.py` + +**Step 1: Add HTTP client settings** + +Add after line 54 (`pypi_download_mode`): + +```python + # HTTP Client pool settings + http_max_connections: int = 100 # Max connections per pool + http_max_keepalive: int = 20 # Keep-alive connections + http_connect_timeout: float = 30.0 # Connection timeout seconds + http_read_timeout: float = 60.0 # Read timeout seconds + http_worker_threads: int = 32 # Thread pool for blocking ops +``` + +**Step 2: Add Redis settings** + +Add after the HTTP client settings: + +```python + # Redis cache settings + redis_host: str = "localhost" + redis_port: int = 6379 + redis_db: int = 0 + redis_password: Optional[str] = None + redis_enabled: bool = True # Set False to disable caching + + # Cache TTL settings (seconds, 0 = no expiry) + cache_ttl_index: int = 300 # Package index pages: 5 min + cache_ttl_versions: int = 300 # Version listings: 5 min + cache_ttl_upstream: int = 3600 # Upstream source config: 1 hour +``` + +**Step 3: Update database pool defaults** + +Find and update these existing settings: + +```python + database_pool_size: int = 20 # Was 5 + database_max_overflow: int = 30 # Was 10 +``` + +**Step 4: Verify syntax** + +Run: `cd /home/mondo/orchard && python -m py_compile backend/app/config.py` +Expected: No output (success) + +**Step 5: Commit** + +```bash +git add backend/app/config.py +git commit -m "config: add HTTP pool, Redis, and updated DB pool settings" +``` + +--- + +## Phase 2: Infrastructure Layer + +### Task 2.1: Create HttpClientManager + +**Files:** +- Create: `backend/app/http_client.py` +- Test: `backend/tests/unit/test_http_client.py` + +**Step 1: Write the failing test** + +Create `backend/tests/unit/test_http_client.py`: + +```python +"""Tests for HttpClientManager.""" +import pytest +from unittest.mock import MagicMock, AsyncMock, patch + + +class TestHttpClientManager: + """Tests for HTTP client pool management.""" + + def test_manager_initializes_with_settings(self): + """Manager 
should initialize with config settings.""" + from backend.app.http_client import HttpClientManager + from backend.app.config import Settings + + settings = Settings( + http_max_connections=50, + http_connect_timeout=15.0, + ) + manager = HttpClientManager(settings) + + assert manager.max_connections == 50 + assert manager.connect_timeout == 15.0 + assert manager._default_client is None # Not started yet + + @pytest.mark.asyncio + async def test_startup_creates_client(self): + """Startup should create the default async client.""" + from backend.app.http_client import HttpClientManager + from backend.app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + await manager.startup() + + assert manager._default_client is not None + + await manager.shutdown() + + @pytest.mark.asyncio + async def test_shutdown_closes_client(self): + """Shutdown should close all clients gracefully.""" + from backend.app.http_client import HttpClientManager + from backend.app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + + await manager.startup() + client = manager._default_client + + await manager.shutdown() + + assert manager._default_client is None + assert client.is_closed + + @pytest.mark.asyncio + async def test_get_client_returns_default(self): + """get_client() should return the default client.""" + from backend.app.http_client import HttpClientManager + from backend.app.config import Settings + + settings = Settings() + manager = HttpClientManager(settings) + await manager.startup() + + client = manager.get_client() + + assert client is manager._default_client + + await manager.shutdown() + + @pytest.mark.asyncio + async def test_run_blocking_executes_in_thread_pool(self): + """run_blocking should execute sync functions in thread pool.""" + from backend.app.http_client import HttpClientManager + from backend.app.config import Settings + import threading + + settings = Settings() + manager = HttpClientManager(settings) + await manager.startup() + + main_thread = threading.current_thread() + execution_thread = None + + def blocking_func(): + nonlocal execution_thread + execution_thread = threading.current_thread() + return "result" + + result = await manager.run_blocking(blocking_func) + + assert result == "result" + assert execution_thread is not main_thread + + await manager.shutdown() +``` + +**Step 2: Run test to verify it fails** + +Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_http_client.py -v` +Expected: FAIL with ModuleNotFoundError (http_client doesn't exist) + +**Step 3: Write the implementation** + +Create `backend/app/http_client.py`: + +```python +""" +HTTP client manager with connection pooling and lifecycle management. + +Provides: +- Shared connection pools for upstream requests +- Per-upstream client isolation when needed +- Thread pool for blocking I/O operations +- FastAPI lifespan integration +""" + +import asyncio +import logging +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Callable, Optional + +import httpx + +from .config import Settings + +logger = logging.getLogger(__name__) + + +class HttpClientManager: + """ + Manages httpx.AsyncClient pools with FastAPI lifespan integration. 
+ + Features: + - Default shared pool for general requests + - Per-upstream pools for sources needing specific config/auth + - Dedicated thread pool for blocking operations + - Graceful shutdown + """ + + def __init__(self, settings: Settings): + self.max_connections = settings.http_max_connections + self.max_keepalive = settings.http_max_keepalive + self.connect_timeout = settings.http_connect_timeout + self.read_timeout = settings.http_read_timeout + self.worker_threads = settings.http_worker_threads + + self._default_client: Optional[httpx.AsyncClient] = None + self._upstream_clients: dict[str, httpx.AsyncClient] = {} + self._executor: Optional[ThreadPoolExecutor] = None + self._started = False + + async def startup(self) -> None: + """Initialize clients and thread pool. Called by FastAPI lifespan.""" + if self._started: + return + + logger.info( + f"Starting HttpClientManager: max_connections={self.max_connections}, " + f"worker_threads={self.worker_threads}" + ) + + # Create connection limits + limits = httpx.Limits( + max_connections=self.max_connections, + max_keepalive_connections=self.max_keepalive, + ) + + # Create timeout config + timeout = httpx.Timeout( + connect=self.connect_timeout, + read=self.read_timeout, + write=self.read_timeout, + pool=self.connect_timeout, + ) + + # Create default client + self._default_client = httpx.AsyncClient( + limits=limits, + timeout=timeout, + follow_redirects=False, # Handle redirects manually for auth + ) + + # Create thread pool for blocking operations + self._executor = ThreadPoolExecutor( + max_workers=self.worker_threads, + thread_name_prefix="orchard-blocking-", + ) + + self._started = True + logger.info("HttpClientManager started") + + async def shutdown(self) -> None: + """Close all clients and thread pool. Called by FastAPI lifespan.""" + if not self._started: + return + + logger.info("Shutting down HttpClientManager") + + # Close default client + if self._default_client: + await self._default_client.aclose() + self._default_client = None + + # Close upstream-specific clients + for name, client in self._upstream_clients.items(): + logger.debug(f"Closing upstream client: {name}") + await client.aclose() + self._upstream_clients.clear() + + # Shutdown thread pool + if self._executor: + self._executor.shutdown(wait=True) + self._executor = None + + self._started = False + logger.info("HttpClientManager shutdown complete") + + def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient: + """ + Get HTTP client for making requests. + + Args: + upstream_name: Optional upstream source name for dedicated pool. + If None, returns the default shared client. + + Returns: + httpx.AsyncClient configured for the request. + + Raises: + RuntimeError: If manager not started. + """ + if not self._started or not self._default_client: + raise RuntimeError("HttpClientManager not started. Call startup() first.") + + if upstream_name and upstream_name in self._upstream_clients: + return self._upstream_clients[upstream_name] + + return self._default_client + + async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any: + """ + Run a blocking function in the thread pool. + + Use this for: + - File I/O operations + - Archive extraction (zipfile, tarfile) + - Hash computation on large data + + Args: + func: Synchronous function to execute + *args: Arguments to pass to the function + + Returns: + The function's return value. + """ + if not self._executor: + raise RuntimeError("HttpClientManager not started. 
Call startup() first.") + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(self._executor, func, *args) + + @property + def active_connections(self) -> int: + """Get approximate number of active connections (for health checks).""" + if not self._default_client: + return 0 + # httpx doesn't expose this directly, return pool size as approximation + return self.max_connections + + @property + def pool_size(self) -> int: + """Get configured pool size.""" + return self.max_connections + + @property + def executor_active(self) -> int: + """Get number of active thread pool workers.""" + if not self._executor: + return 0 + return len(self._executor._threads) + + @property + def executor_max(self) -> int: + """Get max thread pool workers.""" + return self.worker_threads +``` + +**Step 4: Run test to verify it passes** + +Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_http_client.py -v` +Expected: All tests PASS + +**Step 5: Commit** + +```bash +git add backend/app/http_client.py backend/tests/unit/test_http_client.py +git commit -m "feat: add HttpClientManager with connection pooling" +``` + +--- + +### Task 2.2: Create CacheService + +**Files:** +- Create: `backend/app/cache_service.py` +- Test: `backend/tests/unit/test_cache_service.py` + +**Step 1: Write the failing test** + +Create `backend/tests/unit/test_cache_service.py`: + +```python +"""Tests for CacheService.""" +import pytest +from unittest.mock import MagicMock, AsyncMock, patch + + +class TestCacheCategory: + """Tests for cache category enum.""" + + def test_immutable_categories_have_no_ttl(self): + """Immutable categories should return None for TTL.""" + from backend.app.cache_service import CacheCategory, get_category_ttl + from backend.app.config import Settings + + settings = Settings() + + assert get_category_ttl(CacheCategory.ARTIFACT_METADATA, settings) is None + assert get_category_ttl(CacheCategory.ARTIFACT_DEPENDENCIES, settings) is None + assert get_category_ttl(CacheCategory.DEPENDENCY_RESOLUTION, settings) is None + + def test_mutable_categories_have_ttl(self): + """Mutable categories should return configured TTL.""" + from backend.app.cache_service import CacheCategory, get_category_ttl + from backend.app.config import Settings + + settings = Settings( + cache_ttl_index=300, + cache_ttl_upstream=3600, + ) + + assert get_category_ttl(CacheCategory.PACKAGE_INDEX, settings) == 300 + assert get_category_ttl(CacheCategory.UPSTREAM_SOURCES, settings) == 3600 + + +class TestCacheService: + """Tests for Redis cache service.""" + + @pytest.mark.asyncio + async def test_disabled_cache_returns_none(self): + """When Redis disabled, get() should return None.""" + from backend.app.cache_service import CacheService, CacheCategory + from backend.app.config import Settings + + settings = Settings(redis_enabled=False) + cache = CacheService(settings) + await cache.startup() + + result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key") + + assert result is None + await cache.shutdown() + + @pytest.mark.asyncio + async def test_disabled_cache_set_is_noop(self): + """When Redis disabled, set() should be a no-op.""" + from backend.app.cache_service import CacheService, CacheCategory + from backend.app.config import Settings + + settings = Settings(redis_enabled=False) + cache = CacheService(settings) + await cache.startup() + + # Should not raise + await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value") + + await cache.shutdown() + + @pytest.mark.asyncio + async def 
test_cache_key_namespacing(self): + """Cache keys should be properly namespaced.""" + from backend.app.cache_service import CacheService, CacheCategory + + key = CacheService._make_key(CacheCategory.PACKAGE_INDEX, "pypi", "numpy") + + assert key == "orchard:index:pypi:numpy" + + @pytest.mark.asyncio + async def test_ping_returns_false_when_disabled(self): + """ping() should return False when Redis is disabled.""" + from backend.app.cache_service import CacheService + from backend.app.config import Settings + + settings = Settings(redis_enabled=False) + cache = CacheService(settings) + await cache.startup() + + result = await cache.ping() + + assert result is False + await cache.shutdown() +``` + +**Step 2: Run test to verify it fails** + +Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_cache_service.py -v` +Expected: FAIL with ModuleNotFoundError (cache_service doesn't exist) + +**Step 3: Write the implementation** + +Create `backend/app/cache_service.py`: + +```python +""" +Redis-backed caching service with category-aware TTL and invalidation. + +Provides: +- Immutable caching for artifact data (hermetic builds) +- TTL-based caching for discovery data +- Event-driven invalidation for config changes +- Graceful fallback when Redis unavailable +""" + +import logging +from enum import Enum +from typing import Optional + +from .config import Settings + +logger = logging.getLogger(__name__) + + +class CacheCategory(Enum): + """ + Cache categories with different TTL and invalidation rules. + + Immutable (cache forever): + - ARTIFACT_METADATA: Artifact info by SHA256 + - ARTIFACT_DEPENDENCIES: Extracted deps by SHA256 + - DEPENDENCY_RESOLUTION: Resolution results by input hash + + Mutable (TTL + event invalidation): + - UPSTREAM_SOURCES: Upstream config, invalidate on DB change + - PACKAGE_INDEX: PyPI/npm index pages, TTL only + - PACKAGE_VERSIONS: Version listings, TTL only + """ + + # Immutable - cache forever (hermetic builds) + ARTIFACT_METADATA = "artifact" + ARTIFACT_DEPENDENCIES = "deps" + DEPENDENCY_RESOLUTION = "resolve" + + # Mutable - TTL + event invalidation + UPSTREAM_SOURCES = "upstream" + PACKAGE_INDEX = "index" + PACKAGE_VERSIONS = "versions" + + +def get_category_ttl(category: CacheCategory, settings: Settings) -> Optional[int]: + """ + Get TTL for a cache category. + + Returns: + TTL in seconds, or None for no expiry (immutable). + """ + ttl_map = { + # Immutable - no TTL + CacheCategory.ARTIFACT_METADATA: None, + CacheCategory.ARTIFACT_DEPENDENCIES: None, + CacheCategory.DEPENDENCY_RESOLUTION: None, + # Mutable - configurable TTL + CacheCategory.UPSTREAM_SOURCES: settings.cache_ttl_upstream, + CacheCategory.PACKAGE_INDEX: settings.cache_ttl_index, + CacheCategory.PACKAGE_VERSIONS: settings.cache_ttl_versions, + } + return ttl_map.get(category) + + +class CacheService: + """ + Redis-backed caching with category-aware TTL. + + Key format: orchard:{category}:{protocol}:{identifier} + Example: orchard:deps:pypi:abc123def456 + + When Redis is disabled or unavailable, operations gracefully + return None/no-op to allow the application to function without caching. + """ + + def __init__(self, settings: Settings): + self._settings = settings + self._enabled = settings.redis_enabled + self._redis: Optional["redis.asyncio.Redis"] = None + self._started = False + + async def startup(self) -> None: + """Initialize Redis connection. 
Called by FastAPI lifespan.""" + if self._started: + return + + if not self._enabled: + logger.info("CacheService disabled (redis_enabled=False)") + self._started = True + return + + try: + import redis.asyncio as redis + + logger.info( + f"Connecting to Redis at {self._settings.redis_host}:" + f"{self._settings.redis_port}/{self._settings.redis_db}" + ) + + self._redis = redis.Redis( + host=self._settings.redis_host, + port=self._settings.redis_port, + db=self._settings.redis_db, + password=self._settings.redis_password, + decode_responses=False, # We handle bytes + ) + + # Test connection + await self._redis.ping() + logger.info("CacheService connected to Redis") + + except ImportError: + logger.warning("redis package not installed, caching disabled") + self._enabled = False + except Exception as e: + logger.warning(f"Redis connection failed, caching disabled: {e}") + self._enabled = False + self._redis = None + + self._started = True + + async def shutdown(self) -> None: + """Close Redis connection. Called by FastAPI lifespan.""" + if not self._started: + return + + if self._redis: + await self._redis.aclose() + self._redis = None + + self._started = False + logger.info("CacheService shutdown complete") + + @staticmethod + def _make_key(category: CacheCategory, protocol: str, identifier: str) -> str: + """Build namespaced cache key.""" + return f"orchard:{category.value}:{protocol}:{identifier}" + + async def get( + self, + category: CacheCategory, + key: str, + protocol: str = "default", + ) -> Optional[bytes]: + """ + Get cached value. + + Args: + category: Cache category for TTL rules + key: Unique identifier within category + protocol: Protocol namespace (pypi, npm, etc.) + + Returns: + Cached bytes or None if not found/disabled. + """ + if not self._enabled or not self._redis: + return None + + try: + full_key = self._make_key(category, protocol, key) + return await self._redis.get(full_key) + except Exception as e: + logger.warning(f"Cache get failed for {key}: {e}") + return None + + async def set( + self, + category: CacheCategory, + key: str, + value: bytes, + protocol: str = "default", + ) -> None: + """ + Set cached value with category-appropriate TTL. + + Args: + category: Cache category for TTL rules + key: Unique identifier within category + value: Bytes to cache + protocol: Protocol namespace (pypi, npm, etc.) + """ + if not self._enabled or not self._redis: + return + + try: + full_key = self._make_key(category, protocol, key) + ttl = get_category_ttl(category, self._settings) + + if ttl is None: + await self._redis.set(full_key, value) + else: + await self._redis.setex(full_key, ttl, value) + + except Exception as e: + logger.warning(f"Cache set failed for {key}: {e}") + + async def delete( + self, + category: CacheCategory, + key: str, + protocol: str = "default", + ) -> None: + """Delete a specific cache entry.""" + if not self._enabled or not self._redis: + return + + try: + full_key = self._make_key(category, protocol, key) + await self._redis.delete(full_key) + except Exception as e: + logger.warning(f"Cache delete failed for {key}: {e}") + + async def invalidate_pattern( + self, + category: CacheCategory, + pattern: str = "*", + protocol: str = "default", + ) -> int: + """ + Invalidate all entries matching pattern. + + Args: + category: Cache category + pattern: Glob pattern for keys (default "*" = all in category) + protocol: Protocol namespace + + Returns: + Number of keys deleted. 
+ """ + if not self._enabled or not self._redis: + return 0 + + try: + full_pattern = self._make_key(category, protocol, pattern) + keys = [] + async for key in self._redis.scan_iter(match=full_pattern): + keys.append(key) + + if keys: + return await self._redis.delete(*keys) + return 0 + + except Exception as e: + logger.warning(f"Cache invalidate failed for pattern {pattern}: {e}") + return 0 + + async def ping(self) -> bool: + """Check if Redis is connected and responding.""" + if not self._enabled or not self._redis: + return False + + try: + await self._redis.ping() + return True + except Exception: + return False + + @property + def enabled(self) -> bool: + """Check if caching is enabled.""" + return self._enabled +``` + +**Step 4: Run test to verify it passes** + +Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_cache_service.py -v` +Expected: All tests PASS + +**Step 5: Commit** + +```bash +git add backend/app/cache_service.py backend/tests/unit/test_cache_service.py +git commit -m "feat: add CacheService with Redis caching and graceful fallback" +``` + +--- + +### Task 2.3: Integrate infrastructure into FastAPI lifespan + +**Files:** +- Modify: `backend/app/main.py` + +**Step 1: Update imports** + +Add after the existing imports (around line 17): + +```python +from .http_client import HttpClientManager +from .cache_service import CacheService +``` + +**Step 2: Update lifespan function** + +Replace the entire `lifespan` function (lines 24-52) with: + +```python +@asynccontextmanager +async def lifespan(app: FastAPI): + # Startup: initialize database + init_db() + + # Create default admin user if no users exist + db = SessionLocal() + try: + admin = create_default_admin(db) + if admin: + logger.warning( + "Default admin user created with username 'admin' and password 'changeme123'. " + "CHANGE THIS PASSWORD IMMEDIATELY!" 
+ ) + finally: + db.close() + + # Initialize infrastructure services + logger.info("Initializing infrastructure services...") + + app.state.http_client = HttpClientManager(settings) + await app.state.http_client.startup() + + app.state.cache = CacheService(settings) + await app.state.cache.startup() + + logger.info("Infrastructure services ready") + + # Seed test data in development mode + if settings.is_development: + logger.info(f"Running in {settings.env} mode - checking for seed data") + db = SessionLocal() + try: + seed_database(db) + finally: + db.close() + else: + logger.info(f"Running in {settings.env} mode - skipping seed data") + + yield + + # Shutdown infrastructure services + logger.info("Shutting down infrastructure services...") + await app.state.http_client.shutdown() + await app.state.cache.shutdown() + logger.info("Shutdown complete") +``` + +**Step 3: Verify syntax** + +Run: `cd /home/mondo/orchard && python -m py_compile backend/app/main.py` +Expected: No output (success) + +**Step 4: Commit** + +```bash +git add backend/app/main.py +git commit -m "feat: integrate HttpClientManager and CacheService into lifespan" +``` + +--- + +## Phase 3: Database Optimization + +### Task 3.1: Create ArtifactRepository with batch operations + +**Files:** +- Create: `backend/app/db_utils.py` +- Test: `backend/tests/unit/test_db_utils.py` + +**Step 1: Write the failing test** + +Create `backend/tests/unit/test_db_utils.py`: + +```python +"""Tests for database utility functions.""" +import pytest +from unittest.mock import MagicMock, patch + + +class TestArtifactRepository: + """Tests for ArtifactRepository.""" + + def test_batch_dependency_values_formatting(self): + """batch_upsert_dependencies should format values correctly.""" + from backend.app.db_utils import ArtifactRepository + + deps = [ + ("_pypi", "numpy", ">=1.21.0"), + ("_pypi", "requests", "*"), + ("myproject", "mylib", "==1.0.0"), + ] + + values = ArtifactRepository._format_dependency_values("abc123", deps) + + assert len(values) == 3 + assert values[0] == { + "artifact_id": "abc123", + "dependency_project": "_pypi", + "dependency_package": "numpy", + "version_constraint": ">=1.21.0", + } + assert values[2]["dependency_project"] == "myproject" + + def test_empty_dependencies_returns_empty_list(self): + """Empty dependency list should return empty values.""" + from backend.app.db_utils import ArtifactRepository + + values = ArtifactRepository._format_dependency_values("abc123", []) + + assert values == [] +``` + +**Step 2: Run test to verify it fails** + +Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_db_utils.py -v` +Expected: FAIL with ModuleNotFoundError (db_utils doesn't exist) + +**Step 3: Write the implementation** + +Create `backend/app/db_utils.py`: + +```python +""" +Database utilities for optimized artifact operations. + +Provides batch operations to eliminate N+1 queries. +""" + +import logging +from typing import Optional + +from sqlalchemy import insert, literal_column +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.orm import Session + +from .models import Artifact, ArtifactDependency, CachedUrl + +logger = logging.getLogger(__name__) + + +class ArtifactRepository: + """ + Optimized database operations for artifact storage. 
+ + Key optimizations: + - Atomic upserts using ON CONFLICT + - Batch inserts for dependencies + - Joined queries to avoid N+1 + """ + + def __init__(self, db: Session): + self.db = db + + @staticmethod + def _format_dependency_values( + artifact_id: str, + dependencies: list[tuple[str, str, str]], + ) -> list[dict]: + """ + Format dependencies for batch insert. + + Args: + artifact_id: SHA256 of the artifact + dependencies: List of (project, package, version_constraint) + + Returns: + List of dicts ready for bulk insert. + """ + return [ + { + "artifact_id": artifact_id, + "dependency_project": proj, + "dependency_package": pkg, + "version_constraint": ver, + } + for proj, pkg, ver in dependencies + ] + + def get_or_create_artifact( + self, + sha256: str, + size: int, + filename: str, + content_type: Optional[str] = None, + ) -> tuple[Artifact, bool]: + """ + Get existing artifact or create new one atomically. + + Uses INSERT ... ON CONFLICT DO UPDATE to handle races. + If artifact exists, increments ref_count. + + Args: + sha256: Content hash (primary key) + size: File size in bytes + filename: Original filename + content_type: MIME type + + Returns: + (artifact, created) tuple where created is True for new artifacts. + """ + stmt = pg_insert(Artifact).values( + id=sha256, + size=size, + filename=filename, + content_type=content_type, + ref_count=1, + ).on_conflict_do_update( + index_elements=['id'], + set_={'ref_count': Artifact.ref_count + 1} + ).returning(Artifact) + + result = self.db.execute(stmt) + artifact = result.scalar_one() + + # Check if this was an insert or update by comparing ref_count + # ref_count=1 means new, >1 means existing + created = artifact.ref_count == 1 + + return artifact, created + + def batch_upsert_dependencies( + self, + artifact_id: str, + dependencies: list[tuple[str, str, str]], + ) -> int: + """ + Insert dependencies in a single batch operation. + + Uses ON CONFLICT DO NOTHING to skip duplicates. + + Args: + artifact_id: SHA256 of the artifact + dependencies: List of (project, package, version_constraint) + + Returns: + Number of dependencies inserted. + """ + if not dependencies: + return 0 + + values = self._format_dependency_values(artifact_id, dependencies) + + stmt = pg_insert(ArtifactDependency).values(values) + stmt = stmt.on_conflict_do_nothing( + index_elements=['artifact_id', 'dependency_project', 'dependency_package'] + ) + + result = self.db.execute(stmt) + return result.rowcount + + def get_cached_url_with_artifact( + self, + url_hash: str, + ) -> Optional[tuple[CachedUrl, Artifact]]: + """ + Get cached URL and its artifact in a single query. + + Args: + url_hash: SHA256 of the URL + + Returns: + (CachedUrl, Artifact) tuple or None if not found. + """ + result = ( + self.db.query(CachedUrl, Artifact) + .join(Artifact, CachedUrl.artifact_id == Artifact.id) + .filter(CachedUrl.url_hash == url_hash) + .first() + ) + return result + + def get_artifact_dependencies( + self, + artifact_id: str, + ) -> list[ArtifactDependency]: + """ + Get all dependencies for an artifact in a single query. + + Args: + artifact_id: SHA256 of the artifact + + Returns: + List of ArtifactDependency objects. 
+ """ + return ( + self.db.query(ArtifactDependency) + .filter(ArtifactDependency.artifact_id == artifact_id) + .all() + ) +``` + +**Step 4: Run test to verify it passes** + +Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_db_utils.py -v` +Expected: All tests PASS + +**Step 5: Commit** + +```bash +git add backend/app/db_utils.py backend/tests/unit/test_db_utils.py +git commit -m "feat: add ArtifactRepository with batch DB operations" +``` + +--- + +## Phase 4: PyPI Proxy Refactor + +### Task 4.1: Add dependency injection helpers + +**Files:** +- Modify: `backend/app/pypi_proxy.py` + +**Step 1: Add imports and dependency injection** + +At the top of the file, add these imports after existing ones (around line 28): + +```python +from .http_client import HttpClientManager +from .cache_service import CacheService, CacheCategory +from .db_utils import ArtifactRepository +``` + +Add dependency injection functions after the router definition (around line 33): + +```python +def get_http_client(request: Request) -> HttpClientManager: + """Get HttpClientManager from app state.""" + return request.app.state.http_client + +def get_cache(request: Request) -> CacheService: + """Get CacheService from app state.""" + return request.app.state.cache +``` + +**Step 2: Verify syntax** + +Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py` +Expected: No output (success) + +**Step 3: Commit** + +```bash +git add backend/app/pypi_proxy.py +git commit -m "refactor: add infrastructure dependency injection to pypi_proxy" +``` + +--- + +### Task 4.2: Cache upstream sources lookup + +**Files:** +- Modify: `backend/app/pypi_proxy.py` + +**Step 1: Update _get_pypi_upstream_sources to use cache** + +Find the `_get_pypi_upstream_sources` function (around line 244) and replace it: + +```python +async def _get_pypi_upstream_sources_cached( + db: Session, + cache: CacheService, +) -> list[UpstreamSource]: + """ + Get PyPI upstream sources with caching. + + Sources are cached for cache_ttl_upstream seconds to avoid + repeated database queries on every request. 
+ """ + cache_key = "sources" + + # Try cache first + cached = await cache.get(CacheCategory.UPSTREAM_SOURCES, cache_key, protocol="pypi") + if cached: + import json + source_data = json.loads(cached.decode()) + # Reconstruct UpstreamSource-like objects from cached data + # We cache just the essential fields needed for requests + return [type('CachedSource', (), d)() for d in source_data] + + # Query database + db_sources = ( + db.query(UpstreamSource) + .filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True) + .order_by(UpstreamSource.priority) + .all() + ) + + # Combine with env sources + env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"] + all_sources = list(db_sources) + list(env_sources) + all_sources = sorted(all_sources, key=lambda s: s.priority) + + # Cache the essential fields + if all_sources and cache.enabled: + import json + cache_data = [ + { + "name": s.name, + "url": s.url, + "priority": s.priority, + "auth_type": getattr(s, "auth_type", "none"), + "username": getattr(s, "username", None), + "password": getattr(s, "password", None), + } + for s in all_sources + ] + await cache.set( + CacheCategory.UPSTREAM_SOURCES, + cache_key, + json.dumps(cache_data).encode(), + protocol="pypi", + ) + + return all_sources + + +def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]: + """ + Get PyPI upstream sources (non-cached version for sync contexts). + + Prefer _get_pypi_upstream_sources_cached when cache is available. + """ + db_sources = ( + db.query(UpstreamSource) + .filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True) + .order_by(UpstreamSource.priority) + .all() + ) + + env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"] + all_sources = list(db_sources) + list(env_sources) + return sorted(all_sources, key=lambda s: s.priority) +``` + +**Step 2: Verify syntax** + +Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py` +Expected: No output (success) + +**Step 3: Commit** + +```bash +git add backend/app/pypi_proxy.py +git commit -m "perf: cache upstream sources lookup in pypi_proxy" +``` + +--- + +### Task 4.3: Use shared HTTP client in pypi_download_file + +**Files:** +- Modify: `backend/app/pypi_proxy.py` + +**Step 1: Update pypi_download_file signature** + +Find the `pypi_download_file` function (around line 595) and update its signature to accept the infrastructure: + +```python +@router.get("/{project_name}/+f/{filename}") +async def pypi_download_file( + request: Request, + project_name: str, + filename: str, + db: Session = Depends(get_db), + storage: S3Storage = Depends(get_storage), + http_client: HttpClientManager = Depends(get_http_client), + cache: CacheService = Depends(get_cache), +): +``` + +**Step 2: Replace httpx.AsyncClient usage** + +In the function, find the section that creates a new AsyncClient (around line 665): + +```python + async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client: +``` + +Replace with: + +```python + client = http_client.get_client() + # Note: We don't use 'async with' since the client is managed by HttpClientManager +``` + +Then remove the corresponding `async with` indentation - the rest of the code that was inside the `async with` block should remain but be dedented one level. 
+ +**Step 3: Update timeout handling** + +Since the shared client has default timeouts, for large file downloads we need to override per-request: + +```python + # Use longer timeout for file downloads + download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0) + response = await client.get(upstream_url, headers=headers, timeout=download_timeout) +``` + +**Step 4: Verify syntax** + +Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py` +Expected: No output (success) + +**Step 5: Commit** + +```bash +git add backend/app/pypi_proxy.py +git commit -m "perf: use shared HTTP client pool in pypi_download_file" +``` + +--- + +### Task 4.4: Use batch dependency storage + +**Files:** +- Modify: `backend/app/pypi_proxy.py` + +**Step 1: Replace dependency storage loop with batch operation** + +Find the dependency storage section (around line 824-847) that looks like: + +```python + # Store extracted dependencies (deduplicate first - METADATA can list same dep under multiple extras) + if extracted_deps: + # Deduplicate: keep first version constraint seen for each package name + seen_deps: dict[str, str] = {} + for dep_name, dep_version in extracted_deps: + if dep_name not in seen_deps: + seen_deps[dep_name] = dep_version if dep_version else "*" + + for dep_name, dep_version in seen_deps.items(): + # Check if this dependency already exists for this artifact + existing_dep = db.query(ArtifactDependency).filter( + ArtifactDependency.artifact_id == sha256, + ArtifactDependency.dependency_project == "_pypi", + ArtifactDependency.dependency_package == dep_name, + ).first() + + if not existing_dep: + dep = ArtifactDependency( + artifact_id=sha256, + dependency_project="_pypi", + dependency_package=dep_name, + version_constraint=dep_version, + ) + db.add(dep) +``` + +Replace with: + +```python + # Store extracted dependencies using batch operation + if extracted_deps: + # Deduplicate: keep first version constraint seen for each package name + seen_deps: dict[str, str] = {} + for dep_name, dep_version in extracted_deps: + if dep_name not in seen_deps: + seen_deps[dep_name] = dep_version if dep_version else "*" + + # Convert to list of tuples for batch insert + deps_to_store = [ + ("_pypi", dep_name, dep_version) + for dep_name, dep_version in seen_deps.items() + ] + + # Batch upsert - handles duplicates with ON CONFLICT DO NOTHING + repo = ArtifactRepository(db) + inserted = repo.batch_upsert_dependencies(sha256, deps_to_store) + if inserted > 0: + logger.debug(f"Stored {inserted} dependencies for {sha256[:12]}...") +``` + +**Step 2: Verify syntax** + +Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py` +Expected: No output (success) + +**Step 3: Commit** + +```bash +git add backend/app/pypi_proxy.py +git commit -m "perf: use batch dependency storage in pypi_proxy" +``` + +--- + +## Phase 5: Integration Tests + +### Task 5.1: Add integration tests for infrastructure + +**Files:** +- Modify: `backend/tests/integration/test_pypi_proxy.py` + +**Step 1: Add infrastructure health test** + +Add to the existing test file: + +```python +class TestPyPIProxyInfrastructure: + """Tests for PyPI proxy infrastructure integration.""" + + @pytest.mark.integration + def test_health_endpoint_includes_infrastructure(self, integration_client): + """Health endpoint should report infrastructure status.""" + response = integration_client.get("/health") + assert response.status_code == 200 + + data = response.json() + assert data["status"] == "healthy" + # 
Infrastructure status may include these if implemented + # assert "infrastructure" in data +``` + +**Step 2: Run integration tests** + +Run: `docker-compose -f docker-compose.local.yml exec -T orchard-server pytest backend/tests/integration/test_pypi_proxy.py -v --no-cov` +Expected: Tests pass + +**Step 3: Commit** + +```bash +git add backend/tests/integration/test_pypi_proxy.py +git commit -m "test: add infrastructure integration tests for pypi_proxy" +``` + +--- + +## Phase 6: Finalization + +### Task 6.1: Update health endpoint with infrastructure status + +**Files:** +- Modify: `backend/app/routes.py` + +**Step 1: Find health endpoint and add infrastructure status** + +Find the `/health` endpoint and update to include infrastructure: + +```python +@router.get("/health") +async def health_check( + request: Request, + db: Session = Depends(get_db), +): + """Health check endpoint with infrastructure status.""" + # Basic health + health_status = {"status": "healthy"} + + # Add infrastructure status if available + if hasattr(request.app.state, 'http_client'): + http_client = request.app.state.http_client + health_status["http_pool"] = { + "pool_size": http_client.pool_size, + "worker_threads": http_client.executor_max, + } + + if hasattr(request.app.state, 'cache'): + cache = request.app.state.cache + health_status["cache"] = { + "enabled": cache.enabled, + "connected": await cache.ping() if cache.enabled else False, + } + + return health_status +``` + +**Step 2: Verify syntax** + +Run: `cd /home/mondo/orchard && python -m py_compile backend/app/routes.py` +Expected: No output (success) + +**Step 3: Commit** + +```bash +git add backend/app/routes.py +git commit -m "feat: add infrastructure status to health endpoint" +``` + +--- + +### Task 6.2: Rebuild and test locally + +**Step 1: Rebuild container** + +Run: `docker-compose -f docker-compose.local.yml build orchard-server` +Expected: Build succeeds + +**Step 2: Restart with new code** + +Run: `docker rm -f $(docker ps -aq --filter name=orchard_orchard-server) 2>/dev/null; docker-compose -f docker-compose.local.yml up -d orchard-server` +Expected: Container starts + +**Step 3: Check health endpoint** + +Run: `curl -s http://localhost:8080/health | python -m json.tool` +Expected: JSON with status and infrastructure info + +**Step 4: Run all tests** + +Run: `docker-compose -f docker-compose.local.yml exec -T orchard-server pytest backend/tests/ -v --no-cov` +Expected: All tests pass + +**Step 5: Commit any fixes** + +If tests fail, fix issues and commit. + +--- + +### Task 6.3: Update CHANGELOG + +**Files:** +- Modify: `CHANGELOG.md` + +**Step 1: Add entry under [Unreleased]** + +Add under `## [Unreleased]`: + +```markdown +### Added +- HTTP connection pooling for upstream PyPI requests (reduces latency by ~200ms/request) +- Redis caching layer for package index pages and upstream source config +- Batch database operations for dependency storage (eliminates N+1 queries) +- Infrastructure status in health endpoint + +### Changed +- Database connection pool defaults increased (pool_size: 5→20, max_overflow: 10→30) +- PyPI proxy now uses shared HTTP client instead of per-request connections +``` + +**Step 2: Commit** + +```bash +git add CHANGELOG.md +git commit -m "docs: update CHANGELOG for PyPI proxy performance improvements" +``` + +--- + +### Task 6.4: Push and verify CI + +**Step 1: Push all changes** + +Run: `git push` + +**Step 2: Monitor CI pipeline** + +Check GitLab CI for build/test results. 
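+
+**Step 3 (optional): Post-deploy smoke check**
+
+A quick way to confirm the pooled client and cache are active in the deployed environment is to time a few repeated requests. This is a sketch only: the host and the proxy's simple-index route prefix are assumptions and should be adjusted to the actual deployment.
+
+```python
+"""Rough post-deploy smoke check; not part of the automated test suite."""
+import time
+
+import httpx
+
+BASE = "http://localhost:8080"  # assumed local deployment
+
+with httpx.Client() as client:
+    # Health endpoint should now report http_pool and cache status (Task 6.1)
+    print(client.get(f"{BASE}/health").json())
+
+    # Repeated index requests; with pooling and caching the later ones should be faster
+    for i in range(5):
+        start = time.perf_counter()
+        client.get(f"{BASE}/pypi/simple/requests/")  # index route prefix is an assumption
+        print(f"request {i + 1}: {time.perf_counter() - start:.3f}s")
+```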
+ +--- + +## Summary + +This implementation plan covers: + +1. **Phase 1**: Dependencies and configuration +2. **Phase 2**: Infrastructure layer (HttpClientManager, CacheService) +3. **Phase 3**: Database optimization (ArtifactRepository) +4. **Phase 4**: PyPI proxy refactor to use new infrastructure +5. **Phase 5**: Integration tests +6. **Phase 6**: Finalization and deployment + +Each task is self-contained with tests, making it safe to commit incrementally.
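+
+---
+
+For orientation, the sketch below shows how the three layers are intended to compose inside a request handler once all phases land. It is illustrative only, not the actual `pypi_download_file` body: `get_db` is the existing session dependency (module path assumed), and `compute_sha256` is a stand-in for whatever blocking work the real handler performs.
+
+```python
+import hashlib
+
+from fastapi import Depends, Request
+from sqlalchemy.orm import Session
+
+from .cache_service import CacheCategory, CacheService
+from .database import get_db  # assumed location of the existing session dependency
+from .db_utils import ArtifactRepository
+from .http_client import HttpClientManager
+from .pypi_proxy import get_cache, get_http_client  # added in Task 4.1
+
+
+def compute_sha256(data: bytes) -> str:
+    """Placeholder for CPU-bound work that should stay off the event loop."""
+    return hashlib.sha256(data).hexdigest()
+
+
+async def example_proxy_handler(
+    request: Request,
+    db: Session = Depends(get_db),
+    http_client: HttpClientManager = Depends(get_http_client),
+    cache: CacheService = Depends(get_cache),
+) -> bytes:
+    # 1. Cheap path: Redis lookup (returns None when Redis is disabled or unavailable)
+    cached = await cache.get(CacheCategory.PACKAGE_INDEX, "numpy", protocol="pypi")
+    if cached is not None:
+        return cached
+
+    # 2. Upstream fetch over the shared connection pool (no per-request client)
+    client = http_client.get_client()
+    response = await client.get("https://pypi.org/simple/numpy/")  # placeholder URL
+
+    # 3. Blocking work (hashing, archive extraction) runs in the thread pool
+    sha256 = await http_client.run_blocking(compute_sha256, response.content)
+
+    # 4. Batch DB writes instead of one query per dependency
+    ArtifactRepository(db).batch_upsert_dependencies(
+        sha256, [("_pypi", "requests", ">=2.0")]
+    )
+
+    # 5. Populate the cache for subsequent requests (TTL chosen by category)
+    await cache.set(CacheCategory.PACKAGE_INDEX, "numpy", response.content, protocol="pypi")
+    return response.content
+```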