Add multi-tenancy with Teams feature

This commit is contained in:
Mondo Diaz
2026-01-28 12:50:58 -06:00
parent a5796f5437
commit 576791d19e
33 changed files with 5493 additions and 115 deletions

View File

@@ -658,32 +658,51 @@ class AuthorizationService:
self, project_id: str, user: Optional[User]
) -> Optional[str]:
"""Get the user's access level for a project.
Returns the highest access level the user has, or None if no access.
Checks in order:
1. System admin - gets admin access to all projects
2. Project owner (created_by) - gets admin access
3. Explicit permission in access_permissions table
3. Team-based access (owner/admin gets admin, member gets read)
4. Explicit permission in access_permissions table
5. Public access
"""
from .models import Project, AccessPermission
from .models import Project, AccessPermission, TeamMembership
# Get the project
project = self.db.query(Project).filter(Project.id == project_id).first()
if not project:
return None
# Anonymous users only get access to public projects
if not user:
return "read" if project.is_public else None
# System admins get admin access everywhere
if user.is_admin:
return "admin"
# Project owner gets admin access
if project.created_by == user.username:
return "admin"
# Check team-based access if project belongs to a team
if project.team_id:
membership = (
self.db.query(TeamMembership)
.filter(
TeamMembership.team_id == project.team_id,
TeamMembership.user_id == user.id,
)
.first()
)
if membership:
# Team owner/admin gets admin on all team projects
if membership.role in ("owner", "admin"):
return "admin"
# Team member gets read access (upgradeable by explicit permission)
# Continue checking explicit permissions for potential upgrade
# Check explicit permissions
permission = (
self.db.query(AccessPermission)
@@ -693,13 +712,27 @@ class AuthorizationService:
)
.first()
)
if permission:
# Check expiration
if permission.expires_at and permission.expires_at < datetime.now(timezone.utc):
return "read" if project.is_public else None
return permission.level
pass # Permission expired, fall through
else:
return permission.level
# Team member gets read access if no explicit permission
if project.team_id:
membership = (
self.db.query(TeamMembership)
.filter(
TeamMembership.team_id == project.team_id,
TeamMembership.user_id == user.id,
)
.first()
)
if membership:
return "read"
# Fall back to public access
return "read" if project.is_public else None
@@ -884,6 +917,226 @@ def check_project_access(
return project
# --- Team Authorization ---
# Team roles in order of increasing privilege
TEAM_ROLES = ["member", "admin", "owner"]
def get_team_role_rank(role: str) -> int:
"""Get numeric rank for team role comparison."""
try:
return TEAM_ROLES.index(role)
except ValueError:
return -1
def has_sufficient_team_role(user_role: str, required_role: str) -> bool:
"""Check if user_role is sufficient for required_role.
Role hierarchy: owner > admin > member
"""
return get_team_role_rank(user_role) >= get_team_role_rank(required_role)
class TeamAuthorizationService:
"""Service for checking team-level authorization."""
def __init__(self, db: Session):
self.db = db
def get_user_team_role(
self, team_id: str, user: Optional[User]
) -> Optional[str]:
"""Get the user's role in a team.
Returns the role ('owner', 'admin', 'member') or None if not a member.
System admins who are not team members are treated as team admins.
"""
from .models import Team, TeamMembership
if not user:
return None
# Check actual membership first
membership = (
self.db.query(TeamMembership)
.filter(
TeamMembership.team_id == team_id,
TeamMembership.user_id == user.id,
)
.first()
)
if membership:
return membership.role
# System admins who are not members get admin access
if user.is_admin:
return "admin"
return None
def check_team_access(
self,
team_id: str,
user: Optional[User],
required_role: str = "member",
) -> bool:
"""Check if user has required role in team.
Args:
team_id: Team ID to check
user: User to check (None means no access)
required_role: Minimum required role ('member', 'admin', 'owner')
Returns:
True if user has sufficient role, False otherwise
"""
user_role = self.get_user_team_role(team_id, user)
if not user_role:
return False
return has_sufficient_team_role(user_role, required_role)
def can_create_project(self, team_id: str, user: Optional[User]) -> bool:
"""Check if user can create projects in team (requires admin+)."""
return self.check_team_access(team_id, user, "admin")
def can_manage_members(self, team_id: str, user: Optional[User]) -> bool:
"""Check if user can manage team members (requires admin+)."""
return self.check_team_access(team_id, user, "admin")
def can_delete_team(self, team_id: str, user: Optional[User]) -> bool:
"""Check if user can delete the team (requires owner)."""
return self.check_team_access(team_id, user, "owner")
def get_team_by_slug(self, slug: str) -> Optional["Team"]:
"""Get a team by its slug."""
from .models import Team
return self.db.query(Team).filter(Team.slug == slug).first()
def get_user_teams(self, user: User) -> list:
"""Get all teams a user is a member of."""
from .models import Team, TeamMembership
return (
self.db.query(Team)
.join(TeamMembership)
.filter(TeamMembership.user_id == user.id)
.order_by(Team.name)
.all()
)
def get_team_authorization_service(db: Session = Depends(get_db)) -> TeamAuthorizationService:
"""Get a TeamAuthorizationService instance."""
return TeamAuthorizationService(db)
class TeamAccessChecker:
"""Dependency for checking team access in route handlers."""
def __init__(self, required_role: str = "member"):
self.required_role = required_role
def __call__(
self,
slug: str,
db: Session = Depends(get_db),
current_user: Optional[User] = Depends(get_current_user_optional),
) -> User:
"""Check if user has required role in team.
Raises 404 if team not found, 401 if not authenticated, 403 if insufficient role.
Returns the current user.
"""
from .models import Team
# Find team by slug
team = db.query(Team).filter(Team.slug == slug).first()
if not team:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Team '{slug}' not found",
)
if not current_user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
auth_service = TeamAuthorizationService(db)
if not auth_service.check_team_access(str(team.id), current_user, self.required_role):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Insufficient team permissions. Required role: {self.required_role}",
)
return current_user
# Pre-configured team access checkers
require_team_member = TeamAccessChecker("member")
require_team_admin = TeamAccessChecker("admin")
require_team_owner = TeamAccessChecker("owner")
def check_team_access(
db: Session,
team_slug: str,
user: Optional[User],
required_role: str = "member",
) -> "Team":
"""Check if user has required role in team.
This is a helper function for use in route handlers.
Args:
db: Database session
team_slug: Slug of the team
user: Current user (can be None for no access)
required_role: Required team role (member, admin, owner)
Returns:
The Team object if access is granted
Raises:
HTTPException 404: Team not found
HTTPException 401: Authentication required
HTTPException 403: Insufficient permissions
"""
from .models import Team
# Find team by slug
team = db.query(Team).filter(Team.slug == team_slug).first()
if not team:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Team '{team_slug}' not found",
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
auth_service = TeamAuthorizationService(db)
if not auth_service.check_team_access(str(team.id), user, required_role):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Insufficient team permissions. Required role: {required_role}",
)
return team
# --- OIDC Configuration Service ---

