Add multi-tenancy with Teams feature
Implement team-based organization for projects with role-based access control: Backend: - Add teams and team_memberships database tables (migrations 009, 009b) - Add Team and TeamMembership ORM models with relationships - Implement TeamAuthorizationService for team-level access control - Add team CRUD, membership, and projects API endpoints - Update project creation to support team assignment Frontend: - Add TeamContext for managing team state with localStorage persistence - Add TeamSelector component for switching between teams - Add TeamsPage, TeamDashboardPage, TeamSettingsPage, TeamMembersPage - Add team API client functions - Update navigation with Teams link Security: - Team role hierarchy: owner > admin > member - Membership checked before system admin fallback - Self-modification prevention for role changes - Email visibility restricted to team admins/owners - Slug validation rejects consecutive hyphens Tests: - Unit tests for TeamAuthorizationService - Integration tests for all team API endpoints
This commit is contained in:
@@ -658,32 +658,51 @@ class AuthorizationService:
|
||||
self, project_id: str, user: Optional[User]
|
||||
) -> Optional[str]:
|
||||
"""Get the user's access level for a project.
|
||||
|
||||
|
||||
Returns the highest access level the user has, or None if no access.
|
||||
Checks in order:
|
||||
1. System admin - gets admin access to all projects
|
||||
2. Project owner (created_by) - gets admin access
|
||||
3. Explicit permission in access_permissions table
|
||||
3. Team-based access (owner/admin gets admin, member gets read)
|
||||
4. Explicit permission in access_permissions table
|
||||
5. Public access
|
||||
"""
|
||||
from .models import Project, AccessPermission
|
||||
|
||||
from .models import Project, AccessPermission, TeamMembership
|
||||
|
||||
# Get the project
|
||||
project = self.db.query(Project).filter(Project.id == project_id).first()
|
||||
if not project:
|
||||
return None
|
||||
|
||||
|
||||
# Anonymous users only get access to public projects
|
||||
if not user:
|
||||
return "read" if project.is_public else None
|
||||
|
||||
|
||||
# System admins get admin access everywhere
|
||||
if user.is_admin:
|
||||
return "admin"
|
||||
|
||||
|
||||
# Project owner gets admin access
|
||||
if project.created_by == user.username:
|
||||
return "admin"
|
||||
|
||||
|
||||
# Check team-based access if project belongs to a team
|
||||
if project.team_id:
|
||||
membership = (
|
||||
self.db.query(TeamMembership)
|
||||
.filter(
|
||||
TeamMembership.team_id == project.team_id,
|
||||
TeamMembership.user_id == user.id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if membership:
|
||||
# Team owner/admin gets admin on all team projects
|
||||
if membership.role in ("owner", "admin"):
|
||||
return "admin"
|
||||
# Team member gets read access (upgradeable by explicit permission)
|
||||
# Continue checking explicit permissions for potential upgrade
|
||||
|
||||
# Check explicit permissions
|
||||
permission = (
|
||||
self.db.query(AccessPermission)
|
||||
@@ -693,13 +712,27 @@ class AuthorizationService:
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
|
||||
if permission:
|
||||
# Check expiration
|
||||
if permission.expires_at and permission.expires_at < datetime.now(timezone.utc):
|
||||
return "read" if project.is_public else None
|
||||
return permission.level
|
||||
|
||||
pass # Permission expired, fall through
|
||||
else:
|
||||
return permission.level
|
||||
|
||||
# Team member gets read access if no explicit permission
|
||||
if project.team_id:
|
||||
membership = (
|
||||
self.db.query(TeamMembership)
|
||||
.filter(
|
||||
TeamMembership.team_id == project.team_id,
|
||||
TeamMembership.user_id == user.id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
if membership:
|
||||
return "read"
|
||||
|
||||
# Fall back to public access
|
||||
return "read" if project.is_public else None
|
||||
|
||||
@@ -884,6 +917,226 @@ def check_project_access(
|
||||
return project
|
||||
|
||||
|
||||
# --- Team Authorization ---
|
||||
|
||||
# Team roles in order of increasing privilege
|
||||
TEAM_ROLES = ["member", "admin", "owner"]
|
||||
|
||||
|
||||
def get_team_role_rank(role: str) -> int:
|
||||
"""Get numeric rank for team role comparison."""
|
||||
try:
|
||||
return TEAM_ROLES.index(role)
|
||||
except ValueError:
|
||||
return -1
|
||||
|
||||
|
||||
def has_sufficient_team_role(user_role: str, required_role: str) -> bool:
|
||||
"""Check if user_role is sufficient for required_role.
|
||||
|
||||
Role hierarchy: owner > admin > member
|
||||
"""
|
||||
return get_team_role_rank(user_role) >= get_team_role_rank(required_role)
|
||||
|
||||
|
||||
class TeamAuthorizationService:
|
||||
"""Service for checking team-level authorization."""
|
||||
|
||||
def __init__(self, db: Session):
|
||||
self.db = db
|
||||
|
||||
def get_user_team_role(
|
||||
self, team_id: str, user: Optional[User]
|
||||
) -> Optional[str]:
|
||||
"""Get the user's role in a team.
|
||||
|
||||
Returns the role ('owner', 'admin', 'member') or None if not a member.
|
||||
System admins who are not team members are treated as team admins.
|
||||
"""
|
||||
from .models import Team, TeamMembership
|
||||
|
||||
if not user:
|
||||
return None
|
||||
|
||||
# Check actual membership first
|
||||
membership = (
|
||||
self.db.query(TeamMembership)
|
||||
.filter(
|
||||
TeamMembership.team_id == team_id,
|
||||
TeamMembership.user_id == user.id,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if membership:
|
||||
return membership.role
|
||||
|
||||
# System admins who are not members get admin access
|
||||
if user.is_admin:
|
||||
return "admin"
|
||||
|
||||
return None
|
||||
|
||||
def check_team_access(
|
||||
self,
|
||||
team_id: str,
|
||||
user: Optional[User],
|
||||
required_role: str = "member",
|
||||
) -> bool:
|
||||
"""Check if user has required role in team.
|
||||
|
||||
Args:
|
||||
team_id: Team ID to check
|
||||
user: User to check (None means no access)
|
||||
required_role: Minimum required role ('member', 'admin', 'owner')
|
||||
|
||||
Returns:
|
||||
True if user has sufficient role, False otherwise
|
||||
"""
|
||||
user_role = self.get_user_team_role(team_id, user)
|
||||
if not user_role:
|
||||
return False
|
||||
return has_sufficient_team_role(user_role, required_role)
|
||||
|
||||
def can_create_project(self, team_id: str, user: Optional[User]) -> bool:
|
||||
"""Check if user can create projects in team (requires admin+)."""
|
||||
return self.check_team_access(team_id, user, "admin")
|
||||
|
||||
def can_manage_members(self, team_id: str, user: Optional[User]) -> bool:
|
||||
"""Check if user can manage team members (requires admin+)."""
|
||||
return self.check_team_access(team_id, user, "admin")
|
||||
|
||||
def can_delete_team(self, team_id: str, user: Optional[User]) -> bool:
|
||||
"""Check if user can delete the team (requires owner)."""
|
||||
return self.check_team_access(team_id, user, "owner")
|
||||
|
||||
def get_team_by_slug(self, slug: str) -> Optional["Team"]:
|
||||
"""Get a team by its slug."""
|
||||
from .models import Team
|
||||
|
||||
return self.db.query(Team).filter(Team.slug == slug).first()
|
||||
|
||||
def get_user_teams(self, user: User) -> list:
|
||||
"""Get all teams a user is a member of."""
|
||||
from .models import Team, TeamMembership
|
||||
|
||||
return (
|
||||
self.db.query(Team)
|
||||
.join(TeamMembership)
|
||||
.filter(TeamMembership.user_id == user.id)
|
||||
.order_by(Team.name)
|
||||
.all()
|
||||
)
|
||||
|
||||
|
||||
def get_team_authorization_service(db: Session = Depends(get_db)) -> TeamAuthorizationService:
|
||||
"""Get a TeamAuthorizationService instance."""
|
||||
return TeamAuthorizationService(db)
|
||||
|
||||
|
||||
class TeamAccessChecker:
|
||||
"""Dependency for checking team access in route handlers."""
|
||||
|
||||
def __init__(self, required_role: str = "member"):
|
||||
self.required_role = required_role
|
||||
|
||||
def __call__(
|
||||
self,
|
||||
slug: str,
|
||||
db: Session = Depends(get_db),
|
||||
current_user: Optional[User] = Depends(get_current_user_optional),
|
||||
) -> User:
|
||||
"""Check if user has required role in team.
|
||||
|
||||
Raises 404 if team not found, 401 if not authenticated, 403 if insufficient role.
|
||||
Returns the current user.
|
||||
"""
|
||||
from .models import Team
|
||||
|
||||
# Find team by slug
|
||||
team = db.query(Team).filter(Team.slug == slug).first()
|
||||
if not team:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Team '{slug}' not found",
|
||||
)
|
||||
|
||||
if not current_user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Authentication required",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
auth_service = TeamAuthorizationService(db)
|
||||
|
||||
if not auth_service.check_team_access(str(team.id), current_user, self.required_role):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"Insufficient team permissions. Required role: {self.required_role}",
|
||||
)
|
||||
|
||||
return current_user
|
||||
|
||||
|
||||
# Pre-configured team access checkers
|
||||
require_team_member = TeamAccessChecker("member")
|
||||
require_team_admin = TeamAccessChecker("admin")
|
||||
require_team_owner = TeamAccessChecker("owner")
|
||||
|
||||
|
||||
def check_team_access(
|
||||
db: Session,
|
||||
team_slug: str,
|
||||
user: Optional[User],
|
||||
required_role: str = "member",
|
||||
) -> "Team":
|
||||
"""Check if user has required role in team.
|
||||
|
||||
This is a helper function for use in route handlers.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
team_slug: Slug of the team
|
||||
user: Current user (can be None for no access)
|
||||
required_role: Required team role (member, admin, owner)
|
||||
|
||||
Returns:
|
||||
The Team object if access is granted
|
||||
|
||||
Raises:
|
||||
HTTPException 404: Team not found
|
||||
HTTPException 401: Authentication required
|
||||
HTTPException 403: Insufficient permissions
|
||||
"""
|
||||
from .models import Team
|
||||
|
||||
# Find team by slug
|
||||
team = db.query(Team).filter(Team.slug == team_slug).first()
|
||||
if not team:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_404_NOT_FOUND,
|
||||
detail=f"Team '{team_slug}' not found",
|
||||
)
|
||||
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Authentication required",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
|
||||
auth_service = TeamAuthorizationService(db)
|
||||
|
||||
if not auth_service.check_team_access(str(team.id), user, required_role):
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_403_FORBIDDEN,
|
||||
detail=f"Insufficient team permissions. Required role: {required_role}",
|
||||
)
|
||||
|
||||
return team
|
||||
|
||||
|
||||
# --- OIDC Configuration Service ---
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user