# PyPI Proxy Performance Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Implement production-grade infrastructure for PyPI proxy with HTTP connection pooling, Redis caching, batch DB operations, and multi-protocol foundation.

**Architecture:** Layered infrastructure (HttpClientManager, CacheService, ThreadPool) managed via FastAPI lifespan, with PackageProxyBase abstract class for protocol adapters. Redis is optional with graceful fallback.

**Tech Stack:** FastAPI, httpx (async), redis-py (async), SQLAlchemy (batch operations), asyncio ThreadPoolExecutor

---

## Phase 1: Dependencies and Configuration

### Task 1.1: Add Redis dependency

**Files:**
- Modify: `backend/requirements.txt`

**Step 1: Add redis package**

Add to `backend/requirements.txt` after the httpx line:

```
redis>=5.0.0
```

**Step 2: Verify syntax**

Run: `cd /home/mondo/orchard && cat backend/requirements.txt | grep redis`

Expected: `redis>=5.0.0`

**Step 3: Commit**

```bash
git add backend/requirements.txt
git commit -m "deps: add redis-py for caching layer"
```

---

### Task 1.2: Add configuration settings

**Files:**
- Modify: `backend/app/config.py`

**Step 1: Add HTTP client settings**

Add after line 54 (`pypi_download_mode`):

```python
# HTTP Client pool settings
http_max_connections: int = 100     # Max connections per pool
http_max_keepalive: int = 20        # Keep-alive connections
http_connect_timeout: float = 30.0  # Connection timeout seconds
http_read_timeout: float = 60.0     # Read timeout seconds
http_worker_threads: int = 32       # Thread pool for blocking ops
```

**Step 2: Add Redis settings**

Add after the HTTP client settings:

```python
# Redis cache settings
redis_host: str = "localhost"
redis_port: int = 6379
redis_db: int = 0
redis_password: Optional[str] = None
redis_enabled: bool = True  # Set False to disable caching

# Cache TTL settings (seconds, 0 = no expiry)
cache_ttl_index: int = 300      # Package index pages: 5 min
cache_ttl_versions: int = 300   # Version listings: 5 min
cache_ttl_upstream: int = 3600  # Upstream source config: 1 hour
```

**Step 3: Update database pool defaults**

Find and update these existing settings:

```python
database_pool_size: int = 20     # Was 5
database_max_overflow: int = 30  # Was 10
```

**Step 4: Verify syntax**

Run: `cd /home/mondo/orchard && python -m py_compile backend/app/config.py`

Expected: No output (success)

**Step 5: Commit**

```bash
git add backend/app/config.py
git commit -m "config: add HTTP pool, Redis, and updated DB pool settings"
```

---

## Phase 2: Infrastructure Layer

### Task 2.1: Create HttpClientManager

**Files:**
- Create: `backend/app/http_client.py`
- Test: `backend/tests/unit/test_http_client.py`

**Step 1: Write the failing test**

Create `backend/tests/unit/test_http_client.py`:

```python
"""Tests for HttpClientManager."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch


class TestHttpClientManager:
    """Tests for HTTP client pool management."""

    def test_manager_initializes_with_settings(self):
        """Manager should initialize with config settings."""
        from backend.app.http_client import HttpClientManager
        from backend.app.config import Settings

        settings = Settings(
            http_max_connections=50,
            http_connect_timeout=15.0,
        )
        manager = HttpClientManager(settings)

        assert manager.max_connections == 50
        assert manager.connect_timeout == 15.0
        assert manager._default_client is None  # Not started yet

    @pytest.mark.asyncio
    async def test_startup_creates_client(self):
        """Startup should create the default async client."""
        from backend.app.http_client import HttpClientManager
        from backend.app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()

        assert manager._default_client is not None

        await manager.shutdown()

    @pytest.mark.asyncio
    async def test_shutdown_closes_client(self):
        """Shutdown should close all clients gracefully."""
        from backend.app.http_client import HttpClientManager
        from backend.app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)
        await manager.startup()
        client = manager._default_client

        await manager.shutdown()

        assert manager._default_client is None
        assert client.is_closed

    @pytest.mark.asyncio
    async def test_get_client_returns_default(self):
        """get_client() should return the default client."""
        from backend.app.http_client import HttpClientManager
        from backend.app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)
        await manager.startup()

        client = manager.get_client()

        assert client is manager._default_client

        await manager.shutdown()

    @pytest.mark.asyncio
    async def test_run_blocking_executes_in_thread_pool(self):
        """run_blocking should execute sync functions in thread pool."""
        from backend.app.http_client import HttpClientManager
        from backend.app.config import Settings
        import threading

        settings = Settings()
        manager = HttpClientManager(settings)
        await manager.startup()

        main_thread = threading.current_thread()
        execution_thread = None

        def blocking_func():
            nonlocal execution_thread
            execution_thread = threading.current_thread()
            return "result"

        result = await manager.run_blocking(blocking_func)

        assert result == "result"
        assert execution_thread is not main_thread

        await manager.shutdown()
```

**Step 2: Run test to verify it fails**

Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_http_client.py -v`

Expected: FAIL with ModuleNotFoundError (http_client doesn't exist)

**Step 3: Write the implementation**

Create `backend/app/http_client.py`:

```python
"""
HTTP client manager with connection pooling and lifecycle management.

Provides:
- Shared connection pools for upstream requests
- Per-upstream client isolation when needed
- Thread pool for blocking I/O operations
- FastAPI lifespan integration
"""
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional

import httpx

from .config import Settings

logger = logging.getLogger(__name__)


class HttpClientManager:
    """
    Manages httpx.AsyncClient pools with FastAPI lifespan integration.

    Features:
    - Default shared pool for general requests
    - Per-upstream pools for sources needing specific config/auth
    - Dedicated thread pool for blocking operations
    - Graceful shutdown
    """

    def __init__(self, settings: Settings):
        self.max_connections = settings.http_max_connections
        self.max_keepalive = settings.http_max_keepalive
        self.connect_timeout = settings.http_connect_timeout
        self.read_timeout = settings.http_read_timeout
        self.worker_threads = settings.http_worker_threads

        self._default_client: Optional[httpx.AsyncClient] = None
        self._upstream_clients: dict[str, httpx.AsyncClient] = {}
        self._executor: Optional[ThreadPoolExecutor] = None
        self._started = False
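
    # NOTE: the Features list above mentions per-upstream pools, but no task in
    # this plan ever populates _upstream_clients. If that is needed later, a
    # registration helper (hypothetical, not required by the tests in this task)
    # could look like:
    #
    #   def register_upstream_client(self, name: str, **client_kwargs) -> None:
    #       """Create a dedicated pool for one upstream (auth, proxies, etc.)."""
    #       self._upstream_clients[name] = httpx.AsyncClient(**client_kwargs)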

    async def startup(self) -> None:
        """Initialize clients and thread pool. Called by FastAPI lifespan."""
        if self._started:
            return

        logger.info(
            f"Starting HttpClientManager: max_connections={self.max_connections}, "
            f"worker_threads={self.worker_threads}"
        )

        # Create connection limits
        limits = httpx.Limits(
            max_connections=self.max_connections,
            max_keepalive_connections=self.max_keepalive,
        )

        # Create timeout config
        timeout = httpx.Timeout(
            connect=self.connect_timeout,
            read=self.read_timeout,
            write=self.read_timeout,
            pool=self.connect_timeout,
        )

        # Create default client
        self._default_client = httpx.AsyncClient(
            limits=limits,
            timeout=timeout,
            follow_redirects=False,  # Handle redirects manually for auth
        )

        # Create thread pool for blocking operations
        self._executor = ThreadPoolExecutor(
            max_workers=self.worker_threads,
            thread_name_prefix="orchard-blocking-",
        )

        self._started = True
        logger.info("HttpClientManager started")

    async def shutdown(self) -> None:
        """Close all clients and thread pool. Called by FastAPI lifespan."""
        if not self._started:
            return

        logger.info("Shutting down HttpClientManager")

        # Close default client
        if self._default_client:
            await self._default_client.aclose()
            self._default_client = None

        # Close upstream-specific clients
        for name, client in self._upstream_clients.items():
            logger.debug(f"Closing upstream client: {name}")
            await client.aclose()
        self._upstream_clients.clear()

        # Shutdown thread pool
        if self._executor:
            self._executor.shutdown(wait=True)
            self._executor = None

        self._started = False
        logger.info("HttpClientManager shutdown complete")

    def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
        """
        Get HTTP client for making requests.

        Args:
            upstream_name: Optional upstream source name for dedicated pool.
                If None, returns the default shared client.

        Returns:
            httpx.AsyncClient configured for the request.

        Raises:
            RuntimeError: If manager not started.
        """
        if not self._started or not self._default_client:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        if upstream_name and upstream_name in self._upstream_clients:
            return self._upstream_clients[upstream_name]

        return self._default_client

    async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any:
        """
        Run a blocking function in the thread pool.

        Use this for:
        - File I/O operations
        - Archive extraction (zipfile, tarfile)
        - Hash computation on large data

        Args:
            func: Synchronous function to execute
            *args: Arguments to pass to the function

        Returns:
            The function's return value.
        """
        if not self._executor:
            raise RuntimeError("HttpClientManager not started. Call startup() first.")

        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(self._executor, func, *args)

    @property
    def active_connections(self) -> int:
        """Get approximate number of active connections (for health checks)."""
        if not self._default_client:
            return 0
        # httpx doesn't expose this directly, return pool size as approximation
        return self.max_connections

    @property
    def pool_size(self) -> int:
        """Get configured pool size."""
        return self.max_connections

    @property
    def executor_active(self) -> int:
        """Get number of active thread pool workers."""
        if not self._executor:
            return 0
        return len(self._executor._threads)

    @property
    def executor_max(self) -> int:
        """Get max thread pool workers."""
        return self.worker_threads
```

**Step 4: Run test to verify it passes**

Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_http_client.py -v`

Expected: All tests PASS

**Step 5: Commit**

```bash
git add backend/app/http_client.py backend/tests/unit/test_http_client.py
git commit -m "feat: add HttpClientManager with connection pooling"
```

---

### Task 2.2: Create CacheService

**Files:**
- Create: `backend/app/cache_service.py`
- Test: `backend/tests/unit/test_cache_service.py`

**Step 1: Write the failing test**

Create `backend/tests/unit/test_cache_service.py`:

```python
"""Tests for CacheService."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch


class TestCacheCategory:
    """Tests for cache category enum."""

    def test_immutable_categories_have_no_ttl(self):
        """Immutable categories should return None for TTL."""
        from backend.app.cache_service import CacheCategory, get_category_ttl
        from backend.app.config import Settings

        settings = Settings()

        assert get_category_ttl(CacheCategory.ARTIFACT_METADATA, settings) is None
        assert get_category_ttl(CacheCategory.ARTIFACT_DEPENDENCIES, settings) is None
        assert get_category_ttl(CacheCategory.DEPENDENCY_RESOLUTION, settings) is None

    def test_mutable_categories_have_ttl(self):
        """Mutable categories should return configured TTL."""
        from backend.app.cache_service import CacheCategory, get_category_ttl
        from backend.app.config import Settings

        settings = Settings(
            cache_ttl_index=300,
            cache_ttl_upstream=3600,
        )

        assert get_category_ttl(CacheCategory.PACKAGE_INDEX, settings) == 300
        assert get_category_ttl(CacheCategory.UPSTREAM_SOURCES, settings) == 3600


class TestCacheService:
    """Tests for Redis cache service."""

    @pytest.mark.asyncio
    async def test_disabled_cache_returns_none(self):
        """When Redis disabled, get() should return None."""
        from backend.app.cache_service import CacheService, CacheCategory
        from backend.app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")

        assert result is None

        await cache.shutdown()

    @pytest.mark.asyncio
    async def test_disabled_cache_set_is_noop(self):
        """When Redis disabled, set() should be a no-op."""
        from backend.app.cache_service import CacheService, CacheCategory
        from backend.app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        # Should not raise
        await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value")

        await cache.shutdown()

    @pytest.mark.asyncio
    async def test_cache_key_namespacing(self):
        """Cache keys should be properly namespaced."""
        from backend.app.cache_service import CacheService, CacheCategory

        key = CacheService._make_key(CacheCategory.PACKAGE_INDEX, "pypi", "numpy")

        assert key == "orchard:index:pypi:numpy"

    @pytest.mark.asyncio
    async def test_ping_returns_false_when_disabled(self):
        """ping() should return False when Redis is disabled."""
        from backend.app.cache_service import CacheService
        from backend.app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        result = await cache.ping()

        assert result is False

        await cache.shutdown()
```

**Step 2: Run test to verify it fails**

Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_cache_service.py -v`

Expected: FAIL with ModuleNotFoundError (cache_service doesn't exist)

**Step 3: Write the implementation**

Create `backend/app/cache_service.py`:

```python
"""
Redis-backed caching service with category-aware TTL and invalidation.

Provides:
- Immutable caching for artifact data (hermetic builds)
- TTL-based caching for discovery data
- Event-driven invalidation for config changes
- Graceful fallback when Redis unavailable
"""
import logging
from enum import Enum
from typing import Optional

from .config import Settings

logger = logging.getLogger(__name__)


class CacheCategory(Enum):
    """
    Cache categories with different TTL and invalidation rules.

    Immutable (cache forever):
    - ARTIFACT_METADATA: Artifact info by SHA256
    - ARTIFACT_DEPENDENCIES: Extracted deps by SHA256
    - DEPENDENCY_RESOLUTION: Resolution results by input hash

    Mutable (TTL + event invalidation):
    - UPSTREAM_SOURCES: Upstream config, invalidate on DB change
    - PACKAGE_INDEX: PyPI/npm index pages, TTL only
    - PACKAGE_VERSIONS: Version listings, TTL only
    """

    # Immutable - cache forever (hermetic builds)
    ARTIFACT_METADATA = "artifact"
    ARTIFACT_DEPENDENCIES = "deps"
    DEPENDENCY_RESOLUTION = "resolve"

    # Mutable - TTL + event invalidation
    UPSTREAM_SOURCES = "upstream"
    PACKAGE_INDEX = "index"
    PACKAGE_VERSIONS = "versions"


def get_category_ttl(category: CacheCategory, settings: Settings) -> Optional[int]:
    """
    Get TTL for a cache category.

    Returns:
        TTL in seconds, or None for no expiry (immutable).
    """
    ttl_map = {
        # Immutable - no TTL
        CacheCategory.ARTIFACT_METADATA: None,
        CacheCategory.ARTIFACT_DEPENDENCIES: None,
        CacheCategory.DEPENDENCY_RESOLUTION: None,
        # Mutable - configurable TTL
        CacheCategory.UPSTREAM_SOURCES: settings.cache_ttl_upstream,
        CacheCategory.PACKAGE_INDEX: settings.cache_ttl_index,
        CacheCategory.PACKAGE_VERSIONS: settings.cache_ttl_versions,
    }
    return ttl_map.get(category)


class CacheService:
    """
    Redis-backed caching with category-aware TTL.

    Key format: orchard:{category}:{protocol}:{identifier}
    Example: orchard:deps:pypi:abc123def456

    When Redis is disabled or unavailable, operations gracefully return
    None/no-op to allow the application to function without caching.
    """

    def __init__(self, settings: Settings):
        self._settings = settings
        self._enabled = settings.redis_enabled
        self._redis: Optional["redis.asyncio.Redis"] = None
        self._started = False

    async def startup(self) -> None:
        """Initialize Redis connection. Called by FastAPI lifespan."""
        if self._started:
            return

        if not self._enabled:
            logger.info("CacheService disabled (redis_enabled=False)")
            self._started = True
            return

        try:
            import redis.asyncio as redis

            logger.info(
                f"Connecting to Redis at {self._settings.redis_host}:"
                f"{self._settings.redis_port}/{self._settings.redis_db}"
            )
            self._redis = redis.Redis(
                host=self._settings.redis_host,
                port=self._settings.redis_port,
                db=self._settings.redis_db,
                password=self._settings.redis_password,
                decode_responses=False,  # We handle bytes
            )
            # Test connection
            await self._redis.ping()
            logger.info("CacheService connected to Redis")
        except ImportError:
            logger.warning("redis package not installed, caching disabled")
            self._enabled = False
        except Exception as e:
            logger.warning(f"Redis connection failed, caching disabled: {e}")
            self._enabled = False
            self._redis = None

        self._started = True

    async def shutdown(self) -> None:
        """Close Redis connection. Called by FastAPI lifespan."""
        if not self._started:
            return

        if self._redis:
            await self._redis.aclose()
            self._redis = None

        self._started = False
        logger.info("CacheService shutdown complete")

    @staticmethod
    def _make_key(category: CacheCategory, protocol: str, identifier: str) -> str:
        """Build namespaced cache key."""
        return f"orchard:{category.value}:{protocol}:{identifier}"

    async def get(
        self,
        category: CacheCategory,
        key: str,
        protocol: str = "default",
    ) -> Optional[bytes]:
        """
        Get cached value.

        Args:
            category: Cache category for TTL rules
            key: Unique identifier within category
            protocol: Protocol namespace (pypi, npm, etc.)

        Returns:
            Cached bytes or None if not found/disabled.
        """
        if not self._enabled or not self._redis:
            return None

        try:
            full_key = self._make_key(category, protocol, key)
            return await self._redis.get(full_key)
        except Exception as e:
            logger.warning(f"Cache get failed for {key}: {e}")
            return None

    async def set(
        self,
        category: CacheCategory,
        key: str,
        value: bytes,
        protocol: str = "default",
    ) -> None:
        """
        Set cached value with category-appropriate TTL.

        Args:
            category: Cache category for TTL rules
            key: Unique identifier within category
            value: Bytes to cache
            protocol: Protocol namespace (pypi, npm, etc.)
        """
        if not self._enabled or not self._redis:
            return

        try:
            full_key = self._make_key(category, protocol, key)
            ttl = get_category_ttl(category, self._settings)
            if ttl is None:
                await self._redis.set(full_key, value)
            else:
                await self._redis.setex(full_key, ttl, value)
        except Exception as e:
            logger.warning(f"Cache set failed for {key}: {e}")

    async def delete(
        self,
        category: CacheCategory,
        key: str,
        protocol: str = "default",
    ) -> None:
        """Delete a specific cache entry."""
        if not self._enabled or not self._redis:
            return

        try:
            full_key = self._make_key(category, protocol, key)
            await self._redis.delete(full_key)
        except Exception as e:
            logger.warning(f"Cache delete failed for {key}: {e}")

    async def invalidate_pattern(
        self,
        category: CacheCategory,
        pattern: str = "*",
        protocol: str = "default",
    ) -> int:
        """
        Invalidate all entries matching pattern.

        Args:
            category: Cache category
            pattern: Glob pattern for keys (default "*" = all in category)
            protocol: Protocol namespace

        Returns:
            Number of keys deleted.
        """
        if not self._enabled or not self._redis:
            return 0

        try:
            full_pattern = self._make_key(category, protocol, pattern)
            keys = []
            async for key in self._redis.scan_iter(match=full_pattern):
                keys.append(key)
            if keys:
                return await self._redis.delete(*keys)
            return 0
        except Exception as e:
            logger.warning(f"Cache invalidate failed for pattern {pattern}: {e}")
            return 0

    async def ping(self) -> bool:
        """Check if Redis is connected and responding."""
        if not self._enabled or not self._redis:
            return False

        try:
            await self._redis.ping()
            return True
        except Exception:
            return False

    @property
    def enabled(self) -> bool:
        """Check if caching is enabled."""
        return self._enabled
```

**Step 4: Run test to verify it passes**

Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_cache_service.py -v`

Expected: All tests PASS

**Step 5: Commit**

```bash
git add backend/app/cache_service.py backend/tests/unit/test_cache_service.py
git commit -m "feat: add CacheService with Redis caching and graceful fallback"
```
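
The module docstring above promises event-driven invalidation for config changes, but no later task wires a caller to it. The following is a minimal usage sketch, not part of any task: a read-through lookup for an index page plus the invalidation call an admin route would make after upstream sources change. The helper names (`get_index_page`, `on_upstream_sources_changed`, `build_page`) are illustrative, not existing code.

```python
from typing import Awaitable, Callable

from .cache_service import CacheCategory, CacheService


async def get_index_page(
    cache: CacheService,
    project: str,
    build_page: Callable[[], Awaitable[bytes]],
) -> bytes:
    """Read-through lookup: serve from Redis when present, otherwise build the
    page and store it under the PACKAGE_INDEX category (cache_ttl_index)."""
    cached = await cache.get(CacheCategory.PACKAGE_INDEX, project, protocol="pypi")
    if cached is not None:
        return cached
    page = await build_page()
    await cache.set(CacheCategory.PACKAGE_INDEX, project, page, protocol="pypi")
    return page


async def on_upstream_sources_changed(cache: CacheService) -> int:
    """Call after an admin route edits upstream sources so stale config is
    dropped before its TTL expires; returns the number of keys removed."""
    return await cache.invalidate_pattern(CacheCategory.UPSTREAM_SOURCES, protocol="pypi")
```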
""" if not self._enabled or not self._redis: return 0 try: full_pattern = self._make_key(category, protocol, pattern) keys = [] async for key in self._redis.scan_iter(match=full_pattern): keys.append(key) if keys: return await self._redis.delete(*keys) return 0 except Exception as e: logger.warning(f"Cache invalidate failed for pattern {pattern}: {e}") return 0 async def ping(self) -> bool: """Check if Redis is connected and responding.""" if not self._enabled or not self._redis: return False try: await self._redis.ping() return True except Exception: return False @property def enabled(self) -> bool: """Check if caching is enabled.""" return self._enabled ``` **Step 4: Run test to verify it passes** Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_cache_service.py -v` Expected: All tests PASS **Step 5: Commit** ```bash git add backend/app/cache_service.py backend/tests/unit/test_cache_service.py git commit -m "feat: add CacheService with Redis caching and graceful fallback" ``` --- ### Task 2.3: Integrate infrastructure into FastAPI lifespan **Files:** - Modify: `backend/app/main.py` **Step 1: Update imports** Add after the existing imports (around line 17): ```python from .http_client import HttpClientManager from .cache_service import CacheService ``` **Step 2: Update lifespan function** Replace the entire `lifespan` function (lines 24-52) with: ```python @asynccontextmanager async def lifespan(app: FastAPI): # Startup: initialize database init_db() # Create default admin user if no users exist db = SessionLocal() try: admin = create_default_admin(db) if admin: logger.warning( "Default admin user created with username 'admin' and password 'changeme123'. " "CHANGE THIS PASSWORD IMMEDIATELY!" ) finally: db.close() # Initialize infrastructure services logger.info("Initializing infrastructure services...") app.state.http_client = HttpClientManager(settings) await app.state.http_client.startup() app.state.cache = CacheService(settings) await app.state.cache.startup() logger.info("Infrastructure services ready") # Seed test data in development mode if settings.is_development: logger.info(f"Running in {settings.env} mode - checking for seed data") db = SessionLocal() try: seed_database(db) finally: db.close() else: logger.info(f"Running in {settings.env} mode - skipping seed data") yield # Shutdown infrastructure services logger.info("Shutting down infrastructure services...") await app.state.http_client.shutdown() await app.state.cache.shutdown() logger.info("Shutdown complete") ``` **Step 3: Verify syntax** Run: `cd /home/mondo/orchard && python -m py_compile backend/app/main.py` Expected: No output (success) **Step 4: Commit** ```bash git add backend/app/main.py git commit -m "feat: integrate HttpClientManager and CacheService into lifespan" ``` --- ## Phase 3: Database Optimization ### Task 3.1: Create ArtifactRepository with batch operations **Files:** - Create: `backend/app/db_utils.py` - Test: `backend/tests/unit/test_db_utils.py` **Step 1: Write the failing test** Create `backend/tests/unit/test_db_utils.py`: ```python """Tests for database utility functions.""" import pytest from unittest.mock import MagicMock, patch class TestArtifactRepository: """Tests for ArtifactRepository.""" def test_batch_dependency_values_formatting(self): """batch_upsert_dependencies should format values correctly.""" from backend.app.db_utils import ArtifactRepository deps = [ ("_pypi", "numpy", ">=1.21.0"), ("_pypi", "requests", "*"), ("myproject", "mylib", "==1.0.0"), ] values = 

        assert len(values) == 3
        assert values[0] == {
            "artifact_id": "abc123",
            "dependency_project": "_pypi",
            "dependency_package": "numpy",
            "version_constraint": ">=1.21.0",
        }
        assert values[2]["dependency_project"] == "myproject"

    def test_empty_dependencies_returns_empty_list(self):
        """Empty dependency list should return empty values."""
        from backend.app.db_utils import ArtifactRepository

        values = ArtifactRepository._format_dependency_values("abc123", [])

        assert values == []
```

**Step 2: Run test to verify it fails**

Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_db_utils.py -v`

Expected: FAIL with ModuleNotFoundError (db_utils doesn't exist)

**Step 3: Write the implementation**

Create `backend/app/db_utils.py`:

```python
"""
Database utilities for optimized artifact operations.

Provides batch operations to eliminate N+1 queries.
"""
import logging
from typing import Optional

from sqlalchemy import insert, literal_column
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session

from .models import Artifact, ArtifactDependency, CachedUrl

logger = logging.getLogger(__name__)


class ArtifactRepository:
    """
    Optimized database operations for artifact storage.

    Key optimizations:
    - Atomic upserts using ON CONFLICT
    - Batch inserts for dependencies
    - Joined queries to avoid N+1
    """

    def __init__(self, db: Session):
        self.db = db

    @staticmethod
    def _format_dependency_values(
        artifact_id: str,
        dependencies: list[tuple[str, str, str]],
    ) -> list[dict]:
        """
        Format dependencies for batch insert.

        Args:
            artifact_id: SHA256 of the artifact
            dependencies: List of (project, package, version_constraint)

        Returns:
            List of dicts ready for bulk insert.
        """
        return [
            {
                "artifact_id": artifact_id,
                "dependency_project": proj,
                "dependency_package": pkg,
                "version_constraint": ver,
            }
            for proj, pkg, ver in dependencies
        ]

    def get_or_create_artifact(
        self,
        sha256: str,
        size: int,
        filename: str,
        content_type: Optional[str] = None,
    ) -> tuple[Artifact, bool]:
        """
        Get existing artifact or create new one atomically.

        Uses INSERT ... ON CONFLICT DO UPDATE to handle races.
        If artifact exists, increments ref_count.

        Args:
            sha256: Content hash (primary key)
            size: File size in bytes
            filename: Original filename
            content_type: MIME type

        Returns:
            (artifact, created) tuple where created is True for new artifacts.
        """
        stmt = pg_insert(Artifact).values(
            id=sha256,
            size=size,
            filename=filename,
            content_type=content_type,
            ref_count=1,
        ).on_conflict_do_update(
            index_elements=['id'],
            set_={'ref_count': Artifact.ref_count + 1}
        ).returning(Artifact)

        result = self.db.execute(stmt)
        artifact = result.scalar_one()

        # Check if this was an insert or update by comparing ref_count
        # ref_count=1 means new, >1 means existing
        created = artifact.ref_count == 1

        return artifact, created
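
    # NOTE: the ON CONFLICT target in batch_upsert_dependencies below assumes
    # ArtifactDependency has a unique constraint or index covering
    # (artifact_id, dependency_project, dependency_package); PostgreSQL rejects
    # the statement if no matching constraint exists.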
""" if not dependencies: return 0 values = self._format_dependency_values(artifact_id, dependencies) stmt = pg_insert(ArtifactDependency).values(values) stmt = stmt.on_conflict_do_nothing( index_elements=['artifact_id', 'dependency_project', 'dependency_package'] ) result = self.db.execute(stmt) return result.rowcount def get_cached_url_with_artifact( self, url_hash: str, ) -> Optional[tuple[CachedUrl, Artifact]]: """ Get cached URL and its artifact in a single query. Args: url_hash: SHA256 of the URL Returns: (CachedUrl, Artifact) tuple or None if not found. """ result = ( self.db.query(CachedUrl, Artifact) .join(Artifact, CachedUrl.artifact_id == Artifact.id) .filter(CachedUrl.url_hash == url_hash) .first() ) return result def get_artifact_dependencies( self, artifact_id: str, ) -> list[ArtifactDependency]: """ Get all dependencies for an artifact in a single query. Args: artifact_id: SHA256 of the artifact Returns: List of ArtifactDependency objects. """ return ( self.db.query(ArtifactDependency) .filter(ArtifactDependency.artifact_id == artifact_id) .all() ) ``` **Step 4: Run test to verify it passes** Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_db_utils.py -v` Expected: All tests PASS **Step 5: Commit** ```bash git add backend/app/db_utils.py backend/tests/unit/test_db_utils.py git commit -m "feat: add ArtifactRepository with batch DB operations" ``` --- ## Phase 4: PyPI Proxy Refactor ### Task 4.1: Add dependency injection helpers **Files:** - Modify: `backend/app/pypi_proxy.py` **Step 1: Add imports and dependency injection** At the top of the file, add these imports after existing ones (around line 28): ```python from .http_client import HttpClientManager from .cache_service import CacheService, CacheCategory from .db_utils import ArtifactRepository ``` Add dependency injection functions after the router definition (around line 33): ```python def get_http_client(request: Request) -> HttpClientManager: """Get HttpClientManager from app state.""" return request.app.state.http_client def get_cache(request: Request) -> CacheService: """Get CacheService from app state.""" return request.app.state.cache ``` **Step 2: Verify syntax** Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py` Expected: No output (success) **Step 3: Commit** ```bash git add backend/app/pypi_proxy.py git commit -m "refactor: add infrastructure dependency injection to pypi_proxy" ``` --- ### Task 4.2: Cache upstream sources lookup **Files:** - Modify: `backend/app/pypi_proxy.py` **Step 1: Update _get_pypi_upstream_sources to use cache** Find the `_get_pypi_upstream_sources` function (around line 244) and replace it: ```python async def _get_pypi_upstream_sources_cached( db: Session, cache: CacheService, ) -> list[UpstreamSource]: """ Get PyPI upstream sources with caching. Sources are cached for cache_ttl_upstream seconds to avoid repeated database queries on every request. 
""" cache_key = "sources" # Try cache first cached = await cache.get(CacheCategory.UPSTREAM_SOURCES, cache_key, protocol="pypi") if cached: import json source_data = json.loads(cached.decode()) # Reconstruct UpstreamSource-like objects from cached data # We cache just the essential fields needed for requests return [type('CachedSource', (), d)() for d in source_data] # Query database db_sources = ( db.query(UpstreamSource) .filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True) .order_by(UpstreamSource.priority) .all() ) # Combine with env sources env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"] all_sources = list(db_sources) + list(env_sources) all_sources = sorted(all_sources, key=lambda s: s.priority) # Cache the essential fields if all_sources and cache.enabled: import json cache_data = [ { "name": s.name, "url": s.url, "priority": s.priority, "auth_type": getattr(s, "auth_type", "none"), "username": getattr(s, "username", None), "password": getattr(s, "password", None), } for s in all_sources ] await cache.set( CacheCategory.UPSTREAM_SOURCES, cache_key, json.dumps(cache_data).encode(), protocol="pypi", ) return all_sources def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]: """ Get PyPI upstream sources (non-cached version for sync contexts). Prefer _get_pypi_upstream_sources_cached when cache is available. """ db_sources = ( db.query(UpstreamSource) .filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True) .order_by(UpstreamSource.priority) .all() ) env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"] all_sources = list(db_sources) + list(env_sources) return sorted(all_sources, key=lambda s: s.priority) ``` **Step 2: Verify syntax** Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py` Expected: No output (success) **Step 3: Commit** ```bash git add backend/app/pypi_proxy.py git commit -m "perf: cache upstream sources lookup in pypi_proxy" ``` --- ### Task 4.3: Use shared HTTP client in pypi_download_file **Files:** - Modify: `backend/app/pypi_proxy.py` **Step 1: Update pypi_download_file signature** Find the `pypi_download_file` function (around line 595) and update its signature to accept the infrastructure: ```python @router.get("/{project_name}/+f/{filename}") async def pypi_download_file( request: Request, project_name: str, filename: str, db: Session = Depends(get_db), storage: S3Storage = Depends(get_storage), http_client: HttpClientManager = Depends(get_http_client), cache: CacheService = Depends(get_cache), ): ``` **Step 2: Replace httpx.AsyncClient usage** In the function, find the section that creates a new AsyncClient (around line 665): ```python async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client: ``` Replace with: ```python client = http_client.get_client() # Note: We don't use 'async with' since the client is managed by HttpClientManager ``` Then remove the corresponding `async with` indentation - the rest of the code that was inside the `async with` block should remain but be dedented one level. 

**Step 3: Update timeout handling**

Since the shared client has default timeouts, for large file downloads we need to override per-request:

```python
# Use longer timeout for file downloads
download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
response = await client.get(upstream_url, headers=headers, timeout=download_timeout)
```

**Step 4: Verify syntax**

Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py`

Expected: No output (success)

**Step 5: Commit**

```bash
git add backend/app/pypi_proxy.py
git commit -m "perf: use shared HTTP client pool in pypi_download_file"
```

---

### Task 4.4: Use batch dependency storage

**Files:**
- Modify: `backend/app/pypi_proxy.py`

**Step 1: Replace dependency storage loop with batch operation**

Find the dependency storage section (around line 824-847) that looks like:

```python
# Store extracted dependencies (deduplicate first - METADATA can list same dep under multiple extras)
if extracted_deps:
    # Deduplicate: keep first version constraint seen for each package name
    seen_deps: dict[str, str] = {}
    for dep_name, dep_version in extracted_deps:
        if dep_name not in seen_deps:
            seen_deps[dep_name] = dep_version if dep_version else "*"

    for dep_name, dep_version in seen_deps.items():
        # Check if this dependency already exists for this artifact
        existing_dep = db.query(ArtifactDependency).filter(
            ArtifactDependency.artifact_id == sha256,
            ArtifactDependency.dependency_project == "_pypi",
            ArtifactDependency.dependency_package == dep_name,
        ).first()

        if not existing_dep:
            dep = ArtifactDependency(
                artifact_id=sha256,
                dependency_project="_pypi",
                dependency_package=dep_name,
                version_constraint=dep_version,
            )
            db.add(dep)
```

Replace with:

```python
# Store extracted dependencies using batch operation
if extracted_deps:
    # Deduplicate: keep first version constraint seen for each package name
    seen_deps: dict[str, str] = {}
    for dep_name, dep_version in extracted_deps:
        if dep_name not in seen_deps:
            seen_deps[dep_name] = dep_version if dep_version else "*"

    # Convert to list of tuples for batch insert
    deps_to_store = [
        ("_pypi", dep_name, dep_version)
        for dep_name, dep_version in seen_deps.items()
    ]

    # Batch upsert - handles duplicates with ON CONFLICT DO NOTHING
    repo = ArtifactRepository(db)
    inserted = repo.batch_upsert_dependencies(sha256, deps_to_store)
    if inserted > 0:
        logger.debug(f"Stored {inserted} dependencies for {sha256[:12]}...")
```

**Step 2: Verify syntax**

Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py`

Expected: No output (success)

**Step 3: Commit**

```bash
git add backend/app/pypi_proxy.py
git commit -m "perf: use batch dependency storage in pypi_proxy"
```

---

## Phase 5: Integration Tests

### Task 5.1: Add integration tests for infrastructure

**Files:**
- Modify: `backend/tests/integration/test_pypi_proxy.py`

**Step 1: Add infrastructure health test**

Add to the existing test file:

```python
class TestPyPIProxyInfrastructure:
    """Tests for PyPI proxy infrastructure integration."""

    @pytest.mark.integration
    def test_health_endpoint_includes_infrastructure(self, integration_client):
        """Health endpoint should report infrastructure status."""
        response = integration_client.get("/health")

        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "healthy"
        # Infrastructure status may include these if implemented
        # assert "infrastructure" in data
```

**Step 2: Run integration tests**

Run: `docker-compose -f docker-compose.local.yml exec -T orchard-server pytest backend/tests/integration/test_pypi_proxy.py -v --no-cov`

Expected: Tests pass

**Step 3: Commit**

```bash
git add backend/tests/integration/test_pypi_proxy.py
git commit -m "test: add infrastructure integration tests for pypi_proxy"
```

---

## Phase 6: Finalization

### Task 6.1: Update health endpoint with infrastructure status

**Files:**
- Modify: `backend/app/routes.py`

**Step 1: Find health endpoint and add infrastructure status**

Find the `/health` endpoint and update to include infrastructure:

```python
@router.get("/health")
async def health_check(
    request: Request,
    db: Session = Depends(get_db),
):
    """Health check endpoint with infrastructure status."""
    # Basic health
    health_status = {"status": "healthy"}

    # Add infrastructure status if available
    if hasattr(request.app.state, 'http_client'):
        http_client = request.app.state.http_client
        health_status["http_pool"] = {
            "pool_size": http_client.pool_size,
            "worker_threads": http_client.executor_max,
        }

    if hasattr(request.app.state, 'cache'):
        cache = request.app.state.cache
        health_status["cache"] = {
            "enabled": cache.enabled,
            "connected": await cache.ping() if cache.enabled else False,
        }

    return health_status
```

**Step 2: Verify syntax**

Run: `cd /home/mondo/orchard && python -m py_compile backend/app/routes.py`

Expected: No output (success)

**Step 3: Commit**

```bash
git add backend/app/routes.py
git commit -m "feat: add infrastructure status to health endpoint"
```

---

### Task 6.2: Rebuild and test locally

**Step 1: Rebuild container**

Run: `docker-compose -f docker-compose.local.yml build orchard-server`

Expected: Build succeeds

**Step 2: Restart with new code**

Run: `docker rm -f $(docker ps -aq --filter name=orchard_orchard-server) 2>/dev/null; docker-compose -f docker-compose.local.yml up -d orchard-server`

Expected: Container starts

**Step 3: Check health endpoint**

Run: `curl -s http://localhost:8080/health | python -m json.tool`

Expected: JSON with status and infrastructure info

**Step 4: Run all tests**

Run: `docker-compose -f docker-compose.local.yml exec -T orchard-server pytest backend/tests/ -v --no-cov`

Expected: All tests pass

**Step 5: Commit any fixes**

If tests fail, fix issues and commit.

---

### Task 6.3: Update CHANGELOG

**Files:**
- Modify: `CHANGELOG.md`

**Step 1: Add entry under [Unreleased]**

Add under `## [Unreleased]`:

```markdown
### Added
- HTTP connection pooling for upstream PyPI requests (reduces latency by ~200ms/request)
- Redis caching layer for package index pages and upstream source config
- Batch database operations for dependency storage (eliminates N+1 queries)
- Infrastructure status in health endpoint

### Changed
- Database connection pool defaults increased (pool_size: 5→20, max_overflow: 10→30)
- PyPI proxy now uses shared HTTP client instead of per-request connections
```

**Step 2: Commit**

```bash
git add CHANGELOG.md
git commit -m "docs: update CHANGELOG for PyPI proxy performance improvements"
```

---

### Task 6.4: Push and verify CI

**Step 1: Push all changes**

Run: `git push`

**Step 2: Monitor CI pipeline**

Check GitLab CI for build/test results.

---

## Summary

This implementation plan covers:

1. **Phase 1**: Dependencies and configuration
2. **Phase 2**: Infrastructure layer (HttpClientManager, CacheService)
3. **Phase 3**: Database optimization (ArtifactRepository)
4. **Phase 4**: PyPI proxy refactor to use new infrastructure
5. **Phase 5**: Integration tests
6. **Phase 6**: Finalization and deployment

Each task is self-contained with tests, making it safe to commit incrementally.
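
The architecture summary calls for a `PackageProxyBase` abstract class as the multi-protocol foundation, but no task above defines it. Below is a minimal sketch of what that adapter interface might look like for a follow-up plan; every name in it is an assumption, not existing code.

```python
"""Possible shape of the protocol-adapter base class (follow-up work, not part of this plan)."""
from abc import ABC, abstractmethod

from fastapi import Response

from .cache_service import CacheService
from .http_client import HttpClientManager


class PackageProxyBase(ABC):
    """Shared plumbing for protocol adapters (PyPI now, npm and others later)."""

    protocol: str  # e.g. "pypi"; used as the cache namespace

    def __init__(self, http_client: HttpClientManager, cache: CacheService):
        self.http_client = http_client
        self.cache = cache

    @abstractmethod
    async def get_index(self, package: str) -> Response:
        """Return the package index page for this protocol."""

    @abstractmethod
    async def download(self, package: str, filename: str) -> Response:
        """Fetch or stream an artifact, caching it by SHA256."""
```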