View File

@@ -32,6 +32,7 @@ class Project(Base):
DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow
)
created_by = Column(String(255), nullable=False)
team_id = Column(UUID(as_uuid=True), ForeignKey("teams.id", ondelete="SET NULL"))
packages = relationship(
"Package", back_populates="project", cascade="all, delete-orphan"
@@ -39,10 +40,12 @@ class Project(Base):
permissions = relationship(
"AccessPermission", back_populates="project", cascade="all, delete-orphan"
)
team = relationship("Team", back_populates="projects")
__table_args__ = (
Index("idx_projects_name", "name"),
Index("idx_projects_created_by", "created_by"),
Index("idx_projects_team_id", "team_id"),
)
@@ -369,6 +372,9 @@ class User(Base):
sessions = relationship(
"Session", back_populates="user", cascade="all, delete-orphan"
)
team_memberships = relationship(
"TeamMembership", back_populates="user", cascade="all, delete-orphan"
)
__table_args__ = (
Index("idx_users_username", "username"),
@@ -561,3 +567,73 @@ class ArtifactDependency(Base):
unique=True,
),
)
class Team(Base):
"""Team for organizing projects and users."""
__tablename__ = "teams"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
name = Column(String(255), nullable=False)
slug = Column(String(255), unique=True, nullable=False)
description = Column(Text)
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
updated_at = Column(
DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow
)
created_by = Column(String(255), nullable=False)
settings = Column(JSON, default=dict)
# Relationships
memberships = relationship(
"TeamMembership", back_populates="team", cascade="all, delete-orphan"
)
projects = relationship("Project", back_populates="team")
__table_args__ = (
Index("idx_teams_slug", "slug"),
Index("idx_teams_created_by", "created_by"),
Index("idx_teams_created_at", "created_at"),
CheckConstraint(
"slug ~ '^[a-z0-9][a-z0-9-]*[a-z0-9]$' OR slug ~ '^[a-z0-9]$'",
name="check_team_slug_format",
),
)
class TeamMembership(Base):
"""Maps users to teams with their roles."""
__tablename__ = "team_memberships"
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
team_id = Column(
UUID(as_uuid=True),
ForeignKey("teams.id", ondelete="CASCADE"),
nullable=False,
)
user_id = Column(
UUID(as_uuid=True),
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
)
role = Column(String(20), nullable=False, default="member")
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
invited_by = Column(String(255))
# Relationships
team = relationship("Team", back_populates="memberships")
user = relationship("User", back_populates="team_memberships")
__table_args__ = (
Index("idx_team_memberships_team_id", "team_id"),
Index("idx_team_memberships_user_id", "user_id"),
Index("idx_team_memberships_role", "role"),
Index("idx_team_memberships_team_role", "team_id", "role"),
Index("idx_team_memberships_unique", "team_id", "user_id", unique=True),
CheckConstraint(
"role IN ('owner', 'admin', 'member')",
name="check_team_role",
),
)

View File

