""" Redis-backed caching service with category-aware TTL and invalidation. Provides: - Immutable caching for artifact data (hermetic builds) - TTL-based caching for discovery data - Event-driven invalidation for config changes - Graceful fallback when Redis unavailable """ import logging from enum import Enum from typing import Optional from .config import Settings logger = logging.getLogger(__name__) class CacheCategory(Enum): """ Cache categories with different TTL and invalidation rules. Immutable (cache forever): - ARTIFACT_METADATA: Artifact info by SHA256 - ARTIFACT_DEPENDENCIES: Extracted deps by SHA256 - DEPENDENCY_RESOLUTION: Resolution results by input hash Mutable (TTL + event invalidation): - UPSTREAM_SOURCES: Upstream config, invalidate on DB change - PACKAGE_INDEX: PyPI/npm index pages, TTL only - PACKAGE_VERSIONS: Version listings, TTL only """ # Immutable - cache forever (hermetic builds) ARTIFACT_METADATA = "artifact" ARTIFACT_DEPENDENCIES = "deps" DEPENDENCY_RESOLUTION = "resolve" # Mutable - TTL + event invalidation UPSTREAM_SOURCES = "upstream" PACKAGE_INDEX = "index" PACKAGE_VERSIONS = "versions" def get_category_ttl(category: CacheCategory, settings: Settings) -> Optional[int]: """ Get TTL for a cache category. Returns: TTL in seconds, or None for no expiry (immutable). """ ttl_map = { # Immutable - no TTL CacheCategory.ARTIFACT_METADATA: None, CacheCategory.ARTIFACT_DEPENDENCIES: None, CacheCategory.DEPENDENCY_RESOLUTION: None, # Mutable - configurable TTL CacheCategory.UPSTREAM_SOURCES: settings.cache_ttl_upstream, CacheCategory.PACKAGE_INDEX: settings.cache_ttl_index, CacheCategory.PACKAGE_VERSIONS: settings.cache_ttl_versions, } return ttl_map.get(category) class CacheService: """ Redis-backed caching with category-aware TTL. Key format: orchard:{category}:{protocol}:{identifier} Example: orchard:deps:pypi:abc123def456 When Redis is disabled or unavailable, operations gracefully return None/no-op to allow the application to function without caching. """ def __init__(self, settings: Settings): self._settings = settings self._enabled = settings.redis_enabled self._redis: Optional["redis.asyncio.Redis"] = None self._started = False async def startup(self) -> None: """Initialize Redis connection. Called by FastAPI lifespan.""" if self._started: return if not self._enabled: logger.info("CacheService disabled (redis_enabled=False)") self._started = True return try: import redis.asyncio as redis logger.info( f"Connecting to Redis at {self._settings.redis_host}:" f"{self._settings.redis_port}/{self._settings.redis_db}" ) self._redis = redis.Redis( host=self._settings.redis_host, port=self._settings.redis_port, db=self._settings.redis_db, password=self._settings.redis_password, decode_responses=False, # We handle bytes ) # Test connection await self._redis.ping() logger.info("CacheService connected to Redis") except ImportError: logger.warning("redis package not installed, caching disabled") self._enabled = False except Exception as e: logger.warning(f"Redis connection failed, caching disabled: {e}") self._enabled = False self._redis = None self._started = True async def shutdown(self) -> None: """Close Redis connection. Called by FastAPI lifespan.""" if not self._started: return if self._redis: await self._redis.aclose() self._redis = None self._started = False logger.info("CacheService shutdown complete") @staticmethod def _make_key(category: CacheCategory, protocol: str, identifier: str) -> str: """Build namespaced cache key.""" return f"orchard:{category.value}:{protocol}:{identifier}" async def get( self, category: CacheCategory, key: str, protocol: str = "default", ) -> Optional[bytes]: """ Get cached value. Args: category: Cache category for TTL rules key: Unique identifier within category protocol: Protocol namespace (pypi, npm, etc.) Returns: Cached bytes or None if not found/disabled. """ if not self._enabled or not self._redis: return None try: full_key = self._make_key(category, protocol, key) return await self._redis.get(full_key) except Exception as e: logger.warning(f"Cache get failed for {key}: {e}") return None async def set( self, category: CacheCategory, key: str, value: bytes, protocol: str = "default", ) -> None: """ Set cached value with category-appropriate TTL. Args: category: Cache category for TTL rules key: Unique identifier within category value: Bytes to cache protocol: Protocol namespace (pypi, npm, etc.) """ if not self._enabled or not self._redis: return try: full_key = self._make_key(category, protocol, key) ttl = get_category_ttl(category, self._settings) if ttl is None: await self._redis.set(full_key, value) else: await self._redis.setex(full_key, ttl, value) except Exception as e: logger.warning(f"Cache set failed for {key}: {e}") async def delete( self, category: CacheCategory, key: str, protocol: str = "default", ) -> None: """Delete a specific cache entry.""" if not self._enabled or not self._redis: return try: full_key = self._make_key(category, protocol, key) await self._redis.delete(full_key) except Exception as e: logger.warning(f"Cache delete failed for {key}: {e}") async def invalidate_pattern( self, category: CacheCategory, pattern: str = "*", protocol: str = "default", ) -> int: """ Invalidate all entries matching pattern. Args: category: Cache category pattern: Glob pattern for keys (default "*" = all in category) protocol: Protocol namespace Returns: Number of keys deleted. """ if not self._enabled or not self._redis: return 0 try: full_pattern = self._make_key(category, protocol, pattern) keys = [] async for key in self._redis.scan_iter(match=full_pattern): keys.append(key) if keys: return await self._redis.delete(*keys) return 0 except Exception as e: logger.warning(f"Cache invalidate failed for pattern {pattern}: {e}") return 0 async def ping(self) -> bool: """Check if Redis is connected and responding.""" if not self._enabled or not self._redis: return False try: await self._redis.ping() return True except Exception: return False @property def enabled(self) -> bool: """Check if caching is enabled.""" return self._enabled