Files
orchard/backend/app/encryption.py
Mondo Diaz a3a49ac9c3 Add upstream caching infrastructure and refactor CI pipeline
Upstream Caching (Epic #68-#75, #105):
- Add upstream_sources and cache_settings tables with migrations
- Add cache management API endpoints (CRUD for sources, settings)
- Add environment variable overrides for upstream sources and cache settings
- Add encryption module for storing credentials securely
- Add frontend Admin Cache Management page
- Add is_system field to projects for system cache distinction
- Add purge_seed_data for transitioning to production-like environments

CI Pipeline Refactoring:
- Remove reset jobs (reset_stage_pre, reset_stage)
- Add ephemeral orchard-test deployment for main branch testing
- Run integration tests on ephemeral deployment before promoting to stage
- Stage is now long-running pre-prod (smoke tests only)
- Disable prosper_setup for tag pipelines
2026-01-29 11:28:59 -06:00

161 lines
4.3 KiB
Python

"""
Encryption utilities for sensitive data storage.
Uses Fernet symmetric encryption for credentials like upstream passwords.
The encryption key is sourced from ORCHARD_CACHE_ENCRYPTION_KEY environment variable.
If it is unset or invalid, a random key is generated on first use (with a warning).
"""
import base64
import logging
import os
import secrets
from functools import lru_cache
from typing import Optional
from cryptography.fernet import Fernet, InvalidToken
# Module logger; key configuration problems are reported through it.
logger = logging.getLogger(__name__)
# Process-wide storage for the auto-generated fallback key. It stays None
# until get_encryption_key() has to generate a key because
# ORCHARD_CACHE_ENCRYPTION_KEY is unset or invalid.
_generated_key: Optional[bytes] = None
def _get_key_from_env() -> Optional[bytes]:
    """
    Read and validate the Fernet key from ORCHARD_CACHE_ENCRYPTION_KEY.

    Two spellings of the key are accepted:

    * a value ``Fernet`` accepts as-is (base64 encoding of 32 bytes); it is
      returned exactly as configured;
    * the same encoding with its trailing ``=`` padding stripped (and
      optionally surrounded by whitespace); it is re-encoded to a canonical
      url-safe base64 key before being returned.

    Returns:
        The key as base64 bytes suitable for ``Fernet(...)``, or None when
        the variable is unset/empty or holds an unusable value. The latter
        case is reported through ``logger.error``.
    """
    key_str = os.environ.get("ORCHARD_CACHE_ENCRYPTION_KEY", "")
    if not key_str:
        return None

    # Fast path: the configured value is already a usable Fernet key.
    # Constructing a Fernet instance is the validation step; only the
    # "malformed key" error family is swallowed so that unrelated failures
    # are not hidden.
    try:
        key_bytes = key_str.encode("utf-8")
        Fernet(key_bytes)
        return key_bytes
    except (TypeError, ValueError):
        pass

    # Fallback: the first attempt already covers correctly padded input, so
    # the only thing worth repairing here is missing "=" padding. Rebuild the
    # padding, decode, and re-encode the raw 32 bytes canonically.
    try:
        unpadded = key_str.strip().rstrip("=")
        padded = unpadded + "=" * (-len(unpadded) % 4)
        decoded = base64.urlsafe_b64decode(padded)
        if len(decoded) == 32:
            key_bytes = base64.urlsafe_b64encode(decoded)
            Fernet(key_bytes)
            return key_bytes
    except (TypeError, ValueError):
        pass

    logger.error(
        "ORCHARD_CACHE_ENCRYPTION_KEY is set but invalid. "
        "Must be a valid Fernet key (32 bytes, url-safe base64 encoded). "
        "Generate one with: python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
    )
    return None
def get_encryption_key() -> bytes:
    """
    Return the Fernet key this process should encrypt and decrypt with.

    A valid key from ORCHARD_CACHE_ENCRYPTION_KEY always wins. Otherwise a
    random key is generated once, a warning is logged, and that same key is
    handed back on every later call for the lifetime of the process.

    Returns:
        The Fernet key as url-safe base64 bytes.
    """
    global _generated_key

    configured = _get_key_from_env()
    if configured:
        return configured

    # A fallback key was already generated earlier in this process.
    if _generated_key is not None:
        return _generated_key

    # First call without a usable configured key: create the fallback and
    # warn, since anything encrypted with it is unreadable after a restart.
    _generated_key = Fernet.generate_key()
    logger.warning(
        "ORCHARD_CACHE_ENCRYPTION_KEY not set - using auto-generated key. "
        "Encrypted credentials will be lost on restart! "
        "Set ORCHARD_CACHE_ENCRYPTION_KEY for persistent encryption. "
        "Generate a key with: python -c \"from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())\""
    )
    return _generated_key
@lru_cache(maxsize=1)
def _get_fernet() -> Fernet:
    """
    Build the Fernet instance used by the encrypt/decrypt helpers.

    The result is memoised, so the key is resolved once and the same
    instance is reused by every subsequent call.
    """
    key = get_encryption_key()
    return Fernet(key)
def encrypt_value(plaintext: str) -> bytes:
    """
    Encrypt a non-empty string with the module's Fernet instance.

    Args:
        plaintext: The string to encrypt; it is UTF-8 encoded first.

    Returns:
        The Fernet token as bytes.

    Raises:
        ValueError: If ``plaintext`` is empty (or otherwise falsy).
    """
    if plaintext:
        return _get_fernet().encrypt(plaintext.encode("utf-8"))
    raise ValueError("Cannot encrypt empty value")
def decrypt_value(ciphertext: bytes) -> str:
    """
    Decrypt a Fernet token back into the original string.

    Args:
        ciphertext: The Fernet token to decrypt.

    Returns:
        The decrypted payload decoded as UTF-8.

    Raises:
        ValueError: If ``ciphertext`` is empty (or otherwise falsy).
        InvalidToken: If decryption fails (wrong key or corrupted data).
    """
    if ciphertext:
        payload = _get_fernet().decrypt(ciphertext)
        return payload.decode("utf-8")
    raise ValueError("Cannot decrypt empty value")
def can_decrypt(ciphertext: bytes) -> bool:
    """
    Report whether a token decrypts successfully with the current key.

    Handy after a key rotation to find stored credentials that the active
    key can no longer read.

    Args:
        ciphertext: The Fernet token to probe.

    Returns:
        True when ``decrypt_value`` succeeds; False for empty input or when
        decryption raises ``InvalidToken`` or ``ValueError``.
    """
    # Empty input is never decryptable; answer without touching the key.
    if not ciphertext:
        return False

    try:
        decrypt_value(ciphertext)
    except (InvalidToken, ValueError):
        return False
    else:
        return True
def generate_key() -> str:
    """
    Create a fresh Fernet encryption key.

    Returns:
        The new key as a url-safe base64 string, in the form expected by
        ORCHARD_CACHE_ENCRYPTION_KEY.
    """
    new_key = Fernet.generate_key()
    return new_key.decode("utf-8")