@@ -16,7 +16,7 @@ from fastapi import (
)
from fastapi.responses import StreamingResponse, RedirectResponse, PlainTextResponse
from sqlalchemy.orm import Session
from sqlalchemy import or_, and_, func, text
from sqlalchemy import or_, and_, func, text, case
from typing import List, Optional, Literal
import math
import io
@@ -48,6 +48,8 @@ from .models import (
AccessPermission,
PackageVersion,
ArtifactDependency,
Team,
TeamMembership,
)
from .schemas import (
ProjectCreate,
@@ -127,6 +129,13 @@ from .schemas import (
DependencyResolutionResponse,
CircularDependencyError as CircularDependencyErrorSchema,
DependencyConflictError as DependencyConflictErrorSchema,
TeamCreate,
TeamUpdate,
TeamResponse,
TeamDetailResponse,
TeamMemberCreate,
TeamMemberUpdate,
TeamMemberResponse,
)
from .metadata import extract_metadata
from .dependencies import (
@@ -558,6 +567,9 @@ from .auth import (
MIN_PASSWORD_LENGTH,
check_project_access,
AuthorizationService,
TeamAuthorizationService,
check_team_access,
get_team_authorization_service,
)
from .rate_limit import limiter, LOGIN_RATE_LIMIT
@@ -1081,6 +1093,43 @@ def oidc_callback(
return response
# --- User Search Routes (for autocomplete) ---
@router.get("/api/v1/users/search")
def search_users(
q: str = Query(..., min_length=1, description="Search query for username"),
limit: int = Query(default=10, ge=1, le=50, description="Maximum results"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""
Search for users by username prefix.
Returns basic user info for autocomplete (no email for privacy).
Any authenticated user can search.
"""
search_pattern = f"{q.lower()}%"
users = (
db.query(User)
.filter(
func.lower(User.username).like(search_pattern),
User.is_active == True,
)
.order_by(User.username)
.limit(limit)
.all()
)
return [
{
"id": str(u.id),
"username": u.username,
"is_admin": u.is_admin,
}
for u in users
]
# --- Admin User Management Routes ---
@@ -1438,15 +1487,46 @@ def list_projects(
)
# Base query - filter by access
query = db.query(Project).filter(
or_(Project.is_public == True, Project.created_by == user_id)
)
# Users can see projects that are:
# 1. Public
# 2. Created by them
# 3. Belong to a team they're a member of
if current_user:
# Get team IDs where user is a member
user_team_ids = db.query(TeamMembership.team_id).filter(
TeamMembership.user_id == current_user.id
).subquery()
query = db.query(Project).filter(
or_(
Project.is_public == True,
Project.created_by == user_id,
Project.team_id.in_(user_team_ids)
)
)
else:
# Anonymous users only see public projects
query = db.query(Project).filter(Project.is_public == True)
# Apply visibility filter
if visibility == "public":
query = query.filter(Project.is_public == True)
elif visibility == "private":
query = query.filter(Project.is_public == False, Project.created_by == user_id)
if current_user:
# Get team IDs where user is a member (for private filter)
user_team_ids_for_private = db.query(TeamMembership.team_id).filter(
TeamMembership.user_id == current_user.id
).subquery()
query = query.filter(
Project.is_public == False,
or_(
Project.created_by == user_id,
Project.team_id.in_(user_team_ids_for_private)
)
)
else:
# Anonymous users can't see private projects
query = query.filter(False)
# Apply search filter (case-insensitive on name and description)
if search:
@@ -1543,11 +1623,33 @@ def create_project(
if existing:
raise HTTPException(status_code=400, detail="Project already exists")
# If team_id is provided, verify user has admin access to the team
team = None
if project.team_id:
team = db.query(Team).filter(Team.id == project.team_id).first()
if not team:
raise HTTPException(status_code=404, detail="Team not found")
# Check if user has admin role in team
if current_user:
team_auth = TeamAuthorizationService(db)
if not team_auth.can_create_project(str(team.id), current_user):
raise HTTPException(
status_code=403,
detail="Requires admin role in team to create projects",
)
else:
raise HTTPException(
status_code=401,
detail="Authentication required to create projects in a team",
)
db_project = Project(
name=project.name,
description=project.description,
is_public=project.is_public,
created_by=user_id,
team_id=project.team_id,
)
db.add(db_project)
@@ -1558,12 +1660,28 @@ def create_project(
resource=f"project/{project.name}",
user_id=user_id,
source_ip=request.client.host if request.client else None,
details={"is_public": project.is_public},
details={
"is_public": project.is_public,
"team_id": str(project.team_id) if project.team_id else None,
},
)
db.commit()
db.refresh(db_project)
return db_project
# Build response with team info
return ProjectResponse(
id=db_project.id,
name=db_project.name,
description=db_project.description,
is_public=db_project.is_public,
created_at=db_project.created_at,
updated_at=db_project.updated_at,
created_by=db_project.created_by,
team_id=team.id if team else None,
team_slug=team.slug if team else None,
team_name=team.name if team else None,
)
@router.get("/api/v1/projects/{project_name}", response_model=ProjectResponse)
@@ -1574,7 +1692,20 @@ def get_project(
):
"""Get a single project by name. Requires read access for private projects."""
project = check_project_access(db, project_name, current_user, "read")
return project
# Build response with team info
return ProjectResponse(
id=project.id,
name=project.name,
description=project.description,
is_public=project.is_public,
created_at=project.created_at,
updated_at=project.updated_at,
created_by=project.created_by,
team_id=project.team.id if project.team else None,
team_slug=project.team.slug if project.team else None,
team_name=project.team.name if project.team else None,
)
@router.put("/api/v1/projects/{project_name}", response_model=ProjectResponse)
@@ -1701,14 +1832,63 @@ def list_project_permissions(
):
"""
List all access permissions for a project.
Includes both explicit permissions and team-based access.
Requires admin access to the project.
"""
project = check_project_access(db, project_name, current_user, "admin")
auth_service = AuthorizationService(db)
permissions = auth_service.list_project_permissions(str(project.id))
explicit_permissions = auth_service.list_project_permissions(str(project.id))
return permissions
# Convert to response format with source field
result = []
for perm in explicit_permissions:
result.append(AccessPermissionResponse(
id=perm.id,
project_id=perm.project_id,
user_id=perm.user_id,
level=perm.level,
created_at=perm.created_at,
expires_at=perm.expires_at,
source="explicit",
))
# Add team-based access if project belongs to a team
if project.team_id:
team = db.query(Team).filter(Team.id == project.team_id).first()
if team:
memberships = (
db.query(TeamMembership)
.join(User, TeamMembership.user_id == User.id)
.filter(TeamMembership.team_id == project.team_id)
.all()
)
# Track users who already have explicit permissions
explicit_users = {p.user_id for p in result}
for membership in memberships:
user = db.query(User).filter(User.id == membership.user_id).first()
if user and user.username not in explicit_users:
# Map team role to project access level
if membership.role in ("owner", "admin"):
level = "admin"
else:
level = "read"
result.append(AccessPermissionResponse(
id=membership.id, # Use membership ID
project_id=project.id,
user_id=user.username,
level=level,
created_at=membership.created_at,
expires_at=None,
source="team",
team_slug=team.slug,
team_role=membership.role,
))
return result
@router.post(
@@ -1842,6 +2022,653 @@ def get_my_project_access(
}
# Team routes
@router.get("/api/v1/teams", response_model=PaginatedResponse[TeamDetailResponse])
def list_teams(
page: int = Query(default=1, ge=1, description="Page number"),
limit: int = Query(default=20, ge=1, le=100, description="Items per page"),
search: Optional[str] = Query(default=None, description="Search by name or slug"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all teams the current user belongs to."""
# Base query - teams user is a member of
query = (
db.query(Team)
.join(TeamMembership)
.filter(TeamMembership.user_id == current_user.id)
)
# Apply search filter
if search:
search_lower = search.lower()
query = query.filter(
or_(
func.lower(Team.name).contains(search_lower),
func.lower(Team.slug).contains(search_lower),
)
)
# Get total count
total = query.count()
# Apply sorting and pagination
query = query.order_by(Team.name)
offset = (page - 1) * limit
teams = query.offset(offset).limit(limit).all()
# Calculate total pages
total_pages = math.ceil(total / limit) if total > 0 else 1
# Build response with member counts and user roles
items = []
for team in teams:
member_count = db.query(TeamMembership).filter(TeamMembership.team_id == team.id).count()
project_count = db.query(Project).filter(Project.team_id == team.id).count()
# Get user's role in this team
membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == current_user.id,
)
.first()
)
items.append(
TeamDetailResponse(
id=team.id,
name=team.name,
slug=team.slug,
description=team.description,
created_at=team.created_at,
updated_at=team.updated_at,
member_count=member_count,
project_count=project_count,
user_role=membership.role if membership else None,
)
)
return PaginatedResponse(
items=items,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
@router.post("/api/v1/teams", response_model=TeamDetailResponse, status_code=201)
def create_team(
team_data: TeamCreate,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Create a new team. The creator becomes the owner."""
# Check if slug already exists
existing = db.query(Team).filter(Team.slug == team_data.slug).first()
if existing:
raise HTTPException(status_code=400, detail="Team slug already exists")
# Create the team
team = Team(
name=team_data.name,
slug=team_data.slug,
description=team_data.description,
created_by=current_user.username,
)
db.add(team)
db.flush() # Get the team ID
# Add creator as owner
membership = TeamMembership(
team_id=team.id,
user_id=current_user.id,
role="owner",
invited_by=current_user.username,
)
db.add(membership)
# Audit log
_log_audit(
db=db,
action="team.create",
resource=f"team/{team.slug}",
user_id=current_user.username,
source_ip=request.client.host if request.client else None,
details={"team_name": team.name},
)
db.commit()
db.refresh(team)
return TeamDetailResponse(
id=team.id,
name=team.name,
slug=team.slug,
description=team.description,
created_at=team.created_at,
updated_at=team.updated_at,
member_count=1,
project_count=0,
user_role="owner",
)
@router.get("/api/v1/teams/{slug}", response_model=TeamDetailResponse)
def get_team(
slug: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Get team details. Requires team membership."""
team = check_team_access(db, slug, current_user, "member")
member_count = db.query(TeamMembership).filter(TeamMembership.team_id == team.id).count()
project_count = db.query(Project).filter(Project.team_id == team.id).count()
# Get user's role
membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == current_user.id,
)
.first()
)
user_role = membership.role if membership else ("admin" if current_user.is_admin else None)
return TeamDetailResponse(
id=team.id,
name=team.name,
slug=team.slug,
description=team.description,
created_at=team.created_at,
updated_at=team.updated_at,
member_count=member_count,
project_count=project_count,
user_role=user_role,
)
@router.put("/api/v1/teams/{slug}", response_model=TeamDetailResponse)
def update_team(
slug: str,
team_update: TeamUpdate,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update team details. Requires admin role."""
team = check_team_access(db, slug, current_user, "admin")
# Track changes for audit
changes = {}
if team_update.name is not None and team_update.name != team.name:
changes["name"] = {"old": team.name, "new": team_update.name}
team.name = team_update.name
if team_update.description is not None and team_update.description != team.description:
changes["description"] = {"old": team.description, "new": team_update.description}
team.description = team_update.description
if changes:
_log_audit(
db=db,
action="team.update",
resource=f"team/{slug}",
user_id=current_user.username,
source_ip=request.client.host if request.client else None,
details=changes,
)
db.commit()
db.refresh(team)
member_count = db.query(TeamMembership).filter(TeamMembership.team_id == team.id).count()
project_count = db.query(Project).filter(Project.team_id == team.id).count()
membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == current_user.id,
)
.first()
)
user_role = membership.role if membership else ("admin" if current_user.is_admin else None)
return TeamDetailResponse(
id=team.id,
name=team.name,
slug=team.slug,
description=team.description,
created_at=team.created_at,
updated_at=team.updated_at,
member_count=member_count,
project_count=project_count,
user_role=user_role,
)
@router.delete("/api/v1/teams/{slug}", status_code=204)
def delete_team(
slug: str,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Delete a team. Requires owner role."""
team = check_team_access(db, slug, current_user, "owner")
# Check if team has any projects
project_count = db.query(Project).filter(Project.team_id == team.id).count()
if project_count > 0:
raise HTTPException(
status_code=400,
detail=f"Cannot delete team with {project_count} project(s). Move or delete projects first.",
)
# Audit log
_log_audit(
db=db,
action="team.delete",
resource=f"team/{slug}",
user_id=current_user.username,
source_ip=request.client.host if request.client else None,
details={"team_name": team.name},
)
db.delete(team)
db.commit()
return Response(status_code=204)
# Team membership routes
@router.get("/api/v1/teams/{slug}/members", response_model=List[TeamMemberResponse])
def list_team_members(
slug: str,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all members of a team. Requires team membership.
Email addresses are only visible to team admins/owners.
"""
team = check_team_access(db, slug, current_user, "member")
# Check if current user is admin/owner to determine email visibility
current_membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == current_user.id,
)
.first()
)
can_see_emails = (
current_user.is_admin or
(current_membership and current_membership.role in ("owner", "admin"))
)
memberships = (
db.query(TeamMembership)
.join(User)
.filter(TeamMembership.team_id == team.id)
.order_by(
# Sort by role (owner first, then admin, then member)
case(
(TeamMembership.role == "owner", 0),
(TeamMembership.role == "admin", 1),
else_=2,
),
User.username,
)
.all()
)
return [
TeamMemberResponse(
id=m.id,
user_id=m.user_id,
username=m.user.username,
email=m.user.email if can_see_emails else None,
role=m.role,
created_at=m.created_at,
)
for m in memberships
]
@router.post("/api/v1/teams/{slug}/members", response_model=TeamMemberResponse, status_code=201)
def add_team_member(
slug: str,
member_data: TeamMemberCreate,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Add a member to a team. Requires admin role."""
team = check_team_access(db, slug, current_user, "admin")
# Find the user by username
user = db.query(User).filter(User.username == member_data.username).first()
if not user:
raise HTTPException(status_code=404, detail=f"User '{member_data.username}' not found")
# Check if already a member
existing = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == user.id,
)
.first()
)
if existing:
raise HTTPException(status_code=400, detail="User is already a member of this team")
# Only owners can add other owners
if member_data.role == "owner":
current_membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == current_user.id,
)
.first()
)
if not current_membership or current_membership.role != "owner":
raise HTTPException(status_code=403, detail="Only owners can add other owners")
membership = TeamMembership(
team_id=team.id,
user_id=user.id,
role=member_data.role,
invited_by=current_user.username,
)
db.add(membership)
_log_audit(
db=db,
action="team.member.add",
resource=f"team/{slug}/members/{member_data.username}",
user_id=current_user.username,
source_ip=request.client.host if request.client else None,
details={"role": member_data.role},
)
db.commit()
db.refresh(membership)
return TeamMemberResponse(
id=membership.id,
user_id=membership.user_id,
username=user.username,
email=user.email,
role=membership.role,
created_at=membership.created_at,
)
@router.put("/api/v1/teams/{slug}/members/{username}", response_model=TeamMemberResponse)
def update_team_member(
slug: str,
username: str,
member_update: TeamMemberUpdate,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Update a member's role. Requires admin role."""
team = check_team_access(db, slug, current_user, "admin")
# Find the user
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(status_code=404, detail=f"User '{username}' not found")
# Find the membership
membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == user.id,
)
.first()
)
if not membership:
raise HTTPException(status_code=404, detail=f"User '{username}' is not a member of this team")
# Prevent self-role modification
if user.id == current_user.id:
raise HTTPException(status_code=400, detail="Cannot modify your own role")
# Get current user's membership to check permissions
current_membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == current_user.id,
)
.first()
)
current_role = current_membership.role if current_membership else None
# Prevent demoting the last owner
if membership.role == "owner" and member_update.role != "owner":
owner_count = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.role == "owner",
)
.count()
)
if owner_count <= 1:
raise HTTPException(status_code=400, detail="Cannot demote the last owner")
# Only team owners can modify other owners or promote to owner (system admins cannot)
if membership.role == "owner" or member_update.role == "owner":
if current_role != "owner":
raise HTTPException(status_code=403, detail="Only team owners can modify owner roles")
old_role = membership.role
membership.role = member_update.role
_log_audit(
db=db,
action="team.member.update",
resource=f"team/{slug}/members/{username}",
user_id=current_user.username,
source_ip=request.client.host if request.client else None,
details={"old_role": old_role, "new_role": member_update.role},
)
db.commit()
db.refresh(membership)
return TeamMemberResponse(
id=membership.id,
user_id=membership.user_id,
username=user.username,
email=user.email,
role=membership.role,
created_at=membership.created_at,
)
@router.delete("/api/v1/teams/{slug}/members/{username}", status_code=204)
def remove_team_member(
slug: str,
username: str,
request: Request,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Remove a member from a team. Requires admin role."""
team = check_team_access(db, slug, current_user, "admin")
# Find the user
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(status_code=404, detail=f"User '{username}' not found")
# Find the membership
membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == user.id,
)
.first()
)
if not membership:
raise HTTPException(status_code=404, detail=f"User '{username}' is not a member of this team")
# Prevent self-removal (use a "leave team" action instead if needed)
if user.id == current_user.id:
raise HTTPException(status_code=400, detail="Cannot remove yourself. Transfer ownership first if you are an owner.")
# Prevent removing the last owner
if membership.role == "owner":
owner_count = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.role == "owner",
)
.count()
)
if owner_count <= 1:
raise HTTPException(status_code=400, detail="Cannot remove the last owner")
# Only team owners can remove other owners (system admins cannot)
current_membership = (
db.query(TeamMembership)
.filter(
TeamMembership.team_id == team.id,
TeamMembership.user_id == current_user.id,
)
.first()
)
if not current_membership or current_membership.role != "owner":
raise HTTPException(status_code=403, detail="Only team owners can remove other owners")
_log_audit(
db=db,
action="team.member.remove",
resource=f"team/{slug}/members/{username}",
user_id=current_user.username,
source_ip=request.client.host if request.client else None,
details={"role": membership.role},
)
db.delete(membership)
db.commit()
return Response(status_code=204)
# Team projects route
@router.get("/api/v1/teams/{slug}/projects", response_model=PaginatedResponse[ProjectResponse])
def list_team_projects(
slug: str,
page: int = Query(default=1, ge=1, description="Page number"),
limit: int = Query(default=20, ge=1, le=100, description="Items per page"),
search: Optional[str] = Query(default=None, description="Search by name or description"),
visibility: Optional[str] = Query(default=None, description="Filter by visibility (public, private)"),
sort: str = Query(default="name", description="Sort field (name, created_at, updated_at)"),
order: str = Query(default="asc", description="Sort order (asc, desc)"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""List all projects in a team. Requires team membership."""
team = check_team_access(db, slug, current_user, "member")
# Validate sort field
valid_sort_fields = {
"name": Project.name,
"created_at": Project.created_at,
"updated_at": Project.updated_at,
}
if sort not in valid_sort_fields:
raise HTTPException(
status_code=400,
detail=f"Invalid sort field. Must be one of: {', '.join(valid_sort_fields.keys())}",
)
if order not in ("asc", "desc"):
raise HTTPException(status_code=400, detail="Invalid order. Must be 'asc' or 'desc'")
# Base query - projects in this team
query = db.query(Project).filter(Project.team_id == team.id)
# Apply visibility filter
if visibility == "public":
query = query.filter(Project.is_public == True)
elif visibility == "private":
query = query.filter(Project.is_public == False)
# Apply search filter
if search:
search_lower = search.lower()
query = query.filter(
or_(
func.lower(Project.name).contains(search_lower),
func.lower(Project.description).contains(search_lower),
)
)
# Get total count
total = query.count()
# Apply sorting
sort_column = valid_sort_fields[sort]
if order == "desc":
query = query.order_by(sort_column.desc())
else:
query = query.order_by(sort_column.asc())
# Apply pagination
offset = (page - 1) * limit
projects = query.offset(offset).limit(limit).all()
# Calculate total pages
total_pages = math.ceil(total / limit) if total > 0 else 1
# Build response with team info
items = []
for p in projects:
items.append(
ProjectResponse(
id=p.id,
name=p.name,
description=p.description,
is_public=p.is_public,
created_at=p.created_at,
updated_at=p.updated_at,
created_by=p.created_by,
team_id=team.id,
team_slug=team.slug,
team_name=team.name,
)
)
return PaginatedResponse(
items=items,
pagination=PaginationMeta(
page=page,
limit=limit,
total=total,
total_pages=total_pages,
has_more=page < total_pages,
),
)
# Package routes
@router.get(
"/api/v1/project/{project_name}/packages",

View File

@@ -25,6 +25,7 @@ class ProjectCreate(BaseModel):
name: str
description: Optional[str] = None
is_public: bool = True
team_id: Optional[UUID] = None
class ProjectResponse(BaseModel):
@@ -35,6 +36,9 @@ class ProjectResponse(BaseModel):
created_at: datetime
updated_at: datetime
created_by: str
team_id: Optional[UUID] = None
team_slug: Optional[str] = None
team_name: Optional[str] = None
class Config:
from_attributes = True
@@ -907,6 +911,9 @@ class AccessPermissionResponse(BaseModel):
level: str
created_at: datetime
expires_at: Optional[datetime]
source: Optional[str] = "explicit" # "explicit" or "team"
team_slug: Optional[str] = None # Team slug if source is "team"
team_role: Optional[str] = None # Team role if source is "team"
class Config:
from_attributes = True
@@ -1053,3 +1060,139 @@ class CircularDependencyError(BaseModel):
message: str
cycle: List[str] # List of "project/package" strings showing the cycle
# Team schemas
TEAM_ROLES = ["owner", "admin", "member"]
RESERVED_TEAM_SLUGS = {"new", "api", "admin", "settings", "members", "projects", "search"}
class TeamCreate(BaseModel):
"""Create a new team"""
name: str
slug: str
description: Optional[str] = None
@field_validator('name')
@classmethod
def validate_name(cls, v: str) -> str:
"""Validate team name."""
if not v or not v.strip():
raise ValueError("Name cannot be empty")
if len(v) > 255:
raise ValueError("Name must be 255 characters or less")
return v.strip()
@field_validator('slug')
@classmethod
def validate_slug(cls, v: str) -> str:
"""Validate team slug format (lowercase alphanumeric with hyphens)."""
import re
if not v:
raise ValueError("Slug cannot be empty")
if len(v) < 2:
raise ValueError("Slug must be at least 2 characters")
if len(v) > 255:
raise ValueError("Slug must be 255 characters or less")
if not re.match(r'^[a-z0-9][a-z0-9-]*[a-z0-9]$', v) and not re.match(r'^[a-z0-9]$', v):
raise ValueError(
"Slug must be lowercase alphanumeric with hyphens, "
"starting and ending with alphanumeric characters"
)
if '--' in v:
raise ValueError("Slug cannot contain consecutive hyphens")
if v in RESERVED_TEAM_SLUGS:
raise ValueError(f"Slug '{v}' is reserved and cannot be used")
return v
@field_validator('description')
@classmethod
def validate_description(cls, v: Optional[str]) -> Optional[str]:
"""Validate team description."""
if v is not None and len(v) > 2000:
raise ValueError("Description must be 2000 characters or less")
return v
class TeamUpdate(BaseModel):
"""Update team details"""
name: Optional[str] = None
description: Optional[str] = None
@field_validator('name')
@classmethod
def validate_name(cls, v: Optional[str]) -> Optional[str]:
"""Validate team name."""
if v is not None:
if not v.strip():
raise ValueError("Name cannot be empty")
if len(v) > 255:
raise ValueError("Name must be 255 characters or less")
return v.strip()
return v
@field_validator('description')
@classmethod
def validate_description(cls, v: Optional[str]) -> Optional[str]:
"""Validate team description."""
if v is not None and len(v) > 2000:
raise ValueError("Description must be 2000 characters or less")
return v
class TeamResponse(BaseModel):
"""Team response with basic info"""
id: UUID
name: str
slug: str
description: Optional[str]
created_at: datetime
updated_at: datetime
member_count: int = 0
project_count: int = 0
class Config:
from_attributes = True
class TeamDetailResponse(TeamResponse):
"""Team response with user's role"""
user_role: Optional[str] = None # 'owner', 'admin', 'member', or None
class TeamMemberCreate(BaseModel):
"""Add a member to a team"""
username: str
role: str = "member"
@field_validator('role')
@classmethod
def validate_role(cls, v: str) -> str:
if v not in TEAM_ROLES:
raise ValueError(f"Role must be one of: {', '.join(TEAM_ROLES)}")
return v
class TeamMemberUpdate(BaseModel):
"""Update a team member's role"""
role: str
@field_validator('role')
@classmethod
def validate_role(cls, v: str) -> str:
if v not in TEAM_ROLES:
raise ValueError(f"Role must be one of: {', '.join(TEAM_ROLES)}")
return v
class TeamMemberResponse(BaseModel):
"""Team member response"""
id: UUID
user_id: UUID
username: str
email: Optional[str]
role: str
created_at: datetime
class Config:
from_attributes = True

View File

@@ -5,8 +5,9 @@ import hashlib
import logging
from sqlalchemy.orm import Session
from .models import Project, Package, Artifact, Tag, Upload, PackageVersion, ArtifactDependency
from .models import Project, Package, Artifact, Tag, Upload, PackageVersion, ArtifactDependency, Team, TeamMembership, User
from .storage import get_storage
from .auth import hash_password
logger = logging.getLogger(__name__)
@@ -149,6 +150,80 @@ def seed_database(db: Session) -> None:
logger.info("Seeding database with test data...")
storage = get_storage()
# Find or use admin user for team ownership
admin_user = db.query(User).filter(User.username == "admin").first()
team_owner_username = admin_user.username if admin_user else "seed-user"
# Create a demo team
demo_team = Team(
name="Demo Team",
slug="demo-team",
description="A demonstration team with sample projects",
created_by=team_owner_username,
)
db.add(demo_team)
db.flush()
# Add admin user as team owner if they exist
if admin_user:
membership = TeamMembership(
team_id=demo_team.id,
user_id=admin_user.id,
role="owner",
invited_by=team_owner_username,
)
db.add(membership)
db.flush()
logger.info(f"Created team: {demo_team.name} ({demo_team.slug})")
# Create test users with various roles
test_users = [
{"username": "alice", "email": "alice@example.com", "role": "admin"},
{"username": "bob", "email": "bob@example.com", "role": "admin"},
{"username": "charlie", "email": "charlie@example.com", "role": "member"},
{"username": "diana", "email": "diana@example.com", "role": "member"},
{"username": "eve", "email": "eve@example.com", "role": "member"},
{"username": "frank", "email": None, "role": "member"},
]
for user_data in test_users:
# Check if user already exists
existing_user = db.query(User).filter(User.username == user_data["username"]).first()
if existing_user:
test_user = existing_user
else:
# Create the user with password same as username
test_user = User(
username=user_data["username"],
email=user_data["email"],
password_hash=hash_password(user_data["username"]),
is_admin=False,
is_active=True,
must_change_password=False,
)
db.add(test_user)
db.flush()
logger.info(f"Created test user: {user_data['username']}")
# Add to demo team with specified role
existing_membership = db.query(TeamMembership).filter(
TeamMembership.team_id == demo_team.id,
TeamMembership.user_id == test_user.id,
).first()
if not existing_membership:
membership = TeamMembership(
team_id=demo_team.id,
user_id=test_user.id,
role=user_data["role"],
invited_by=team_owner_username,
)
db.add(membership)
logger.info(f"Added {user_data['username']} to {demo_team.slug} as {user_data['role']}")
db.flush()
# Create projects and packages
project_map = {}
package_map = {}
@@ -158,7 +233,8 @@ def seed_database(db: Session) -> None:
name=project_data["name"],
description=project_data["description"],
is_public=project_data["is_public"],
created_by="seed-user",
created_by=team_owner_username,
team_id=demo_team.id, # Assign to demo team
)
db.add(project)
db.flush() # Get the ID
@@ -174,7 +250,7 @@ def seed_database(db: Session) -> None:
db.flush()
package_map[(project_data["name"], package_data["name"])] = package
logger.info(f"Created {len(project_map)} projects and {len(package_map)} packages")
logger.info(f"Created {len(project_map)} projects and {len(package_map)} packages (assigned to {demo_team.slug})")
# Create artifacts, tags, and versions
artifact_count = 0
@@ -212,7 +288,7 @@ def seed_database(db: Session) -> None:
size=size,
content_type=artifact_data["content_type"],
original_name=artifact_data["filename"],
created_by="seed-user",
created_by=team_owner_username,
s3_key=s3_key,
ref_count=ref_count,
)
@@ -235,7 +311,7 @@ def seed_database(db: Session) -> None:
artifact_id=sha256_hash,
version=artifact_data["version"],
version_source="explicit",
created_by="seed-user",
created_by=team_owner_username,
)
db.add(version)
version_count += 1
@@ -246,7 +322,7 @@ def seed_database(db: Session) -> None:
package_id=package.id,
name=tag_name,
artifact_id=sha256_hash,
created_by="seed-user",
created_by=team_owner_username,
)
db.add(tag)
tag_count += 1

