# PyPI Proxy Performance Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Implement production-grade infrastructure for the PyPI proxy with HTTP connection pooling, Redis caching, batch DB operations, and a multi-protocol foundation.

**Architecture:** Layered infrastructure (HttpClientManager, CacheService, ThreadPool) managed via FastAPI lifespan, with PackageProxyBase abstract class for protocol adapters. Redis is optional with graceful fallback.

**Tech Stack:** FastAPI, httpx (async), redis-py (async), SQLAlchemy (batch operations), asyncio ThreadPoolExecutor
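The PackageProxyBase adapter itself is built out in a later phase; the sketch below only illustrates the intended shape. The method names and signatures here are assumptions, not the final interface.
```python
# Illustrative sketch only - the real PackageProxyBase is designed in a later phase,
# and these method names/signatures are assumptions rather than the final interface.
from abc import ABC, abstractmethod

import httpx


class PackageProxyBase(ABC):
    """Protocol adapter (PyPI, npm, ...) built on the shared infrastructure layer."""

    protocol: str  # e.g. "pypi"

    def __init__(self, http_client, cache):
        self.http_client = http_client  # HttpClientManager (Task 2.1)
        self.cache = cache              # CacheService (Task 2.2)

    @abstractmethod
    async def get_index(self, package_name: str) -> bytes:
        """Return (and cache) the package index page for a package."""

    @abstractmethod
    async def download_artifact(self, package_name: str, filename: str) -> httpx.Response:
        """Stream an artifact from the configured upstream sources."""
```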
---
## Phase 1: Dependencies and Configuration
### Task 1.1: Add Redis dependency
**Files:**
- Modify: `backend/requirements.txt`
**Step 1: Add redis package**
Add to `backend/requirements.txt` after the httpx line:
```
redis>=5.0.0
```
**Step 2: Verify syntax**
Run: `cd /home/mondo/orchard && cat backend/requirements.txt | grep redis`
Expected: `redis>=5.0.0`
**Step 3: Commit**
```bash
git add backend/requirements.txt
git commit -m "deps: add redis-py for caching layer"
```
---
### Task 1.2: Add configuration settings
**Files:**
- Modify: `backend/app/config.py`
**Step 1: Add HTTP client settings**
Add after line 54 (`pypi_download_mode`):
```python
    # HTTP Client pool settings
    http_max_connections: int = 100  # Max connections per pool
    http_max_keepalive: int = 20  # Keep-alive connections
    http_connect_timeout: float = 30.0  # Connection timeout seconds
    http_read_timeout: float = 60.0  # Read timeout seconds
    http_worker_threads: int = 32  # Thread pool for blocking ops
```
**Step 2: Add Redis settings**
Add after the HTTP client settings:
```python
    # Redis cache settings
    redis_host: str = "localhost"
    redis_port: int = 6379
    redis_db: int = 0
    redis_password: Optional[str] = None
    redis_enabled: bool = True  # Set False to disable caching

    # Cache TTL settings (seconds, 0 = no expiry)
    cache_ttl_index: int = 300  # Package index pages: 5 min
    cache_ttl_versions: int = 300  # Version listings: 5 min
    cache_ttl_upstream: int = 3600  # Upstream source config: 1 hour
```
**Step 3: Update database pool defaults**
Find and update these existing settings:
```python
    database_pool_size: int = 20  # Was 5
    database_max_overflow: int = 30  # Was 10
```
**Step 4: Verify syntax**
Run: `cd /home/mondo/orchard && python -m py_compile backend/app/config.py`
Expected: No output (success)
**Step 5: Commit**
```bash
git add backend/app/config.py
git commit -m "config: add HTTP pool, Redis, and updated DB pool settings"
```
---
## Phase 2: Infrastructure Layer
### Task 2.1: Create HttpClientManager
**Files:**
- Create: `backend/app/http_client.py`
- Test: `backend/tests/unit/test_http_client.py`
**Step 1: Write the failing test**
Create `backend/tests/unit/test_http_client.py`:
```python
"""Tests for HttpClientManager."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
class TestHttpClientManager:
"""Tests for HTTP client pool management."""
def test_manager_initializes_with_settings(self):
"""Manager should initialize with config settings."""
from backend.app.http_client import HttpClientManager
from backend.app.config import Settings
settings = Settings(
http_max_connections=50,
http_connect_timeout=15.0,
)
manager = HttpClientManager(settings)
assert manager.max_connections == 50
assert manager.connect_timeout == 15.0
assert manager._default_client is None # Not started yet
@pytest.mark.asyncio
async def test_startup_creates_client(self):
"""Startup should create the default async client."""
from backend.app.http_client import HttpClientManager
from backend.app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
assert manager._default_client is not None
await manager.shutdown()
@pytest.mark.asyncio
async def test_shutdown_closes_client(self):
"""Shutdown should close all clients gracefully."""
from backend.app.http_client import HttpClientManager
from backend.app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
client = manager._default_client
await manager.shutdown()
assert manager._default_client is None
assert client.is_closed
@pytest.mark.asyncio
async def test_get_client_returns_default(self):
"""get_client() should return the default client."""
from backend.app.http_client import HttpClientManager
from backend.app.config import Settings
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
client = manager.get_client()
assert client is manager._default_client
await manager.shutdown()
@pytest.mark.asyncio
async def test_run_blocking_executes_in_thread_pool(self):
"""run_blocking should execute sync functions in thread pool."""
from backend.app.http_client import HttpClientManager
from backend.app.config import Settings
import threading
settings = Settings()
manager = HttpClientManager(settings)
await manager.startup()
main_thread = threading.current_thread()
execution_thread = None
def blocking_func():
nonlocal execution_thread
execution_thread = threading.current_thread()
return "result"
result = await manager.run_blocking(blocking_func)
assert result == "result"
assert execution_thread is not main_thread
await manager.shutdown()
```
**Step 2: Run test to verify it fails**
Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_http_client.py -v`
Expected: FAIL with ModuleNotFoundError (http_client doesn't exist)
**Step 3: Write the implementation**
Create `backend/app/http_client.py`:
```python
"""
HTTP client manager with connection pooling and lifecycle management.
Provides:
- Shared connection pools for upstream requests
- Per-upstream client isolation when needed
- Thread pool for blocking I/O operations
- FastAPI lifespan integration
"""
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Optional
import httpx
from .config import Settings
logger = logging.getLogger(__name__)
class HttpClientManager:
"""
Manages httpx.AsyncClient pools with FastAPI lifespan integration.
Features:
- Default shared pool for general requests
- Per-upstream pools for sources needing specific config/auth
- Dedicated thread pool for blocking operations
- Graceful shutdown
"""
def __init__(self, settings: Settings):
self.max_connections = settings.http_max_connections
self.max_keepalive = settings.http_max_keepalive
self.connect_timeout = settings.http_connect_timeout
self.read_timeout = settings.http_read_timeout
self.worker_threads = settings.http_worker_threads
self._default_client: Optional[httpx.AsyncClient] = None
self._upstream_clients: dict[str, httpx.AsyncClient] = {}
self._executor: Optional[ThreadPoolExecutor] = None
self._started = False
async def startup(self) -> None:
"""Initialize clients and thread pool. Called by FastAPI lifespan."""
if self._started:
return
logger.info(
f"Starting HttpClientManager: max_connections={self.max_connections}, "
f"worker_threads={self.worker_threads}"
)
# Create connection limits
limits = httpx.Limits(
max_connections=self.max_connections,
max_keepalive_connections=self.max_keepalive,
)
# Create timeout config
timeout = httpx.Timeout(
connect=self.connect_timeout,
read=self.read_timeout,
write=self.read_timeout,
pool=self.connect_timeout,
)
# Create default client
self._default_client = httpx.AsyncClient(
limits=limits,
timeout=timeout,
follow_redirects=False, # Handle redirects manually for auth
)
# Create thread pool for blocking operations
self._executor = ThreadPoolExecutor(
max_workers=self.worker_threads,
thread_name_prefix="orchard-blocking-",
)
self._started = True
logger.info("HttpClientManager started")
async def shutdown(self) -> None:
"""Close all clients and thread pool. Called by FastAPI lifespan."""
if not self._started:
return
logger.info("Shutting down HttpClientManager")
# Close default client
if self._default_client:
await self._default_client.aclose()
self._default_client = None
# Close upstream-specific clients
for name, client in self._upstream_clients.items():
logger.debug(f"Closing upstream client: {name}")
await client.aclose()
self._upstream_clients.clear()
# Shutdown thread pool
if self._executor:
self._executor.shutdown(wait=True)
self._executor = None
self._started = False
logger.info("HttpClientManager shutdown complete")
def get_client(self, upstream_name: Optional[str] = None) -> httpx.AsyncClient:
"""
Get HTTP client for making requests.
Args:
upstream_name: Optional upstream source name for dedicated pool.
If None, returns the default shared client.
Returns:
httpx.AsyncClient configured for the request.
Raises:
RuntimeError: If manager not started.
"""
if not self._started or not self._default_client:
raise RuntimeError("HttpClientManager not started. Call startup() first.")
if upstream_name and upstream_name in self._upstream_clients:
return self._upstream_clients[upstream_name]
return self._default_client
async def run_blocking(self, func: Callable[..., Any], *args: Any) -> Any:
"""
Run a blocking function in the thread pool.
Use this for:
- File I/O operations
- Archive extraction (zipfile, tarfile)
- Hash computation on large data
Args:
func: Synchronous function to execute
*args: Arguments to pass to the function
Returns:
The function's return value.
"""
if not self._executor:
raise RuntimeError("HttpClientManager not started. Call startup() first.")
loop = asyncio.get_event_loop()
return await loop.run_in_executor(self._executor, func, *args)
@property
def active_connections(self) -> int:
"""Get approximate number of active connections (for health checks)."""
if not self._default_client:
return 0
# httpx doesn't expose this directly, return pool size as approximation
return self.max_connections
@property
def pool_size(self) -> int:
"""Get configured pool size."""
return self.max_connections
@property
def executor_active(self) -> int:
"""Get number of active thread pool workers."""
if not self._executor:
return 0
return len(self._executor._threads)
@property
def executor_max(self) -> int:
"""Get max thread pool workers."""
return self.worker_threads
```
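For reference, once the manager is attached to app state (Task 2.3), handlers are expected to use it roughly as below. This is a sketch only; the route path and URL are placeholders, not code from this repository.
```python
# Minimal usage sketch; the route path and URL below are placeholders, not code
# from this repository.
import hashlib

from fastapi import FastAPI, Request

app = FastAPI()


@app.get("/demo")
async def demo(request: Request):
    manager = request.app.state.http_client  # set on app state by the lifespan (Task 2.3)
    client = manager.get_client()            # shared, pooled httpx.AsyncClient
    response = await client.get("https://pypi.org/simple/")
    # Hashing a large body is CPU-bound, so hand it to the thread pool:
    digest = await manager.run_blocking(
        lambda: hashlib.sha256(response.content).hexdigest()
    )
    return {"status": response.status_code, "sha256": digest}
```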
**Step 4: Run test to verify it passes**
Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_http_client.py -v`
Expected: All tests PASS
**Step 5: Commit**
```bash
git add backend/app/http_client.py backend/tests/unit/test_http_client.py
git commit -m "feat: add HttpClientManager with connection pooling"
```
---
### Task 2.2: Create CacheService
**Files:**
- Create: `backend/app/cache_service.py`
- Test: `backend/tests/unit/test_cache_service.py`
**Step 1: Write the failing test**
Create `backend/tests/unit/test_cache_service.py`:
```python
"""Tests for CacheService."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
class TestCacheCategory:
"""Tests for cache category enum."""
def test_immutable_categories_have_no_ttl(self):
"""Immutable categories should return None for TTL."""
from backend.app.cache_service import CacheCategory, get_category_ttl
from backend.app.config import Settings
settings = Settings()
assert get_category_ttl(CacheCategory.ARTIFACT_METADATA, settings) is None
assert get_category_ttl(CacheCategory.ARTIFACT_DEPENDENCIES, settings) is None
assert get_category_ttl(CacheCategory.DEPENDENCY_RESOLUTION, settings) is None
def test_mutable_categories_have_ttl(self):
"""Mutable categories should return configured TTL."""
from backend.app.cache_service import CacheCategory, get_category_ttl
from backend.app.config import Settings
settings = Settings(
cache_ttl_index=300,
cache_ttl_upstream=3600,
)
assert get_category_ttl(CacheCategory.PACKAGE_INDEX, settings) == 300
assert get_category_ttl(CacheCategory.UPSTREAM_SOURCES, settings) == 3600
class TestCacheService:
"""Tests for Redis cache service."""
@pytest.mark.asyncio
async def test_disabled_cache_returns_none(self):
"""When Redis disabled, get() should return None."""
from backend.app.cache_service import CacheService, CacheCategory
from backend.app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")
assert result is None
await cache.shutdown()
@pytest.mark.asyncio
async def test_disabled_cache_set_is_noop(self):
"""When Redis disabled, set() should be a no-op."""
from backend.app.cache_service import CacheService, CacheCategory
from backend.app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
# Should not raise
await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value")
await cache.shutdown()
@pytest.mark.asyncio
async def test_cache_key_namespacing(self):
"""Cache keys should be properly namespaced."""
from backend.app.cache_service import CacheService, CacheCategory
key = CacheService._make_key(CacheCategory.PACKAGE_INDEX, "pypi", "numpy")
assert key == "orchard:index:pypi:numpy"
@pytest.mark.asyncio
async def test_ping_returns_false_when_disabled(self):
"""ping() should return False when Redis is disabled."""
from backend.app.cache_service import CacheService
from backend.app.config import Settings
settings = Settings(redis_enabled=False)
cache = CacheService(settings)
await cache.startup()
result = await cache.ping()
assert result is False
await cache.shutdown()
```
**Step 2: Run test to verify it fails**
Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_cache_service.py -v`
Expected: FAIL with ModuleNotFoundError (cache_service doesn't exist)
**Step 3: Write the implementation**
Create `backend/app/cache_service.py`:
```python
"""
Redis-backed caching service with category-aware TTL and invalidation.
Provides:
- Immutable caching for artifact data (hermetic builds)
- TTL-based caching for discovery data
- Event-driven invalidation for config changes
- Graceful fallback when Redis unavailable
"""
import logging
from enum import Enum
from typing import Optional
from .config import Settings
logger = logging.getLogger(__name__)
class CacheCategory(Enum):
"""
Cache categories with different TTL and invalidation rules.
Immutable (cache forever):
- ARTIFACT_METADATA: Artifact info by SHA256
- ARTIFACT_DEPENDENCIES: Extracted deps by SHA256
- DEPENDENCY_RESOLUTION: Resolution results by input hash
Mutable (TTL + event invalidation):
- UPSTREAM_SOURCES: Upstream config, invalidate on DB change
- PACKAGE_INDEX: PyPI/npm index pages, TTL only
- PACKAGE_VERSIONS: Version listings, TTL only
"""
# Immutable - cache forever (hermetic builds)
ARTIFACT_METADATA = "artifact"
ARTIFACT_DEPENDENCIES = "deps"
DEPENDENCY_RESOLUTION = "resolve"
# Mutable - TTL + event invalidation
UPSTREAM_SOURCES = "upstream"
PACKAGE_INDEX = "index"
PACKAGE_VERSIONS = "versions"
def get_category_ttl(category: CacheCategory, settings: Settings) -> Optional[int]:
"""
Get TTL for a cache category.
Returns:
TTL in seconds, or None for no expiry (immutable).
"""
ttl_map = {
# Immutable - no TTL
CacheCategory.ARTIFACT_METADATA: None,
CacheCategory.ARTIFACT_DEPENDENCIES: None,
CacheCategory.DEPENDENCY_RESOLUTION: None,
# Mutable - configurable TTL
CacheCategory.UPSTREAM_SOURCES: settings.cache_ttl_upstream,
CacheCategory.PACKAGE_INDEX: settings.cache_ttl_index,
CacheCategory.PACKAGE_VERSIONS: settings.cache_ttl_versions,
}
return ttl_map.get(category)
class CacheService:
"""
Redis-backed caching with category-aware TTL.
Key format: orchard:{category}:{protocol}:{identifier}
Example: orchard:deps:pypi:abc123def456
When Redis is disabled or unavailable, operations gracefully
return None/no-op to allow the application to function without caching.
"""
def __init__(self, settings: Settings):
self._settings = settings
self._enabled = settings.redis_enabled
self._redis: Optional["redis.asyncio.Redis"] = None
self._started = False
async def startup(self) -> None:
"""Initialize Redis connection. Called by FastAPI lifespan."""
if self._started:
return
if not self._enabled:
logger.info("CacheService disabled (redis_enabled=False)")
self._started = True
return
try:
import redis.asyncio as redis
logger.info(
f"Connecting to Redis at {self._settings.redis_host}:"
f"{self._settings.redis_port}/{self._settings.redis_db}"
)
self._redis = redis.Redis(
host=self._settings.redis_host,
port=self._settings.redis_port,
db=self._settings.redis_db,
password=self._settings.redis_password,
decode_responses=False, # We handle bytes
)
# Test connection
await self._redis.ping()
logger.info("CacheService connected to Redis")
except ImportError:
logger.warning("redis package not installed, caching disabled")
self._enabled = False
except Exception as e:
logger.warning(f"Redis connection failed, caching disabled: {e}")
self._enabled = False
self._redis = None
self._started = True
async def shutdown(self) -> None:
"""Close Redis connection. Called by FastAPI lifespan."""
if not self._started:
return
if self._redis:
await self._redis.aclose()
self._redis = None
self._started = False
logger.info("CacheService shutdown complete")
@staticmethod
def _make_key(category: CacheCategory, protocol: str, identifier: str) -> str:
"""Build namespaced cache key."""
return f"orchard:{category.value}:{protocol}:{identifier}"
async def get(
self,
category: CacheCategory,
key: str,
protocol: str = "default",
) -> Optional[bytes]:
"""
Get cached value.
Args:
category: Cache category for TTL rules
key: Unique identifier within category
protocol: Protocol namespace (pypi, npm, etc.)
Returns:
Cached bytes or None if not found/disabled.
"""
if not self._enabled or not self._redis:
return None
try:
full_key = self._make_key(category, protocol, key)
return await self._redis.get(full_key)
except Exception as e:
logger.warning(f"Cache get failed for {key}: {e}")
return None
async def set(
self,
category: CacheCategory,
key: str,
value: bytes,
protocol: str = "default",
) -> None:
"""
Set cached value with category-appropriate TTL.
Args:
category: Cache category for TTL rules
key: Unique identifier within category
value: Bytes to cache
protocol: Protocol namespace (pypi, npm, etc.)
"""
if not self._enabled or not self._redis:
return
try:
full_key = self._make_key(category, protocol, key)
ttl = get_category_ttl(category, self._settings)
if ttl is None:
await self._redis.set(full_key, value)
else:
await self._redis.setex(full_key, ttl, value)
except Exception as e:
logger.warning(f"Cache set failed for {key}: {e}")
async def delete(
self,
category: CacheCategory,
key: str,
protocol: str = "default",
) -> None:
"""Delete a specific cache entry."""
if not self._enabled or not self._redis:
return
try:
full_key = self._make_key(category, protocol, key)
await self._redis.delete(full_key)
except Exception as e:
logger.warning(f"Cache delete failed for {key}: {e}")
async def invalidate_pattern(
self,
category: CacheCategory,
pattern: str = "*",
protocol: str = "default",
) -> int:
"""
Invalidate all entries matching pattern.
Args:
category: Cache category
pattern: Glob pattern for keys (default "*" = all in category)
protocol: Protocol namespace
Returns:
Number of keys deleted.
"""
if not self._enabled or not self._redis:
return 0
try:
full_pattern = self._make_key(category, protocol, pattern)
keys = []
async for key in self._redis.scan_iter(match=full_pattern):
keys.append(key)
if keys:
return await self._redis.delete(*keys)
return 0
except Exception as e:
logger.warning(f"Cache invalidate failed for pattern {pattern}: {e}")
return 0
async def ping(self) -> bool:
"""Check if Redis is connected and responding."""
if not self._enabled or not self._redis:
return False
try:
await self._redis.ping()
return True
except Exception:
return False
@property
def enabled(self) -> bool:
"""Check if caching is enabled."""
return self._enabled
```
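As a quick usage sketch of the read-through pattern the proxy code will follow: `fetch_index_from_upstream` below is a made-up stand-in for a real upstream fetch; only the cache interaction is the point.
```python
# Read-through caching sketch. `fetch_index_from_upstream` is a made-up stand-in for
# the real upstream fetch; only the cache interaction pattern is shown here.
from backend.app.cache_service import CacheCategory, CacheService


async def fetch_index_from_upstream(project: str) -> bytes:
    # Placeholder for the real upstream request made through HttpClientManager.
    return f"<html>{project}</html>".encode()


async def get_index_page(cache: CacheService, project: str) -> bytes:
    cached = await cache.get(CacheCategory.PACKAGE_INDEX, project, protocol="pypi")
    if cached is not None:
        return cached  # hit: served from Redis (never reached when caching is disabled)
    body = await fetch_index_from_upstream(project)
    # PACKAGE_INDEX is a mutable category, so set() applies the cache_ttl_index TTL.
    await cache.set(CacheCategory.PACKAGE_INDEX, project, body, protocol="pypi")
    return body
```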
**Step 4: Run test to verify it passes**
Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_cache_service.py -v`
Expected: All tests PASS
**Step 5: Commit**
```bash
git add backend/app/cache_service.py backend/tests/unit/test_cache_service.py
git commit -m "feat: add CacheService with Redis caching and graceful fallback"
```
---
### Task 2.3: Integrate infrastructure into FastAPI lifespan
**Files:**
- Modify: `backend/app/main.py`
**Step 1: Update imports**
Add after the existing imports (around line 17):
```python
from .http_client import HttpClientManager
from .cache_service import CacheService
```
**Step 2: Update lifespan function**
Replace the entire `lifespan` function (lines 24-52) with:
```python
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: initialize database
    init_db()
    # Create default admin user if no users exist
    db = SessionLocal()
    try:
        admin = create_default_admin(db)
        if admin:
            logger.warning(
                "Default admin user created with username 'admin' and password 'changeme123'. "
                "CHANGE THIS PASSWORD IMMEDIATELY!"
            )
    finally:
        db.close()

    # Initialize infrastructure services
    logger.info("Initializing infrastructure services...")
    app.state.http_client = HttpClientManager(settings)
    await app.state.http_client.startup()
    app.state.cache = CacheService(settings)
    await app.state.cache.startup()
    logger.info("Infrastructure services ready")

    # Seed test data in development mode
    if settings.is_development:
        logger.info(f"Running in {settings.env} mode - checking for seed data")
        db = SessionLocal()
        try:
            seed_database(db)
        finally:
            db.close()
    else:
        logger.info(f"Running in {settings.env} mode - skipping seed data")

    yield

    # Shutdown infrastructure services
    logger.info("Shutting down infrastructure services...")
    await app.state.http_client.shutdown()
    await app.state.cache.shutdown()
    logger.info("Shutdown complete")
```
**Step 3: Verify syntax**
Run: `cd /home/mondo/orchard && python -m py_compile backend/app/main.py`
Expected: No output (success)
**Step 4: Commit**
```bash
git add backend/app/main.py
git commit -m "feat: integrate HttpClientManager and CacheService into lifespan"
```
---
## Phase 3: Database Optimization
### Task 3.1: Create ArtifactRepository with batch operations
**Files:**
- Create: `backend/app/db_utils.py`
- Test: `backend/tests/unit/test_db_utils.py`
**Step 1: Write the failing test**
Create `backend/tests/unit/test_db_utils.py`:
```python
"""Tests for database utility functions."""
import pytest
from unittest.mock import MagicMock, patch
class TestArtifactRepository:
"""Tests for ArtifactRepository."""
def test_batch_dependency_values_formatting(self):
"""batch_upsert_dependencies should format values correctly."""
from backend.app.db_utils import ArtifactRepository
deps = [
("_pypi", "numpy", ">=1.21.0"),
("_pypi", "requests", "*"),
("myproject", "mylib", "==1.0.0"),
]
values = ArtifactRepository._format_dependency_values("abc123", deps)
assert len(values) == 3
assert values[0] == {
"artifact_id": "abc123",
"dependency_project": "_pypi",
"dependency_package": "numpy",
"version_constraint": ">=1.21.0",
}
assert values[2]["dependency_project"] == "myproject"
def test_empty_dependencies_returns_empty_list(self):
"""Empty dependency list should return empty values."""
from backend.app.db_utils import ArtifactRepository
values = ArtifactRepository._format_dependency_values("abc123", [])
assert values == []
```
**Step 2: Run test to verify it fails**
Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_db_utils.py -v`
Expected: FAIL with ModuleNotFoundError (db_utils doesn't exist)
**Step 3: Write the implementation**
Create `backend/app/db_utils.py`:
```python
"""
Database utilities for optimized artifact operations.
Provides batch operations to eliminate N+1 queries.
"""
import logging
from typing import Optional
from sqlalchemy import insert, literal_column
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session
from .models import Artifact, ArtifactDependency, CachedUrl
logger = logging.getLogger(__name__)
class ArtifactRepository:
"""
Optimized database operations for artifact storage.
Key optimizations:
- Atomic upserts using ON CONFLICT
- Batch inserts for dependencies
- Joined queries to avoid N+1
"""
def __init__(self, db: Session):
self.db = db
@staticmethod
def _format_dependency_values(
artifact_id: str,
dependencies: list[tuple[str, str, str]],
) -> list[dict]:
"""
Format dependencies for batch insert.
Args:
artifact_id: SHA256 of the artifact
dependencies: List of (project, package, version_constraint)
Returns:
List of dicts ready for bulk insert.
"""
return [
{
"artifact_id": artifact_id,
"dependency_project": proj,
"dependency_package": pkg,
"version_constraint": ver,
}
for proj, pkg, ver in dependencies
]
def get_or_create_artifact(
self,
sha256: str,
size: int,
filename: str,
content_type: Optional[str] = None,
) -> tuple[Artifact, bool]:
"""
Get existing artifact or create new one atomically.
Uses INSERT ... ON CONFLICT DO UPDATE to handle races.
If artifact exists, increments ref_count.
Args:
sha256: Content hash (primary key)
size: File size in bytes
filename: Original filename
content_type: MIME type
Returns:
(artifact, created) tuple where created is True for new artifacts.
"""
stmt = pg_insert(Artifact).values(
id=sha256,
size=size,
filename=filename,
content_type=content_type,
ref_count=1,
).on_conflict_do_update(
index_elements=['id'],
set_={'ref_count': Artifact.ref_count + 1}
).returning(Artifact)
result = self.db.execute(stmt)
artifact = result.scalar_one()
# Check if this was an insert or update by comparing ref_count
# ref_count=1 means new, >1 means existing
created = artifact.ref_count == 1
return artifact, created
def batch_upsert_dependencies(
self,
artifact_id: str,
dependencies: list[tuple[str, str, str]],
) -> int:
"""
Insert dependencies in a single batch operation.
Uses ON CONFLICT DO NOTHING to skip duplicates.
Args:
artifact_id: SHA256 of the artifact
dependencies: List of (project, package, version_constraint)
Returns:
Number of dependencies inserted.
"""
if not dependencies:
return 0
values = self._format_dependency_values(artifact_id, dependencies)
stmt = pg_insert(ArtifactDependency).values(values)
stmt = stmt.on_conflict_do_nothing(
index_elements=['artifact_id', 'dependency_project', 'dependency_package']
)
result = self.db.execute(stmt)
return result.rowcount
def get_cached_url_with_artifact(
self,
url_hash: str,
) -> Optional[tuple[CachedUrl, Artifact]]:
"""
Get cached URL and its artifact in a single query.
Args:
url_hash: SHA256 of the URL
Returns:
(CachedUrl, Artifact) tuple or None if not found.
"""
result = (
self.db.query(CachedUrl, Artifact)
.join(Artifact, CachedUrl.artifact_id == Artifact.id)
.filter(CachedUrl.url_hash == url_hash)
.first()
)
return result
def get_artifact_dependencies(
self,
artifact_id: str,
) -> list[ArtifactDependency]:
"""
Get all dependencies for an artifact in a single query.
Args:
artifact_id: SHA256 of the artifact
Returns:
List of ArtifactDependency objects.
"""
return (
self.db.query(ArtifactDependency)
.filter(ArtifactDependency.artifact_id == artifact_id)
.all()
)
```
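For orientation, a caller that has already extracted dependencies would use the repository roughly as below. This is a sketch; the function and argument names are placeholders mirroring the pypi_proxy variables touched in Task 4.4, and commit handling stays wherever it already lives.
```python
# Sketch only: `extracted_deps` mirrors the (name, version) pairs produced in pypi_proxy;
# the surrounding transaction/commit handling stays wherever it already lives.
from sqlalchemy.orm import Session

from backend.app.db_utils import ArtifactRepository


def store_extracted_deps(db: Session, sha256: str, extracted_deps: list[tuple[str, str]]) -> int:
    repo = ArtifactRepository(db)
    deps = [("_pypi", name, version or "*") for name, version in extracted_deps]
    # One INSERT ... ON CONFLICT DO NOTHING instead of a query-per-dependency loop.
    return repo.batch_upsert_dependencies(sha256, deps)
```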
**Step 4: Run test to verify it passes**
Run: `cd /home/mondo/orchard && python -m pytest backend/tests/unit/test_db_utils.py -v`
Expected: All tests PASS
**Step 5: Commit**
```bash
git add backend/app/db_utils.py backend/tests/unit/test_db_utils.py
git commit -m "feat: add ArtifactRepository with batch DB operations"
```
---
## Phase 4: PyPI Proxy Refactor
### Task 4.1: Add dependency injection helpers
**Files:**
- Modify: `backend/app/pypi_proxy.py`
**Step 1: Add imports and dependency injection**
At the top of the file, add these imports after existing ones (around line 28):
```python
from .http_client import HttpClientManager
from .cache_service import CacheService, CacheCategory
from .db_utils import ArtifactRepository
```
Add dependency injection functions after the router definition (around line 33):
```python
def get_http_client(request: Request) -> HttpClientManager:
    """Get HttpClientManager from app state."""
    return request.app.state.http_client


def get_cache(request: Request) -> CacheService:
    """Get CacheService from app state."""
    return request.app.state.cache
```
**Step 2: Verify syntax**
Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py`
Expected: No output (success)
**Step 3: Commit**
```bash
git add backend/app/pypi_proxy.py
git commit -m "refactor: add infrastructure dependency injection to pypi_proxy"
```
---
### Task 4.2: Cache upstream sources lookup
**Files:**
- Modify: `backend/app/pypi_proxy.py`
**Step 1: Update _get_pypi_upstream_sources to use cache**
Find the `_get_pypi_upstream_sources` function (around line 244) and replace it:
```python
async def _get_pypi_upstream_sources_cached(
    db: Session,
    cache: CacheService,
) -> list[UpstreamSource]:
    """
    Get PyPI upstream sources with caching.

    Sources are cached for cache_ttl_upstream seconds to avoid
    repeated database queries on every request.
    """
    cache_key = "sources"
    # Try cache first
    cached = await cache.get(CacheCategory.UPSTREAM_SOURCES, cache_key, protocol="pypi")
    if cached:
        import json

        source_data = json.loads(cached.decode())
        # Reconstruct UpstreamSource-like objects from cached data
        # We cache just the essential fields needed for requests
        return [type('CachedSource', (), d)() for d in source_data]
    # Query database
    db_sources = (
        db.query(UpstreamSource)
        .filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True)
        .order_by(UpstreamSource.priority)
        .all()
    )
    # Combine with env sources
    env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"]
    all_sources = list(db_sources) + list(env_sources)
    all_sources = sorted(all_sources, key=lambda s: s.priority)
    # Cache the essential fields
    if all_sources and cache.enabled:
        import json

        cache_data = [
            {
                "name": s.name,
                "url": s.url,
                "priority": s.priority,
                "auth_type": getattr(s, "auth_type", "none"),
                "username": getattr(s, "username", None),
                "password": getattr(s, "password", None),
            }
            for s in all_sources
        ]
        await cache.set(
            CacheCategory.UPSTREAM_SOURCES,
            cache_key,
            json.dumps(cache_data).encode(),
            protocol="pypi",
        )
    return all_sources


def _get_pypi_upstream_sources(db: Session) -> list[UpstreamSource]:
    """
    Get PyPI upstream sources (non-cached version for sync contexts).

    Prefer _get_pypi_upstream_sources_cached when cache is available.
    """
    db_sources = (
        db.query(UpstreamSource)
        .filter(UpstreamSource.source_type == "pypi", UpstreamSource.enabled == True)
        .order_by(UpstreamSource.priority)
        .all()
    )
    env_sources = [s for s in get_env_upstream_sources() if s.source_type == "pypi"]
    all_sources = list(db_sources) + list(env_sources)
    return sorted(all_sources, key=lambda s: s.priority)
```
**Step 2: Verify syntax**
Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py`
Expected: No output (success)
**Step 3: Commit**
```bash
git add backend/app/pypi_proxy.py
git commit -m "perf: cache upstream sources lookup in pypi_proxy"
```
---
### Task 4.3: Use shared HTTP client in pypi_download_file
**Files:**
- Modify: `backend/app/pypi_proxy.py`
**Step 1: Update pypi_download_file signature**
Find the `pypi_download_file` function (around line 595) and update its signature to accept the injected infrastructure services:
```python
@router.get("/{project_name}/+f/{filename}")
async def pypi_download_file(
request: Request,
project_name: str,
filename: str,
db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage),
http_client: HttpClientManager = Depends(get_http_client),
cache: CacheService = Depends(get_cache),
):
```
**Step 2: Replace httpx.AsyncClient usage**
In the function, find the section that creates a new AsyncClient (around line 665):
```python
async with httpx.AsyncClient(timeout=timeout, follow_redirects=False) as client:
```
Replace with:
```python
client = http_client.get_client()
# Note: We don't use 'async with' since the client is managed by HttpClientManager
```
Then dedent the code that was inside the `async with` block by one level; the body itself stays unchanged.
**Step 3: Update timeout handling**
Because the shared client uses the pool-wide default timeouts, large file downloads need a per-request override:
```python
# Use longer timeout for file downloads
download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
response = await client.get(upstream_url, headers=headers, timeout=download_timeout)
```
**Step 4: Verify syntax**
Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py`
Expected: No output (success)
**Step 5: Commit**
```bash
git add backend/app/pypi_proxy.py
git commit -m "perf: use shared HTTP client pool in pypi_download_file"
```
---
### Task 4.4: Use batch dependency storage
**Files:**
- Modify: `backend/app/pypi_proxy.py`
**Step 1: Replace dependency storage loop with batch operation**
Find the dependency storage section (around lines 824-847), which looks like this:
```python
# Store extracted dependencies (deduplicate first - METADATA can list same dep under multiple extras)
if extracted_deps:
    # Deduplicate: keep first version constraint seen for each package name
    seen_deps: dict[str, str] = {}
    for dep_name, dep_version in extracted_deps:
        if dep_name not in seen_deps:
            seen_deps[dep_name] = dep_version if dep_version else "*"
    for dep_name, dep_version in seen_deps.items():
        # Check if this dependency already exists for this artifact
        existing_dep = db.query(ArtifactDependency).filter(
            ArtifactDependency.artifact_id == sha256,
            ArtifactDependency.dependency_project == "_pypi",
            ArtifactDependency.dependency_package == dep_name,
        ).first()
        if not existing_dep:
            dep = ArtifactDependency(
                artifact_id=sha256,
                dependency_project="_pypi",
                dependency_package=dep_name,
                version_constraint=dep_version,
            )
            db.add(dep)
```
Replace with:
```python
# Store extracted dependencies using batch operation
if extracted_deps:
    # Deduplicate: keep first version constraint seen for each package name
    seen_deps: dict[str, str] = {}
    for dep_name, dep_version in extracted_deps:
        if dep_name not in seen_deps:
            seen_deps[dep_name] = dep_version if dep_version else "*"
    # Convert to list of tuples for batch insert
    deps_to_store = [
        ("_pypi", dep_name, dep_version)
        for dep_name, dep_version in seen_deps.items()
    ]
    # Batch upsert - handles duplicates with ON CONFLICT DO NOTHING
    repo = ArtifactRepository(db)
    inserted = repo.batch_upsert_dependencies(sha256, deps_to_store)
    if inserted > 0:
        logger.debug(f"Stored {inserted} dependencies for {sha256[:12]}...")
```
**Step 2: Verify syntax**
Run: `cd /home/mondo/orchard && python -m py_compile backend/app/pypi_proxy.py`
Expected: No output (success)
**Step 3: Commit**
```bash
git add backend/app/pypi_proxy.py
git commit -m "perf: use batch dependency storage in pypi_proxy"
```
---
## Phase 5: Integration Tests
### Task 5.1: Add integration tests for infrastructure
**Files:**
- Modify: `backend/tests/integration/test_pypi_proxy.py`
**Step 1: Add infrastructure health test**
Add to the existing test file:
```python
class TestPyPIProxyInfrastructure:
    """Tests for PyPI proxy infrastructure integration."""

    @pytest.mark.integration
    def test_health_endpoint_includes_infrastructure(self, integration_client):
        """Health endpoint should report infrastructure status."""
        response = integration_client.get("/health")
        assert response.status_code == 200
        data = response.json()
        assert data["status"] == "healthy"
        # Infrastructure status may include these if implemented
        # assert "infrastructure" in data
```
**Step 2: Run integration tests**
Run: `docker-compose -f docker-compose.local.yml exec -T orchard-server pytest backend/tests/integration/test_pypi_proxy.py -v --no-cov`
Expected: Tests pass
**Step 3: Commit**
```bash
git add backend/tests/integration/test_pypi_proxy.py
git commit -m "test: add infrastructure integration tests for pypi_proxy"
```
---
## Phase 6: Finalization
### Task 6.1: Update health endpoint with infrastructure status
**Files:**
- Modify: `backend/app/routes.py`
**Step 1: Find health endpoint and add infrastructure status**
Find the `/health` endpoint and update to include infrastructure:
```python
@router.get("/health")
async def health_check(
request: Request,
db: Session = Depends(get_db),
):
"""Health check endpoint with infrastructure status."""
# Basic health
health_status = {"status": "healthy"}
# Add infrastructure status if available
if hasattr(request.app.state, 'http_client'):
http_client = request.app.state.http_client
health_status["http_pool"] = {
"pool_size": http_client.pool_size,
"worker_threads": http_client.executor_max,
}
if hasattr(request.app.state, 'cache'):
cache = request.app.state.cache
health_status["cache"] = {
"enabled": cache.enabled,
"connected": await cache.ping() if cache.enabled else False,
}
return health_status
```
**Step 2: Verify syntax**
Run: `cd /home/mondo/orchard && python -m py_compile backend/app/routes.py`
Expected: No output (success)
**Step 3: Commit**
```bash
git add backend/app/routes.py
git commit -m "feat: add infrastructure status to health endpoint"
```
---
### Task 6.2: Rebuild and test locally
**Step 1: Rebuild container**
Run: `docker-compose -f docker-compose.local.yml build orchard-server`
Expected: Build succeeds
**Step 2: Restart with new code**
Run: `docker rm -f $(docker ps -aq --filter name=orchard_orchard-server) 2>/dev/null; docker-compose -f docker-compose.local.yml up -d orchard-server`
Expected: Container starts
**Step 3: Check health endpoint**
Run: `curl -s http://localhost:8080/health | python -m json.tool`
Expected: JSON with status and infrastructure info
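With the Task 6.1 changes in place, the payload should look roughly like the following (values reflect the configured defaults; `connected` will be `false` when Redis is disabled or unreachable):
```json
{
  "status": "healthy",
  "http_pool": {
    "pool_size": 100,
    "worker_threads": 32
  },
  "cache": {
    "enabled": true,
    "connected": true
  }
}
```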
**Step 4: Run all tests**
Run: `docker-compose -f docker-compose.local.yml exec -T orchard-server pytest backend/tests/ -v --no-cov`
Expected: All tests pass
**Step 5: Commit any fixes**
If tests fail, fix issues and commit.
---
### Task 6.3: Update CHANGELOG
**Files:**
- Modify: `CHANGELOG.md`
**Step 1: Add entry under [Unreleased]**
Add under `## [Unreleased]`:
```markdown
### Added
- HTTP connection pooling for upstream PyPI requests (reduces latency by ~200ms/request)
- Redis caching layer for package index pages and upstream source config
- Batch database operations for dependency storage (eliminates N+1 queries)
- Infrastructure status in health endpoint
### Changed
- Database connection pool defaults increased (pool_size: 5→20, max_overflow: 10→30)
- PyPI proxy now uses shared HTTP client instead of per-request connections
```
**Step 2: Commit**
```bash
git add CHANGELOG.md
git commit -m "docs: update CHANGELOG for PyPI proxy performance improvements"
```
---
### Task 6.4: Push and verify CI
**Step 1: Push all changes**
Run: `git push`
**Step 2: Monitor CI pipeline**
Check GitLab CI for build/test results.
---
## Summary
This implementation plan covers:
1. **Phase 1**: Dependencies and configuration
2. **Phase 2**: Infrastructure layer (HttpClientManager, CacheService)
3. **Phase 3**: Database optimization (ArtifactRepository)
4. **Phase 4**: PyPI proxy refactor to use new infrastructure
5. **Phase 5**: Integration tests
6. **Phase 6**: Finalization and deployment
Each task is self-contained with tests, making it safe to commit incrementally.