"""
Encryption utilities for sensitive data storage.

Uses Fernet symmetric encryption for credentials like upstream passwords.
The encryption key is sourced from the ORCHARD_CACHE_ENCRYPTION_KEY
environment variable. If it is not set (or is invalid), a random key is
generated on first use (with a warning) and kept for the lifetime of the
process.
"""

import base64
import logging
import os
import secrets
from functools import lru_cache
from typing import Optional

from cryptography.fernet import Fernet, InvalidToken

logger = logging.getLogger(__name__)

# Module-level storage for the auto-generated key (only used when the env var
# is unset or does not contain a usable key).
_generated_key: Optional[bytes] = None


def _get_key_from_env() -> Optional[bytes]:
    """
    Get the encryption key from the ORCHARD_CACHE_ENCRYPTION_KEY env var.

    Two interpretations of the variable are attempted, in order:

    1. The value as-is, handed directly to ``Fernet``.
    2. The value base64-decoded; if that yields exactly 32 bytes, they are
       re-encoded as url-safe base64 and handed to ``Fernet``.

    Returns:
        The key as bytes accepted by ``Fernet``, or None if the variable is
        unset/empty or neither interpretation produces a valid key (an error
        is logged in the latter case; the key material itself is never logged).
    """
    key_str = os.environ.get("ORCHARD_CACHE_ENCRYPTION_KEY", "")
    if not key_str:
        return None

    # Attempt 1: the value already is a Fernet key (url-safe base64).
    try:
        # encode() stays inside the try: it can raise UnicodeEncodeError
        # (a ValueError) if the value cannot be represented as UTF-8.
        key_bytes = key_str.encode("utf-8")
        # Constructing a Fernet instance is the validity check.
        Fernet(key_bytes)
        return key_bytes
    except (ValueError, TypeError):
        # Not usable as-is; fall through to the second interpretation.
        pass

    # Attempt 2: a raw 32-byte key that was base64-encoded.
    try:
        decoded = base64.urlsafe_b64decode(key_str)
        if len(decoded) == 32:
            # Re-encode as url-safe base64, the form Fernet expects.
            key_bytes = base64.urlsafe_b64encode(decoded)
            Fernet(key_bytes)
            return key_bytes
    except (ValueError, TypeError):
        # binascii.Error (bad base64) is a ValueError subclass.
        pass

    logger.error(
        "ORCHARD_CACHE_ENCRYPTION_KEY is set but invalid. "
        "Must be a valid Fernet key (32 bytes, url-safe base64 encoded). "
        "Generate one with: python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
    )
    return None


def get_encryption_key() -> bytes:
    """
    Get the Fernet encryption key.

    Returns the key from ORCHARD_CACHE_ENCRYPTION_KEY if set and valid,
    otherwise generates a random key (with a warning logged). The environment
    is consulted on every call; the generated fallback key is created once and
    cached for the lifetime of the process.

    Returns:
        A key suitable for constructing a ``Fernet`` instance.
    """
    global _generated_key

    # Prefer the key from the environment.
    env_key = _get_key_from_env()
    if env_key:
        return env_key

    # Generate the fallback key only once per process.
    if _generated_key is None:
        _generated_key = Fernet.generate_key()
        # Say why the fallback is used: claiming the variable is "not set"
        # when it is set but unusable sends operators in the wrong direction.
        if os.environ.get("ORCHARD_CACHE_ENCRYPTION_KEY"):
            headline = (
                "ORCHARD_CACHE_ENCRYPTION_KEY is set but invalid - using auto-generated key. "
            )
        else:
            headline = (
                "ORCHARD_CACHE_ENCRYPTION_KEY not set - using auto-generated key. "
            )
        logger.warning(
            headline
            + "Encrypted credentials will be lost on restart! "
            "Set ORCHARD_CACHE_ENCRYPTION_KEY for persistent encryption. "
            "Generate a key with: python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
        )

    return _generated_key


@lru_cache(maxsize=1)
def _get_fernet() -> Fernet:
    """
    Get a cached Fernet instance.

    The instance is built from ``get_encryption_key()`` on first call and then
    reused, so the key in effect is fixed at that point for this process.
    """
    return Fernet(get_encryption_key())


def encrypt_value(plaintext: str) -> bytes:
    """
    Encrypt a string value using Fernet.

    Args:
        plaintext: The string to encrypt

    Returns:
        Encrypted bytes (includes Fernet token with timestamp)

    Raises:
        ValueError: If plaintext is empty
    """
    if not plaintext:
        raise ValueError("Cannot encrypt empty value")

    fernet = _get_fernet()
    return fernet.encrypt(plaintext.encode("utf-8"))


def decrypt_value(ciphertext: bytes) -> str:
    """
    Decrypt a Fernet-encrypted value.

    Args:
        ciphertext: The encrypted bytes

    Returns:
        Decrypted string

    Raises:
        ValueError: If ciphertext is empty
        InvalidToken: If decryption fails (wrong key or corrupted data)
    """
    if not ciphertext:
        raise ValueError("Cannot decrypt empty value")

    fernet = _get_fernet()
    return fernet.decrypt(ciphertext).decode("utf-8")


def can_decrypt(ciphertext: bytes) -> bool:
    """
    Check if a value can be decrypted with the current key.

    Useful for checking if credentials are still valid after key rotation.

    Args:
        ciphertext: The encrypted bytes

    Returns:
        True if decryption succeeds, False otherwise (including for empty
        input)
    """
    if not ciphertext:
        return False

    try:
        decrypt_value(ciphertext)
        return True
    except (InvalidToken, ValueError):
        # ValueError also covers a decrypted payload that is not valid UTF-8.
        return False


def generate_key() -> str:
    """
    Generate a new Fernet encryption key.

    Returns:
        A valid Fernet key as a string (url-safe base64 encoded)
    """
    return Fernet.generate_key().decode("utf-8")