View File

@@ -0,0 +1,316 @@
"""
Integration tests for Teams API endpoints.
"""
import pytest
@pytest.mark.integration
class TestTeamsCRUD:
"""Tests for team creation, listing, updating, and deletion."""
def test_create_team(self, integration_client, unique_test_id):
"""Test creating a new team."""
team_name = f"Test Team {unique_test_id}"
team_slug = f"test-team-{unique_test_id}"
response = integration_client.post(
"/api/v1/teams",
json={
"name": team_name,
"slug": team_slug,
"description": "A test team",
},
)
assert response.status_code == 201, f"Failed to create team: {response.text}"
data = response.json()
assert data["name"] == team_name
assert data["slug"] == team_slug
assert data["description"] == "A test team"
assert data["user_role"] == "owner"
assert data["member_count"] == 1
assert data["project_count"] == 0
# Cleanup
integration_client.delete(f"/api/v1/teams/{team_slug}")
def test_create_team_duplicate_slug(self, integration_client, unique_test_id):
"""Test that duplicate team slugs are rejected."""
team_slug = f"dup-team-{unique_test_id}"
# Create first team
response = integration_client.post(
"/api/v1/teams",
json={"name": "First Team", "slug": team_slug},
)
assert response.status_code == 201
# Try to create second team with same slug
response = integration_client.post(
"/api/v1/teams",
json={"name": "Second Team", "slug": team_slug},
)
assert response.status_code == 400
assert "already exists" in response.json()["detail"].lower()
# Cleanup
integration_client.delete(f"/api/v1/teams/{team_slug}")
def test_create_team_invalid_slug(self, integration_client):
"""Test that invalid team slugs are rejected."""
invalid_slugs = [
"UPPERCASE",
"with spaces",
"-starts-with-hyphen",
"ends-with-hyphen-",
"has--double--hyphen",
]
for invalid_slug in invalid_slugs:
response = integration_client.post(
"/api/v1/teams",
json={"name": "Test", "slug": invalid_slug},
)
assert response.status_code == 422, f"Slug '{invalid_slug}' should be invalid"
def test_list_teams(self, integration_client, unique_test_id):
"""Test listing teams the user belongs to."""
# Create a team
team_slug = f"list-team-{unique_test_id}"
integration_client.post(
"/api/v1/teams",
json={"name": "List Test Team", "slug": team_slug},
)
# List teams
response = integration_client.get("/api/v1/teams")
assert response.status_code == 200
data = response.json()
assert "items" in data
assert "pagination" in data
# Find our team
team = next((t for t in data["items"] if t["slug"] == team_slug), None)
assert team is not None
assert team["name"] == "List Test Team"
# Cleanup
integration_client.delete(f"/api/v1/teams/{team_slug}")
def test_get_team(self, integration_client, unique_test_id):
"""Test getting team details."""
team_slug = f"get-team-{unique_test_id}"
integration_client.post(
"/api/v1/teams",
json={"name": "Get Test Team", "slug": team_slug, "description": "Test"},
)
response = integration_client.get(f"/api/v1/teams/{team_slug}")
assert response.status_code == 200
data = response.json()
assert data["slug"] == team_slug
assert data["name"] == "Get Test Team"
assert data["user_role"] == "owner"
# Cleanup
integration_client.delete(f"/api/v1/teams/{team_slug}")
def test_get_nonexistent_team(self, integration_client):
"""Test getting a team that doesn't exist."""
response = integration_client.get("/api/v1/teams/nonexistent-team-12345")
assert response.status_code == 404
def test_update_team(self, integration_client, unique_test_id):
"""Test updating team details."""
team_slug = f"update-team-{unique_test_id}"
integration_client.post(
"/api/v1/teams",
json={"name": "Original Name", "slug": team_slug},
)
response = integration_client.put(
f"/api/v1/teams/{team_slug}",
json={"name": "Updated Name", "description": "New description"},
)
assert response.status_code == 200
data = response.json()
assert data["name"] == "Updated Name"
assert data["description"] == "New description"
assert data["slug"] == team_slug # Slug should not change
# Cleanup
integration_client.delete(f"/api/v1/teams/{team_slug}")
def test_delete_team(self, integration_client, unique_test_id):
"""Test deleting a team."""
team_slug = f"delete-team-{unique_test_id}"
integration_client.post(
"/api/v1/teams",
json={"name": "Delete Test Team", "slug": team_slug},
)
response = integration_client.delete(f"/api/v1/teams/{team_slug}")
assert response.status_code == 204
# Verify team is gone
response = integration_client.get(f"/api/v1/teams/{team_slug}")
assert response.status_code == 404
@pytest.mark.integration
class TestTeamMembers:
"""Tests for team membership management."""
@pytest.fixture
def test_team(self, integration_client, unique_test_id):
"""Create a test team for member tests."""
team_slug = f"member-team-{unique_test_id}"
response = integration_client.post(
"/api/v1/teams",
json={"name": "Member Test Team", "slug": team_slug},
)
assert response.status_code == 201
yield team_slug
# Cleanup
try:
integration_client.delete(f"/api/v1/teams/{team_slug}")
except Exception:
pass
def test_list_members(self, integration_client, test_team):
"""Test listing team members."""
response = integration_client.get(f"/api/v1/teams/{test_team}/members")
assert response.status_code == 200
members = response.json()
assert len(members) == 1
assert members[0]["role"] == "owner"
def test_owner_is_first_member(self, integration_client, test_team):
"""Test that the team creator is automatically the owner."""
response = integration_client.get(f"/api/v1/teams/{test_team}/members")
members = response.json()
assert len(members) >= 1
owner = next((m for m in members if m["role"] == "owner"), None)
assert owner is not None
@pytest.mark.integration
class TestTeamProjects:
"""Tests for team project management."""
@pytest.fixture
def test_team(self, integration_client, unique_test_id):
"""Create a test team for project tests."""
team_slug = f"proj-team-{unique_test_id}"
response = integration_client.post(
"/api/v1/teams",
json={"name": "Project Test Team", "slug": team_slug},
)
assert response.status_code == 201
data = response.json()
yield {"slug": team_slug, "id": data["id"]}
# Cleanup
try:
integration_client.delete(f"/api/v1/teams/{team_slug}")
except Exception:
pass
def test_list_team_projects_empty(self, integration_client, test_team):
"""Test listing projects in an empty team."""
response = integration_client.get(f"/api/v1/teams/{test_team['slug']}/projects")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["pagination"]["total"] == 0
def test_create_project_in_team(self, integration_client, test_team, unique_test_id):
"""Test creating a project within a team."""
project_name = f"team-project-{unique_test_id}"
response = integration_client.post(
"/api/v1/projects",
json={
"name": project_name,
"description": "A team project",
"team_id": test_team["id"],
},
)
assert response.status_code == 200, f"Failed to create project: {response.text}"
data = response.json()
assert data["team_id"] == test_team["id"]
assert data["team_slug"] == test_team["slug"]
# Verify project appears in team projects list
response = integration_client.get(f"/api/v1/teams/{test_team['slug']}/projects")
assert response.status_code == 200
projects = response.json()["items"]
assert any(p["name"] == project_name for p in projects)
# Cleanup
integration_client.delete(f"/api/v1/projects/{project_name}")
def test_project_team_info_in_response(self, integration_client, test_team, unique_test_id):
"""Test that project responses include team info."""
project_name = f"team-info-project-{unique_test_id}"
# Create project in team
integration_client.post(
"/api/v1/projects",
json={"name": project_name, "team_id": test_team["id"]},
)
# Get project and verify team info
response = integration_client.get(f"/api/v1/projects/{project_name}")
assert response.status_code == 200
data = response.json()
assert data["team_id"] == test_team["id"]
assert data["team_slug"] == test_team["slug"]
assert data["team_name"] == "Project Test Team"
# Cleanup
integration_client.delete(f"/api/v1/projects/{project_name}")
@pytest.mark.integration
class TestTeamAuthorization:
"""Tests for team-based authorization."""
def test_cannot_delete_team_with_projects(self, integration_client, unique_test_id):
"""Test that teams with projects cannot be deleted."""
team_slug = f"nodelete-team-{unique_test_id}"
project_name = f"nodelete-project-{unique_test_id}"
# Create team
response = integration_client.post(
"/api/v1/teams",
json={"name": "No Delete Team", "slug": team_slug},
)
team_id = response.json()["id"]
# Create project in team
integration_client.post(
"/api/v1/projects",
json={"name": project_name, "team_id": team_id},
)
# Try to delete team - should fail
response = integration_client.delete(f"/api/v1/teams/{team_slug}")
assert response.status_code == 400
assert "project" in response.json()["detail"].lower()
# Cleanup - delete project first, then team
integration_client.delete(f"/api/v1/projects/{project_name}")
integration_client.delete(f"/api/v1/teams/{team_slug}")

