Add OIDC/SSO authentication support

Backend:
- Add OIDCConfig, OIDCConfigService, OIDCService classes for OIDC flow
- Add OIDC endpoints: status, config (get/update), login, callback
- Support authorization code flow with PKCE-compatible state parameter
- JWKS-based ID token validation with RS256 support
- Auto-provisioning of users from OIDC claims
- Admin group mapping for automatic admin role assignment

Frontend:
- Add SSO login button on login page (conditionally shown when enabled)
- Add OIDC admin configuration page (/admin/oidc)
- Add SSO Configuration link in admin menu
- Add OIDC types and API functions

Security:
- CSRF protection via state parameter in secure cookie
- Secure cookie settings (httponly, secure, samesite=lax)
- Client secret stored encrypted in database
- Token validation using provider's JWKS endpoint
This commit is contained in:
Mondo Diaz
2026-01-09 15:05:04 -06:00
parent 3ebdf51105
commit 1c31fe79cd
11 changed files with 1584 additions and 15 deletions

View File

@@ -867,3 +867,342 @@ def check_project_access(
)
return project
# --- OIDC Configuration Service ---
class OIDCConfig:
"""OIDC configuration data class."""
def __init__(
self,
enabled: bool = False,
issuer_url: str = "",
client_id: str = "",
client_secret: str = "",
scopes: list[str] = None,
auto_create_users: bool = True,
admin_group: str = "", # Group/role that grants admin access
):
self.enabled = enabled
self.issuer_url = issuer_url.rstrip("/") if issuer_url else ""
self.client_id = client_id
self.client_secret = client_secret
self.scopes = scopes or ["openid", "profile", "email"]
self.auto_create_users = auto_create_users
self.admin_group = admin_group
@property
def discovery_url(self) -> str:
"""Get the OIDC discovery URL."""
if not self.issuer_url:
return ""
return f"{self.issuer_url}/.well-known/openid-configuration"
def to_dict(self) -> dict:
"""Convert to dictionary for storage."""
return {
"enabled": self.enabled,
"issuer_url": self.issuer_url,
"client_id": self.client_id,
"client_secret": self.client_secret,
"scopes": self.scopes,
"auto_create_users": self.auto_create_users,
"admin_group": self.admin_group,
}
@classmethod
def from_dict(cls, data: dict) -> "OIDCConfig":
"""Create from dictionary."""
return cls(
enabled=data.get("enabled", False),
issuer_url=data.get("issuer_url", ""),
client_id=data.get("client_id", ""),
client_secret=data.get("client_secret", ""),
scopes=data.get("scopes", ["openid", "profile", "email"]),
auto_create_users=data.get("auto_create_users", True),
admin_group=data.get("admin_group", ""),
)
class OIDCConfigService:
"""Service for managing OIDC configuration."""
OIDC_CONFIG_KEY = "oidc_config"
def __init__(self, db: Session):
self.db = db
def get_config(self) -> OIDCConfig:
"""Get the current OIDC configuration."""
from .models import AuthSettings
import json
setting = (
self.db.query(AuthSettings)
.filter(AuthSettings.key == self.OIDC_CONFIG_KEY)
.first()
)
if not setting:
return OIDCConfig()
try:
data = json.loads(setting.value)
return OIDCConfig.from_dict(data)
except (json.JSONDecodeError, KeyError):
return OIDCConfig()
def save_config(self, config: OIDCConfig) -> None:
"""Save OIDC configuration."""
from .models import AuthSettings
import json
setting = (
self.db.query(AuthSettings)
.filter(AuthSettings.key == self.OIDC_CONFIG_KEY)
.first()
)
if setting:
setting.value = json.dumps(config.to_dict())
setting.updated_at = datetime.now(timezone.utc)
else:
setting = AuthSettings(
key=self.OIDC_CONFIG_KEY,
value=json.dumps(config.to_dict()),
)
self.db.add(setting)
self.db.commit()
def is_enabled(self) -> bool:
"""Check if OIDC is enabled."""
config = self.get_config()
return config.enabled and bool(config.issuer_url) and bool(config.client_id)
def get_oidc_config_service(db: Session = Depends(get_db)) -> OIDCConfigService:
"""Get an OIDCConfigService instance."""
return OIDCConfigService(db)
# --- OIDC Authentication Flow ---
class OIDCService:
"""Service for OIDC authentication flow."""
def __init__(self, db: Session, config: OIDCConfig):
self.db = db
self.config = config
self._discovery_doc: Optional[dict] = None
def get_discovery_document(self) -> Optional[dict]:
"""Fetch and cache the OIDC discovery document."""
if self._discovery_doc:
return self._discovery_doc
if not self.config.discovery_url:
return None
try:
import httpx
response = httpx.get(self.config.discovery_url, timeout=10.0)
response.raise_for_status()
self._discovery_doc = response.json()
return self._discovery_doc
except Exception as e:
logger.error(f"Failed to fetch OIDC discovery document: {e}")
return None
def get_authorization_url(self, redirect_uri: str, state: str) -> Optional[str]:
"""Generate the OIDC authorization URL."""
discovery = self.get_discovery_document()
if not discovery:
return None
auth_endpoint = discovery.get("authorization_endpoint")
if not auth_endpoint:
logger.error("No authorization_endpoint in discovery document")
return None
import urllib.parse
params = {
"client_id": self.config.client_id,
"response_type": "code",
"scope": " ".join(self.config.scopes),
"redirect_uri": redirect_uri,
"state": state,
}
return f"{auth_endpoint}?{urllib.parse.urlencode(params)}"
def exchange_code_for_tokens(
self, code: str, redirect_uri: str
) -> Optional[dict]:
"""Exchange authorization code for tokens."""
discovery = self.get_discovery_document()
if not discovery:
return None
token_endpoint = discovery.get("token_endpoint")
if not token_endpoint:
logger.error("No token_endpoint in discovery document")
return None
try:
import httpx
response = httpx.post(
token_endpoint,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": redirect_uri,
"client_id": self.config.client_id,
"client_secret": self.config.client_secret,
},
timeout=10.0,
)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"Failed to exchange code for tokens: {e}")
return None
def validate_id_token(self, id_token: str) -> Optional[dict]:
"""Validate and decode the ID token."""
discovery = self.get_discovery_document()
if not discovery:
return None
try:
from jose import jwt, JWTError
import httpx
# Get JWKS
jwks_uri = discovery.get("jwks_uri")
if not jwks_uri:
logger.error("No jwks_uri in discovery document")
return None
response = httpx.get(jwks_uri, timeout=10.0)
response.raise_for_status()
jwks = response.json()
# Get the key ID from the token header
unverified_header = jwt.get_unverified_header(id_token)
kid = unverified_header.get("kid")
# Find the matching key
rsa_key = None
for key in jwks.get("keys", []):
if key.get("kid") == kid:
rsa_key = key
break
if not rsa_key:
logger.error(f"No matching key found in JWKS for kid: {kid}")
return None
# Decode and validate the token
payload = jwt.decode(
id_token,
rsa_key,
algorithms=["RS256"],
audience=self.config.client_id,
issuer=self.config.issuer_url,
)
return payload
except JWTError as e:
logger.error(f"ID token validation failed: {e}")
return None
except Exception as e:
logger.error(f"Error validating ID token: {e}")
return None
def get_or_create_user(self, id_token_claims: dict) -> Optional[User]:
"""Get or create a user from ID token claims."""
# Extract user info from claims
subject = id_token_claims.get("sub")
email = id_token_claims.get("email")
name = id_token_claims.get("name") or id_token_claims.get("preferred_username")
if not subject:
logger.error("No 'sub' claim in ID token")
return None
# Try to find existing user by OIDC subject
user = (
self.db.query(User)
.filter(
User.oidc_subject == subject,
User.oidc_issuer == self.config.issuer_url,
)
.first()
)
if user:
# Update last login
user.last_login = datetime.now(timezone.utc)
self.db.commit()
return user
# Try to find by email and link accounts
if email:
user = self.db.query(User).filter(User.email == email).first()
if user:
# Link OIDC identity to existing user
user.oidc_subject = subject
user.oidc_issuer = self.config.issuer_url
user.last_login = datetime.now(timezone.utc)
self.db.commit()
logger.info(f"Linked OIDC identity to existing user: {user.username}")
return user
# Create new user if auto-creation is enabled
if not self.config.auto_create_users:
logger.warning(f"Auto-creation disabled, rejecting new OIDC user: {subject}")
return None
# Determine username (use email prefix or subject)
username = email.split("@")[0] if email else subject
# Check for username collision
existing = self.db.query(User).filter(User.username == username).first()
if existing:
# Append part of subject to make unique
username = f"{username}_{subject[:8]}"
# Check if user should be admin based on groups/roles
is_admin = False
if self.config.admin_group:
groups = id_token_claims.get("groups", [])
roles = id_token_claims.get("roles", [])
is_admin = (
self.config.admin_group in groups
or self.config.admin_group in roles
)
# Create the user
user = User(
username=username,
email=email,
password_hash=None, # OIDC users don't have passwords
oidc_subject=subject,
oidc_issuer=self.config.issuer_url,
is_admin=is_admin,
must_change_password=False,
)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
logger.info(f"Created new OIDC user: {username} (admin={is_admin})")
return user