Implement authentication system with access control UI
This commit is contained in:
1208
backend/app/auth.py
Normal file
1208
backend/app/auth.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -25,6 +25,7 @@ class Settings(BaseSettings):
|
||||
database_pool_recycle: int = (
|
||||
1800 # Recycle connections after this many seconds (30 min)
|
||||
)
|
||||
database_query_timeout: int = 30 # Query timeout in seconds (0 = no timeout)
|
||||
|
||||
# S3
|
||||
s3_endpoint: str = ""
|
||||
@@ -52,6 +53,17 @@ class Settings(BaseSettings):
|
||||
log_level: str = "INFO" # DEBUG, INFO, WARNING, ERROR, CRITICAL
|
||||
log_format: str = "auto" # "json", "standard", or "auto" (json in production)
|
||||
|
||||
# JWT Authentication settings (optional, for external identity providers)
|
||||
jwt_enabled: bool = False # Enable JWT token validation
|
||||
jwt_secret: str = "" # Secret key for HS256, or leave empty for RS256 with JWKS
|
||||
jwt_algorithm: str = "HS256" # HS256 or RS256
|
||||
jwt_issuer: str = "" # Expected issuer (iss claim), leave empty to skip validation
|
||||
jwt_audience: str = "" # Expected audience (aud claim), leave empty to skip validation
|
||||
jwt_jwks_url: str = "" # JWKS URL for RS256 (e.g., https://auth.example.com/.well-known/jwks.json)
|
||||
jwt_username_claim: str = (
|
||||
"sub" # JWT claim to use as username (sub, email, preferred_username, etc.)
|
||||
)
|
||||
|
||||
@property
|
||||
def database_url(self) -> str:
|
||||
sslmode = f"?sslmode={self.database_sslmode}" if self.database_sslmode else ""
|
||||
|
||||
@@ -12,6 +12,12 @@ from .models import Base
|
||||
settings = get_settings()
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Build connect_args with query timeout if configured
|
||||
connect_args = {}
|
||||
if settings.database_query_timeout > 0:
|
||||
# PostgreSQL statement_timeout is in milliseconds
|
||||
connect_args["options"] = f"-c statement_timeout={settings.database_query_timeout * 1000}"
|
||||
|
||||
# Create engine with connection pool configuration
|
||||
engine = create_engine(
|
||||
settings.database_url,
|
||||
@@ -21,6 +27,7 @@ engine = create_engine(
|
||||
max_overflow=settings.database_max_overflow,
|
||||
pool_timeout=settings.database_pool_timeout,
|
||||
pool_recycle=settings.database_pool_recycle,
|
||||
connect_args=connect_args,
|
||||
)
|
||||
|
||||
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
|
||||
@@ -1,14 +1,19 @@
|
||||
from fastapi import FastAPI
|
||||
from fastapi import FastAPI, Request
|
||||
from fastapi.staticfiles import StaticFiles
|
||||
from fastapi.responses import FileResponse
|
||||
from contextlib import asynccontextmanager
|
||||
import logging
|
||||
import os
|
||||
|
||||
from slowapi import _rate_limit_exceeded_handler
|
||||
from slowapi.errors import RateLimitExceeded
|
||||
|
||||
from .config import get_settings
|
||||
from .database import init_db, SessionLocal
|
||||
from .routes import router
|
||||
from .seed import seed_database
|
||||
from .auth import create_default_admin
|
||||
from .rate_limit import limiter
|
||||
|
||||
settings = get_settings()
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
@@ -20,6 +25,18 @@ async def lifespan(app: FastAPI):
|
||||
# Startup: initialize database
|
||||
init_db()
|
||||
|
||||
# Create default admin user if no users exist
|
||||
db = SessionLocal()
|
||||
try:
|
||||
admin = create_default_admin(db)
|
||||
if admin:
|
||||
logger.warning(
|
||||
"Default admin user created with username 'admin' and password 'changeme123'. "
|
||||
"CHANGE THIS PASSWORD IMMEDIATELY!"
|
||||
)
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
# Seed test data in development mode
|
||||
if settings.is_development:
|
||||
logger.info(f"Running in {settings.env} mode - checking for seed data")
|
||||
@@ -42,13 +59,21 @@ app = FastAPI(
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
# Set up rate limiting
|
||||
app.state.limiter = limiter
|
||||
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
|
||||
|
||||
# Include API routes
|
||||
app.include_router(router)
|
||||
|
||||
# Serve static files (React build) if the directory exists
|
||||
static_dir = os.path.join(os.path.dirname(__file__), "..", "..", "frontend", "dist")
|
||||
if os.path.exists(static_dir):
|
||||
app.mount("/assets", StaticFiles(directory=os.path.join(static_dir, "assets")), name="assets")
|
||||
app.mount(
|
||||
"/assets",
|
||||
StaticFiles(directory=os.path.join(static_dir, "assets")),
|
||||
name="assets",
|
||||
)
|
||||
|
||||
@app.get("/")
|
||||
async def serve_spa():
|
||||
@@ -60,6 +85,7 @@ if os.path.exists(static_dir):
|
||||
# Don't catch API routes or health endpoint
|
||||
if full_path.startswith("api/") or full_path.startswith("health"):
|
||||
from fastapi import HTTPException
|
||||
|
||||
raise HTTPException(status_code=404, detail="Not found")
|
||||
|
||||
# Serve SPA for all other routes (including /project/*)
|
||||
@@ -68,4 +94,5 @@ if os.path.exists(static_dir):
|
||||
return FileResponse(index_path)
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
raise HTTPException(status_code=404, detail="Not found")
|
||||
|
||||
@@ -11,6 +11,7 @@ from sqlalchemy import (
|
||||
CheckConstraint,
|
||||
Index,
|
||||
JSON,
|
||||
ARRAY,
|
||||
)
|
||||
from sqlalchemy.dialects.postgresql import UUID
|
||||
from sqlalchemy.orm import relationship, declarative_base
|
||||
@@ -302,20 +303,104 @@ class AccessPermission(Base):
|
||||
)
|
||||
|
||||
|
||||
class User(Base):
|
||||
"""User account for authentication."""
|
||||
|
||||
__tablename__ = "users"
|
||||
|
||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
username = Column(String(255), unique=True, nullable=False)
|
||||
password_hash = Column(String(255)) # NULL if OIDC-only user
|
||||
email = Column(String(255))
|
||||
is_admin = Column(Boolean, default=False)
|
||||
is_active = Column(Boolean, default=True)
|
||||
must_change_password = Column(Boolean, default=False)
|
||||
oidc_subject = Column(String(255)) # OIDC subject claim
|
||||
oidc_issuer = Column(String(512)) # OIDC issuer URL
|
||||
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
|
||||
updated_at = Column(
|
||||
DateTime(timezone=True), default=datetime.utcnow, onupdate=datetime.utcnow
|
||||
)
|
||||
last_login = Column(DateTime(timezone=True))
|
||||
|
||||
# Relationships
|
||||
api_keys = relationship(
|
||||
"APIKey", back_populates="owner", cascade="all, delete-orphan"
|
||||
)
|
||||
sessions = relationship(
|
||||
"Session", back_populates="user", cascade="all, delete-orphan"
|
||||
)
|
||||
|
||||
__table_args__ = (
|
||||
Index("idx_users_username", "username"),
|
||||
Index("idx_users_email", "email"),
|
||||
Index("idx_users_oidc_subject", "oidc_subject"),
|
||||
)
|
||||
|
||||
|
||||
class Session(Base):
|
||||
"""User session for web login."""
|
||||
|
||||
__tablename__ = "sessions"
|
||||
|
||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
user_id = Column(
|
||||
UUID(as_uuid=True),
|
||||
ForeignKey("users.id", ondelete="CASCADE"),
|
||||
nullable=False,
|
||||
)
|
||||
token_hash = Column(String(64), unique=True, nullable=False)
|
||||
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
|
||||
expires_at = Column(DateTime(timezone=True), nullable=False)
|
||||
last_accessed = Column(DateTime(timezone=True), default=datetime.utcnow)
|
||||
user_agent = Column(String(512))
|
||||
ip_address = Column(String(45))
|
||||
|
||||
user = relationship("User", back_populates="sessions")
|
||||
|
||||
__table_args__ = (
|
||||
Index("idx_sessions_user_id", "user_id"),
|
||||
Index("idx_sessions_token_hash", "token_hash"),
|
||||
Index("idx_sessions_expires_at", "expires_at"),
|
||||
)
|
||||
|
||||
|
||||
class AuthSettings(Base):
|
||||
"""Authentication settings for OIDC configuration."""
|
||||
|
||||
__tablename__ = "auth_settings"
|
||||
|
||||
key = Column(String(255), primary_key=True)
|
||||
value = Column(Text, nullable=False)
|
||||
updated_at = Column(DateTime(timezone=True), default=datetime.utcnow)
|
||||
|
||||
|
||||
class APIKey(Base):
|
||||
__tablename__ = "api_keys"
|
||||
|
||||
id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
|
||||
key_hash = Column(String(64), unique=True, nullable=False)
|
||||
name = Column(String(255), nullable=False)
|
||||
user_id = Column(String(255), nullable=False)
|
||||
user_id = Column(
|
||||
String(255), nullable=False
|
||||
) # Legacy field, kept for compatibility
|
||||
owner_id = Column(
|
||||
UUID(as_uuid=True),
|
||||
ForeignKey("users.id", ondelete="CASCADE"),
|
||||
nullable=True, # Nullable for migration compatibility
|
||||
)
|
||||
description = Column(Text)
|
||||
scopes = Column(ARRAY(String), default=["read", "write"])
|
||||
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
|
||||
expires_at = Column(DateTime(timezone=True))
|
||||
last_used = Column(DateTime(timezone=True))
|
||||
|
||||
owner = relationship("User", back_populates="api_keys")
|
||||
|
||||
__table_args__ = (
|
||||
Index("idx_api_keys_user_id", "user_id"),
|
||||
Index("idx_api_keys_key_hash", "key_hash"),
|
||||
Index("idx_api_keys_owner_id", "owner_id"),
|
||||
)
|
||||
|
||||
|
||||
|
||||
16
backend/app/rate_limit.py
Normal file
16
backend/app/rate_limit.py
Normal file
@@ -0,0 +1,16 @@
|
||||
"""Rate limiting configuration for Orchard API.
|
||||
|
||||
Uses slowapi for rate limiting with IP-based keys.
|
||||
"""
|
||||
|
||||
import os
|
||||
from slowapi import Limiter
|
||||
from slowapi.util import get_remote_address
|
||||
|
||||
# Rate limiter - uses IP address as key
|
||||
limiter = Limiter(key_func=get_remote_address)
|
||||
|
||||
# Rate limit strings - configurable via environment for testing
|
||||
# Default: 5 login attempts per minute per IP
|
||||
# In tests: set ORCHARD_LOGIN_RATE_LIMIT to a high value like "1000/minute"
|
||||
LOGIN_RATE_LIMIT = os.environ.get("ORCHARD_LOGIN_RATE_LIMIT", "5/minute")
|
||||
File diff suppressed because it is too large
Load Diff
@@ -47,6 +47,13 @@ class ProjectUpdate(BaseModel):
|
||||
is_public: Optional[bool] = None
|
||||
|
||||
|
||||
class ProjectWithAccessResponse(ProjectResponse):
|
||||
"""Project response with user's access level included"""
|
||||
|
||||
access_level: Optional[str] = None # 'read', 'write', 'admin', or None
|
||||
is_owner: bool = False
|
||||
|
||||
|
||||
# Package format and platform enums
|
||||
PACKAGE_FORMATS = [
|
||||
"generic",
|
||||
@@ -686,3 +693,173 @@ class StatsReportResponse(BaseModel):
|
||||
format: str # "json", "csv", "markdown"
|
||||
generated_at: datetime
|
||||
content: str # The report content
|
||||
|
||||
|
||||
# Authentication schemas
|
||||
class LoginRequest(BaseModel):
|
||||
"""Login request with username and password"""
|
||||
username: str
|
||||
password: str
|
||||
|
||||
|
||||
class LoginResponse(BaseModel):
|
||||
"""Login response with user info"""
|
||||
id: UUID
|
||||
username: str
|
||||
email: Optional[str]
|
||||
is_admin: bool
|
||||
must_change_password: bool
|
||||
|
||||
|
||||
class ChangePasswordRequest(BaseModel):
|
||||
"""Change password request"""
|
||||
current_password: str
|
||||
new_password: str
|
||||
|
||||
|
||||
class UserResponse(BaseModel):
|
||||
"""User information response"""
|
||||
id: UUID
|
||||
username: str
|
||||
email: Optional[str]
|
||||
is_admin: bool
|
||||
is_active: bool
|
||||
must_change_password: bool
|
||||
created_at: datetime
|
||||
last_login: Optional[datetime]
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class UserCreate(BaseModel):
|
||||
"""Create user request (admin only)"""
|
||||
username: str
|
||||
password: str
|
||||
email: Optional[str] = None
|
||||
is_admin: bool = False
|
||||
|
||||
|
||||
class UserUpdate(BaseModel):
|
||||
"""Update user request (admin only)"""
|
||||
email: Optional[str] = None
|
||||
is_admin: Optional[bool] = None
|
||||
is_active: Optional[bool] = None
|
||||
|
||||
|
||||
class ResetPasswordRequest(BaseModel):
|
||||
"""Reset password request (admin only)"""
|
||||
new_password: str
|
||||
|
||||
|
||||
class APIKeyCreate(BaseModel):
|
||||
"""Create API key request"""
|
||||
name: str
|
||||
description: Optional[str] = None
|
||||
scopes: Optional[List[str]] = None
|
||||
|
||||
|
||||
class APIKeyResponse(BaseModel):
|
||||
"""API key response (without the secret key)"""
|
||||
id: UUID
|
||||
name: str
|
||||
description: Optional[str]
|
||||
scopes: Optional[List[str]]
|
||||
created_at: datetime
|
||||
expires_at: Optional[datetime]
|
||||
last_used: Optional[datetime]
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class APIKeyCreateResponse(BaseModel):
|
||||
"""API key creation response (includes the secret key - only shown once)"""
|
||||
id: UUID
|
||||
name: str
|
||||
description: Optional[str]
|
||||
scopes: Optional[List[str]]
|
||||
key: str # The actual API key - only returned on creation
|
||||
created_at: datetime
|
||||
expires_at: Optional[datetime]
|
||||
|
||||
|
||||
# OIDC Configuration schemas
|
||||
class OIDCConfigResponse(BaseModel):
|
||||
"""OIDC configuration response (hides client secret)"""
|
||||
enabled: bool
|
||||
issuer_url: str
|
||||
client_id: str
|
||||
has_client_secret: bool # True if secret is configured, but don't expose it
|
||||
scopes: List[str]
|
||||
auto_create_users: bool
|
||||
admin_group: str
|
||||
|
||||
|
||||
class OIDCConfigUpdate(BaseModel):
|
||||
"""Update OIDC configuration"""
|
||||
enabled: Optional[bool] = None
|
||||
issuer_url: Optional[str] = None
|
||||
client_id: Optional[str] = None
|
||||
client_secret: Optional[str] = None # Only set if changing
|
||||
scopes: Optional[List[str]] = None
|
||||
auto_create_users: Optional[bool] = None
|
||||
admin_group: Optional[str] = None
|
||||
|
||||
|
||||
class OIDCStatusResponse(BaseModel):
|
||||
"""Public OIDC status response"""
|
||||
enabled: bool
|
||||
issuer_url: Optional[str] = None # Only included if enabled
|
||||
|
||||
|
||||
class OIDCLoginResponse(BaseModel):
|
||||
"""OIDC login initiation response"""
|
||||
authorization_url: str
|
||||
|
||||
|
||||
# Access Permission schemas
|
||||
class AccessPermissionCreate(BaseModel):
|
||||
"""Grant access to a user for a project"""
|
||||
username: str
|
||||
level: str # 'read', 'write', or 'admin'
|
||||
expires_at: Optional[datetime] = None
|
||||
|
||||
@field_validator('level')
|
||||
@classmethod
|
||||
def validate_level(cls, v):
|
||||
if v not in ('read', 'write', 'admin'):
|
||||
raise ValueError("level must be 'read', 'write', or 'admin'")
|
||||
return v
|
||||
|
||||
|
||||
class AccessPermissionUpdate(BaseModel):
|
||||
"""Update access permission"""
|
||||
level: Optional[str] = None
|
||||
expires_at: Optional[datetime] = None
|
||||
|
||||
@field_validator('level')
|
||||
@classmethod
|
||||
def validate_level(cls, v):
|
||||
if v is not None and v not in ('read', 'write', 'admin'):
|
||||
raise ValueError("level must be 'read', 'write', or 'admin'")
|
||||
return v
|
||||
|
||||
|
||||
class AccessPermissionResponse(BaseModel):
|
||||
"""Access permission response"""
|
||||
id: UUID
|
||||
project_id: UUID
|
||||
user_id: str
|
||||
level: str
|
||||
created_at: datetime
|
||||
expires_at: Optional[datetime]
|
||||
|
||||
class Config:
|
||||
from_attributes = True
|
||||
|
||||
|
||||
class ProjectWithAccessResponse(ProjectResponse):
|
||||
"""Project response with user's access level"""
|
||||
user_access_level: Optional[str] = None
|
||||
|
||||
|
||||
@@ -9,6 +9,8 @@ pydantic==2.5.3
|
||||
pydantic-settings==2.1.0
|
||||
python-jose[cryptography]==3.3.0
|
||||
passlib[bcrypt]==1.7.4
|
||||
bcrypt==4.0.1
|
||||
slowapi==0.1.9
|
||||
|
||||
# Test dependencies
|
||||
pytest>=7.4.0
|
||||
|
||||
@@ -182,9 +182,10 @@ def test_app():
|
||||
@pytest.fixture
|
||||
def integration_client():
|
||||
"""
|
||||
Create a test client for integration tests.
|
||||
Create an authenticated test client for integration tests.
|
||||
|
||||
Uses the real database and MinIO from docker-compose.local.yml.
|
||||
Authenticates as admin for write operations.
|
||||
"""
|
||||
from httpx import Client
|
||||
|
||||
@@ -192,6 +193,15 @@ def integration_client():
|
||||
base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
|
||||
|
||||
with Client(base_url=base_url, timeout=30.0) as client:
|
||||
# Login as admin to enable write operations
|
||||
login_response = client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
# If login fails, tests will fail - that's expected if auth is broken
|
||||
if login_response.status_code != 200:
|
||||
# Try to continue without auth for backward compatibility
|
||||
pass
|
||||
yield client
|
||||
|
||||
|
||||
|
||||
760
backend/tests/integration/test_auth_api.py
Normal file
760
backend/tests/integration/test_auth_api.py
Normal file
@@ -0,0 +1,760 @@
|
||||
"""Integration tests for authentication API endpoints."""
|
||||
|
||||
import pytest
|
||||
from uuid import uuid4
|
||||
|
||||
|
||||
class TestAuthLogin:
|
||||
"""Tests for login endpoint."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_login_success(self, integration_client):
|
||||
"""Test successful login with default admin credentials."""
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["username"] == "admin"
|
||||
assert data["is_admin"] is True
|
||||
assert "orchard_session" in response.cookies
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_login_invalid_password(self, integration_client):
|
||||
"""Test login with wrong password."""
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "wrongpassword"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
assert "Invalid username or password" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_login_nonexistent_user(self, integration_client):
|
||||
"""Test login with non-existent user."""
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "nonexistent", "password": "password"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
class TestAuthLogout:
|
||||
"""Tests for logout endpoint."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_logout_success(self, integration_client):
|
||||
"""Test successful logout."""
|
||||
# First login
|
||||
login_response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
|
||||
# Then logout
|
||||
logout_response = integration_client.post("/api/v1/auth/logout")
|
||||
assert logout_response.status_code == 200
|
||||
assert "Logged out successfully" in logout_response.json()["message"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_logout_without_session(self, integration_client):
|
||||
"""Test logout without being logged in."""
|
||||
response = integration_client.post("/api/v1/auth/logout")
|
||||
# Should succeed even without session
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
class TestAuthMe:
|
||||
"""Tests for get current user endpoint."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_get_me_authenticated(self, integration_client):
|
||||
"""Test getting current user when authenticated."""
|
||||
# Login first
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
response = integration_client.get("/api/v1/auth/me")
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["username"] == "admin"
|
||||
assert data["is_admin"] is True
|
||||
assert "id" in data
|
||||
assert "created_at" in data
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_get_me_unauthenticated(self, integration_client):
|
||||
"""Test getting current user without authentication."""
|
||||
# Clear any existing cookies
|
||||
integration_client.cookies.clear()
|
||||
|
||||
response = integration_client.get("/api/v1/auth/me")
|
||||
assert response.status_code == 401
|
||||
assert "Not authenticated" in response.json()["detail"]
|
||||
|
||||
|
||||
class TestAuthChangePassword:
|
||||
"""Tests for change password endpoint."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_change_password_success(self, integration_client):
|
||||
"""Test successful password change."""
|
||||
# Login first
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
# Change password
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"current_password": "changeme123", "new_password": "newpassword123"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify old password no longer works
|
||||
integration_client.cookies.clear()
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
# Verify new password works
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "newpassword123"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# Reset password back to original for other tests
|
||||
reset_response = integration_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"current_password": "newpassword123", "new_password": "changeme123"},
|
||||
)
|
||||
assert reset_response.status_code == 200, "Failed to reset admin password back to default"
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_change_password_wrong_current(self, integration_client):
|
||||
"""Test password change with wrong current password."""
|
||||
# Login first
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"current_password": "wrongpassword", "new_password": "newpassword"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "Current password is incorrect" in response.json()["detail"]
|
||||
|
||||
|
||||
class TestAPIKeys:
|
||||
"""Tests for API key management endpoints."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_create_and_list_api_key(self, integration_client):
|
||||
"""Test creating and listing API keys."""
|
||||
# Login first
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
# Create API key
|
||||
create_response = integration_client.post(
|
||||
"/api/v1/auth/keys",
|
||||
json={"name": "test-key", "description": "Test API key"},
|
||||
)
|
||||
assert create_response.status_code == 200
|
||||
data = create_response.json()
|
||||
assert data["name"] == "test-key"
|
||||
assert data["description"] == "Test API key"
|
||||
assert "key" in data
|
||||
assert data["key"].startswith("orch_")
|
||||
key_id = data["id"]
|
||||
api_key = data["key"]
|
||||
|
||||
# List API keys
|
||||
list_response = integration_client.get("/api/v1/auth/keys")
|
||||
assert list_response.status_code == 200
|
||||
keys = list_response.json()
|
||||
assert any(k["id"] == key_id for k in keys)
|
||||
|
||||
# Clean up - delete the key
|
||||
integration_client.delete(f"/api/v1/auth/keys/{key_id}")
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_use_api_key_for_auth(self, integration_client):
|
||||
"""Test using API key for authentication."""
|
||||
# Login and create API key
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
create_response = integration_client.post(
|
||||
"/api/v1/auth/keys",
|
||||
json={"name": "auth-test-key"},
|
||||
)
|
||||
api_key = create_response.json()["key"]
|
||||
key_id = create_response.json()["id"]
|
||||
|
||||
# Clear cookies and use API key
|
||||
integration_client.cookies.clear()
|
||||
response = integration_client.get(
|
||||
"/api/v1/auth/me",
|
||||
headers={"Authorization": f"Bearer {api_key}"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["username"] == "admin"
|
||||
|
||||
# Clean up
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
integration_client.delete(f"/api/v1/auth/keys/{key_id}")
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_delete_api_key(self, integration_client):
|
||||
"""Test revoking an API key."""
|
||||
# Login and create API key
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
create_response = integration_client.post(
|
||||
"/api/v1/auth/keys",
|
||||
json={"name": "delete-test-key"},
|
||||
)
|
||||
key_id = create_response.json()["id"]
|
||||
api_key = create_response.json()["key"]
|
||||
|
||||
# Delete the key
|
||||
delete_response = integration_client.delete(f"/api/v1/auth/keys/{key_id}")
|
||||
assert delete_response.status_code == 200
|
||||
|
||||
# Verify key no longer works
|
||||
integration_client.cookies.clear()
|
||||
response = integration_client.get(
|
||||
"/api/v1/auth/me",
|
||||
headers={"Authorization": f"Bearer {api_key}"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
|
||||
|
||||
class TestAdminUserManagement:
|
||||
"""Tests for admin user management endpoints."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_list_users(self, integration_client):
|
||||
"""Test listing users as admin."""
|
||||
# Login as admin
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
response = integration_client.get("/api/v1/admin/users")
|
||||
assert response.status_code == 200
|
||||
users = response.json()
|
||||
assert len(users) >= 1
|
||||
assert any(u["username"] == "admin" for u in users)
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_create_user(self, integration_client):
|
||||
"""Test creating a new user as admin."""
|
||||
# Login as admin
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
# Create new user
|
||||
test_username = f"testuser_{uuid4().hex[:8]}"
|
||||
response = integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={
|
||||
"username": test_username,
|
||||
"password": "testpassword",
|
||||
"email": "test@example.com",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["username"] == test_username
|
||||
assert data["email"] == "test@example.com"
|
||||
assert data["is_admin"] is False
|
||||
|
||||
# Verify new user can login
|
||||
integration_client.cookies.clear()
|
||||
login_response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "testpassword"},
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_update_user(self, integration_client):
|
||||
"""Test updating a user as admin."""
|
||||
# Login as admin
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
# Create a test user
|
||||
test_username = f"updateuser_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password"},
|
||||
)
|
||||
|
||||
# Update the user
|
||||
response = integration_client.put(
|
||||
f"/api/v1/admin/users/{test_username}",
|
||||
json={"email": "updated@example.com", "is_admin": True},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["email"] == "updated@example.com"
|
||||
assert data["is_admin"] is True
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_reset_user_password(self, integration_client):
|
||||
"""Test resetting a user's password as admin."""
|
||||
# Login as admin
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
# Create a test user
|
||||
test_username = f"resetuser_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "oldpassword"},
|
||||
)
|
||||
|
||||
# Reset password
|
||||
response = integration_client.post(
|
||||
f"/api/v1/admin/users/{test_username}/reset-password",
|
||||
json={"new_password": "newpassword"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
# Verify new password works
|
||||
integration_client.cookies.clear()
|
||||
login_response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "newpassword"},
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_non_admin_cannot_access_admin_endpoints(self, integration_client):
|
||||
"""Test that non-admin users cannot access admin endpoints."""
|
||||
# Login as admin and create non-admin user
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password", "is_admin": False},
|
||||
)
|
||||
|
||||
# Login as non-admin
|
||||
integration_client.cookies.clear()
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "password"},
|
||||
)
|
||||
|
||||
# Try to access admin endpoints
|
||||
response = integration_client.get("/api/v1/admin/users")
|
||||
assert response.status_code == 403
|
||||
assert "Admin privileges required" in response.json()["detail"]
|
||||
|
||||
|
||||
class TestSecurityEdgeCases:
|
||||
"""Tests for security edge cases and validation."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_login_inactive_user(self, integration_client):
|
||||
"""Test that inactive users cannot login."""
|
||||
# Login as admin and create a user
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
test_username = f"inactive_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
# Deactivate the user
|
||||
integration_client.put(
|
||||
f"/api/v1/admin/users/{test_username}",
|
||||
json={"is_active": False},
|
||||
)
|
||||
|
||||
# Try to login as inactive user
|
||||
integration_client.cookies.clear()
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
assert "Invalid username or password" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_password_too_short_on_create(self, integration_client):
|
||||
"""Test that short passwords are rejected when creating users."""
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
response = integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": f"shortpw_{uuid4().hex[:8]}", "password": "short"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "at least 8 characters" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_password_too_short_on_change(self, integration_client):
|
||||
"""Test that short passwords are rejected when changing password."""
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"current_password": "changeme123", "new_password": "short"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "at least 8 characters" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_password_too_short_on_reset(self, integration_client):
|
||||
"""Test that short passwords are rejected when resetting password."""
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
# Create a test user first
|
||||
test_username = f"resetshort_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
response = integration_client.post(
|
||||
f"/api/v1/admin/users/{test_username}/reset-password",
|
||||
json={"new_password": "short"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "at least 8 characters" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_duplicate_username_rejected(self, integration_client):
|
||||
"""Test that duplicate usernames are rejected."""
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
test_username = f"duplicate_{uuid4().hex[:8]}"
|
||||
# Create user first time
|
||||
response1 = integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
assert response1.status_code == 200
|
||||
|
||||
# Try to create same username again
|
||||
response2 = integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password456"},
|
||||
)
|
||||
assert response2.status_code == 409
|
||||
assert "already exists" in response2.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_cannot_delete_other_users_api_key(self, integration_client):
|
||||
"""Test that users cannot delete API keys owned by other users."""
|
||||
# Login as admin and create an API key
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
create_response = integration_client.post(
|
||||
"/api/v1/auth/keys",
|
||||
json={"name": "admin-key"},
|
||||
)
|
||||
admin_key_id = create_response.json()["id"]
|
||||
|
||||
# Create a non-admin user
|
||||
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
# Login as non-admin
|
||||
integration_client.cookies.clear()
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
# Try to delete admin's API key
|
||||
response = integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
||||
assert response.status_code == 403
|
||||
assert "Cannot delete another user's API key" in response.json()["detail"]
|
||||
|
||||
# Cleanup: login as admin and delete the key
|
||||
integration_client.cookies.clear()
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_sessions_invalidated_on_password_change(self, integration_client):
|
||||
"""Test that all sessions are invalidated when password is changed."""
|
||||
# Create a test user
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
test_username = f"sessiontest_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
# Login as test user
|
||||
integration_client.cookies.clear()
|
||||
login_response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
|
||||
# Verify session works
|
||||
me_response = integration_client.get("/api/v1/auth/me")
|
||||
assert me_response.status_code == 200
|
||||
|
||||
# Change password
|
||||
integration_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"current_password": "password123", "new_password": "newpassword123"},
|
||||
)
|
||||
|
||||
# Old session should be invalidated - try to access /me
|
||||
# (note: the change-password call itself may have cleared the session cookie)
|
||||
me_response2 = integration_client.get("/api/v1/auth/me")
|
||||
# This should fail because all sessions were invalidated
|
||||
assert me_response2.status_code == 401
|
||||
|
||||
|
||||
class TestSecurityEdgeCases:
|
||||
"""Tests for security edge cases and validation."""
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_login_inactive_user(self, integration_client):
|
||||
"""Test that inactive users cannot login."""
|
||||
# Login as admin and create a user
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
test_username = f"inactive_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
# Deactivate the user
|
||||
integration_client.put(
|
||||
f"/api/v1/admin/users/{test_username}",
|
||||
json={"is_active": False},
|
||||
)
|
||||
|
||||
# Try to login as inactive user
|
||||
integration_client.cookies.clear()
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
assert response.status_code == 401
|
||||
assert "Invalid username or password" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_password_too_short_on_create(self, integration_client):
|
||||
"""Test that short passwords are rejected when creating users."""
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
response = integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": f"shortpw_{uuid4().hex[:8]}", "password": "short"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "at least 8 characters" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_password_too_short_on_change(self, integration_client):
|
||||
"""Test that short passwords are rejected when changing password."""
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
response = integration_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"current_password": "changeme123", "new_password": "short"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "at least 8 characters" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_password_too_short_on_reset(self, integration_client):
|
||||
"""Test that short passwords are rejected when resetting password."""
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
# Create a test user first
|
||||
test_username = f"resetshort_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
response = integration_client.post(
|
||||
f"/api/v1/admin/users/{test_username}/reset-password",
|
||||
json={"new_password": "short"},
|
||||
)
|
||||
assert response.status_code == 400
|
||||
assert "at least 8 characters" in response.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_duplicate_username_rejected(self, integration_client):
|
||||
"""Test that duplicate usernames are rejected."""
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
|
||||
test_username = f"duplicate_{uuid4().hex[:8]}"
|
||||
# Create user first time
|
||||
response1 = integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
assert response1.status_code == 200
|
||||
|
||||
# Try to create same username again
|
||||
response2 = integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password456"},
|
||||
)
|
||||
assert response2.status_code == 409
|
||||
assert "already exists" in response2.json()["detail"]
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_cannot_delete_other_users_api_key(self, integration_client):
|
||||
"""Test that users cannot delete API keys owned by other users."""
|
||||
# Login as admin and create an API key
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
create_response = integration_client.post(
|
||||
"/api/v1/auth/keys",
|
||||
json={"name": "admin-key"},
|
||||
)
|
||||
admin_key_id = create_response.json()["id"]
|
||||
|
||||
# Create a non-admin user
|
||||
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
# Login as non-admin
|
||||
integration_client.cookies.clear()
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
# Try to delete admin's API key
|
||||
response = integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
||||
assert response.status_code == 403
|
||||
assert "Cannot delete another user's API key" in response.json()["detail"]
|
||||
|
||||
# Cleanup: login as admin and delete the key
|
||||
integration_client.cookies.clear()
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_sessions_invalidated_on_password_change(self, integration_client):
|
||||
"""Test that all sessions are invalidated when password is changed."""
|
||||
# Create a test user
|
||||
integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": "admin", "password": "changeme123"},
|
||||
)
|
||||
test_username = f"sessiontest_{uuid4().hex[:8]}"
|
||||
integration_client.post(
|
||||
"/api/v1/admin/users",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
|
||||
# Login as test user
|
||||
integration_client.cookies.clear()
|
||||
login_response = integration_client.post(
|
||||
"/api/v1/auth/login",
|
||||
json={"username": test_username, "password": "password123"},
|
||||
)
|
||||
assert login_response.status_code == 200
|
||||
|
||||
# Verify session works
|
||||
me_response = integration_client.get("/api/v1/auth/me")
|
||||
assert me_response.status_code == 200
|
||||
|
||||
# Change password
|
||||
integration_client.post(
|
||||
"/api/v1/auth/change-password",
|
||||
json={"current_password": "password123", "new_password": "newpassword123"},
|
||||
)
|
||||
|
||||
# Old session should be invalidated - try to access /me
|
||||
# (note: the change-password call itself may have cleared the session cookie)
|
||||
me_response2 = integration_client.get("/api/v1/auth/me")
|
||||
# This should fail because all sessions were invalidated
|
||||
assert me_response2.status_code == 401
|
||||
@@ -59,7 +59,8 @@ class TestProjectCRUD:
|
||||
@pytest.mark.integration
|
||||
def test_list_projects(self, integration_client, test_project):
|
||||
"""Test listing projects includes created project."""
|
||||
response = integration_client.get("/api/v1/projects")
|
||||
# Search specifically for our test project to avoid pagination issues
|
||||
response = integration_client.get(f"/api/v1/projects?search={test_project}")
|
||||
assert response.status_code == 200
|
||||
|
||||
data = response.json()
|
||||
@@ -107,9 +108,11 @@ class TestProjectListingFilters:
|
||||
@pytest.mark.integration
|
||||
def test_projects_search(self, integration_client, test_project):
|
||||
"""Test project search by name."""
|
||||
# Search for our test project
|
||||
# Search using the unique portion of our test project name
|
||||
# test_project format is "test-project-test-{uuid[:8]}"
|
||||
unique_part = test_project.split("-")[-1] # Get the UUID portion
|
||||
response = integration_client.get(
|
||||
f"/api/v1/projects?search={test_project[:10]}"
|
||||
f"/api/v1/projects?search={unique_part}"
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@@ -286,6 +286,14 @@ class TestConcurrentUploads:
|
||||
expected_hash = compute_sha256(content)
|
||||
num_concurrent = 5
|
||||
|
||||
# Create an API key for worker threads
|
||||
api_key_response = integration_client.post(
|
||||
"/api/v1/auth/keys",
|
||||
json={"name": "concurrent-test-key"},
|
||||
)
|
||||
assert api_key_response.status_code == 200, f"Failed to create API key: {api_key_response.text}"
|
||||
api_key = api_key_response.json()["key"]
|
||||
|
||||
results = []
|
||||
errors = []
|
||||
|
||||
@@ -306,6 +314,7 @@ class TestConcurrentUploads:
|
||||
f"/api/v1/project/{project}/{package}/upload",
|
||||
files=files,
|
||||
data={"tag": f"concurrent-{tag_suffix}"},
|
||||
headers={"Authorization": f"Bearer {api_key}"},
|
||||
)
|
||||
if response.status_code == 200:
|
||||
results.append(response.json())
|
||||
|
||||
Reference in New Issue
Block a user