View File

@@ -0,0 +1,213 @@
"""
Unit tests for TeamAuthorizationService.
"""
import pytest
from unittest.mock import MagicMock, patch
import uuid
class TestTeamRoleHierarchy:
"""Tests for team role hierarchy functions."""
def test_get_team_role_rank(self):
"""Test role ranking."""
from app.auth import get_team_role_rank
assert get_team_role_rank("member") == 0
assert get_team_role_rank("admin") == 1
assert get_team_role_rank("owner") == 2
assert get_team_role_rank("invalid") == -1
def test_has_sufficient_team_role(self):
"""Test role sufficiency checks."""
from app.auth import has_sufficient_team_role
# Same role should be sufficient
assert has_sufficient_team_role("member", "member") is True
assert has_sufficient_team_role("admin", "admin") is True
assert has_sufficient_team_role("owner", "owner") is True
# Higher role should be sufficient for lower requirements
assert has_sufficient_team_role("admin", "member") is True
assert has_sufficient_team_role("owner", "member") is True
assert has_sufficient_team_role("owner", "admin") is True
# Lower role should NOT be sufficient for higher requirements
assert has_sufficient_team_role("member", "admin") is False
assert has_sufficient_team_role("member", "owner") is False
assert has_sufficient_team_role("admin", "owner") is False
class TestTeamAuthorizationService:
"""Tests for TeamAuthorizationService class."""
@pytest.fixture
def mock_db(self):
"""Create a mock database session."""
return MagicMock()
@pytest.fixture
def mock_user(self):
"""Create a mock user."""
user = MagicMock()
user.id = uuid.uuid4()
user.username = "testuser"
user.is_admin = False
return user
@pytest.fixture
def mock_admin_user(self):
"""Create a mock admin user."""
user = MagicMock()
user.id = uuid.uuid4()
user.username = "adminuser"
user.is_admin = True
return user
def test_get_user_team_role_no_user(self, mock_db):
"""Test that None is returned for anonymous users."""
from app.auth import TeamAuthorizationService
service = TeamAuthorizationService(mock_db)
result = service.get_user_team_role("team-id", None)
assert result is None
def test_get_user_team_role_admin_user(self, mock_db, mock_admin_user):
"""Test that system admins who are not members get admin role."""
from app.auth import TeamAuthorizationService
# Mock no membership found
mock_db.query.return_value.filter.return_value.first.return_value = None
service = TeamAuthorizationService(mock_db)
result = service.get_user_team_role("team-id", mock_admin_user)
assert result == "admin"
def test_get_user_team_role_member(self, mock_db, mock_user):
"""Test getting role for a team member."""
from app.auth import TeamAuthorizationService
# Mock the membership query
mock_membership = MagicMock()
mock_membership.role = "member"
mock_db.query.return_value.filter.return_value.first.return_value = mock_membership
service = TeamAuthorizationService(mock_db)
result = service.get_user_team_role("team-id", mock_user)
assert result == "member"
def test_get_user_team_role_not_member(self, mock_db, mock_user):
"""Test getting role for a non-member."""
from app.auth import TeamAuthorizationService
# Mock no membership found
mock_db.query.return_value.filter.return_value.first.return_value = None
service = TeamAuthorizationService(mock_db)
result = service.get_user_team_role("team-id", mock_user)
assert result is None
def test_check_team_access_member(self, mock_db, mock_user):
"""Test access check for member requiring member role."""
from app.auth import TeamAuthorizationService
# Mock the membership query
mock_membership = MagicMock()
mock_membership.role = "member"
mock_db.query.return_value.filter.return_value.first.return_value = mock_membership
service = TeamAuthorizationService(mock_db)
# Member should have member access
assert service.check_team_access("team-id", mock_user, "member") is True
# Member should not have admin access
assert service.check_team_access("team-id", mock_user, "admin") is False
# Member should not have owner access
assert service.check_team_access("team-id", mock_user, "owner") is False
def test_check_team_access_admin(self, mock_db, mock_user):
"""Test access check for admin role."""
from app.auth import TeamAuthorizationService
# Mock admin membership
mock_membership = MagicMock()
mock_membership.role = "admin"
mock_db.query.return_value.filter.return_value.first.return_value = mock_membership
service = TeamAuthorizationService(mock_db)
assert service.check_team_access("team-id", mock_user, "member") is True
assert service.check_team_access("team-id", mock_user, "admin") is True
assert service.check_team_access("team-id", mock_user, "owner") is False
def test_check_team_access_owner(self, mock_db, mock_user):
"""Test access check for owner role."""
from app.auth import TeamAuthorizationService
# Mock owner membership
mock_membership = MagicMock()
mock_membership.role = "owner"
mock_db.query.return_value.filter.return_value.first.return_value = mock_membership
service = TeamAuthorizationService(mock_db)
assert service.check_team_access("team-id", mock_user, "member") is True
assert service.check_team_access("team-id", mock_user, "admin") is True
assert service.check_team_access("team-id", mock_user, "owner") is True
def test_can_create_project(self, mock_db, mock_user):
"""Test can_create_project requires admin role."""
from app.auth import TeamAuthorizationService
service = TeamAuthorizationService(mock_db)
# Member cannot create projects
mock_membership = MagicMock()
mock_membership.role = "member"
mock_db.query.return_value.filter.return_value.first.return_value = mock_membership
assert service.can_create_project("team-id", mock_user) is False
# Admin can create projects
mock_membership.role = "admin"
assert service.can_create_project("team-id", mock_user) is True
# Owner can create projects
mock_membership.role = "owner"
assert service.can_create_project("team-id", mock_user) is True
def test_can_manage_members(self, mock_db, mock_user):
"""Test can_manage_members requires admin role."""
from app.auth import TeamAuthorizationService
service = TeamAuthorizationService(mock_db)
# Member cannot manage members
mock_membership = MagicMock()
mock_membership.role = "member"
mock_db.query.return_value.filter.return_value.first.return_value = mock_membership
assert service.can_manage_members("team-id", mock_user) is False
# Admin can manage members
mock_membership.role = "admin"
assert service.can_manage_members("team-id", mock_user) is True
def test_can_delete_team(self, mock_db, mock_user):
"""Test can_delete_team requires owner role."""
from app.auth import TeamAuthorizationService
service = TeamAuthorizationService(mock_db)
# Member cannot delete team
mock_membership = MagicMock()
mock_membership.role = "member"
mock_db.query.return_value.filter.return_value.first.return_value = mock_membership
assert service.can_delete_team("team-id", mock_user) is False
# Admin cannot delete team
mock_membership.role = "admin"
assert service.can_delete_team("team-id", mock_user) is False
# Only owner can delete team
mock_membership.role = "owner"
assert service.can_delete_team("team-id", mock_user) is True