Add security hardening and additional auth tests
Security improvements: - Add password strength validation (min 8 characters) - Invalidate all sessions on password change/reset - Add timing-safe user lookup to prevent enumeration attacks - Fix SQLAlchemy boolean comparisons (== True -> is_(True)) - Change default admin password to 'changeme123' (meets min length) New tests (7 additional): - Inactive user login attempt blocked - Short password rejected on create/change/reset - Duplicate username rejected (409) - Non-owner API key deletion blocked (403) - Sessions invalidated on password change
This commit is contained in:
@@ -22,6 +22,26 @@ API_KEY_PREFIX = "orch_"
|
|||||||
# Session duration (24 hours default)
|
# Session duration (24 hours default)
|
||||||
SESSION_DURATION_HOURS = 24
|
SESSION_DURATION_HOURS = 24
|
||||||
|
|
||||||
|
# Password requirements
|
||||||
|
MIN_PASSWORD_LENGTH = 8
|
||||||
|
|
||||||
|
|
||||||
|
class PasswordTooShortError(ValueError):
|
||||||
|
"""Raised when password doesn't meet minimum length requirement."""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def validate_password_strength(password: str) -> None:
|
||||||
|
"""Validate password meets minimum requirements.
|
||||||
|
|
||||||
|
Raises PasswordTooShortError if password is too short.
|
||||||
|
"""
|
||||||
|
if not password or len(password) < MIN_PASSWORD_LENGTH:
|
||||||
|
raise PasswordTooShortError(
|
||||||
|
f"Password must be at least {MIN_PASSWORD_LENGTH} characters"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def hash_password(password: str) -> str:
|
def hash_password(password: str) -> str:
|
||||||
"""Hash a password using bcrypt."""
|
"""Hash a password using bcrypt."""
|
||||||
@@ -93,23 +113,37 @@ class AuthService:
|
|||||||
"""Authenticate a user with username and password.
|
"""Authenticate a user with username and password.
|
||||||
|
|
||||||
Returns the user if authentication succeeds, None otherwise.
|
Returns the user if authentication succeeds, None otherwise.
|
||||||
|
Uses constant-time comparison to prevent timing-based user enumeration.
|
||||||
"""
|
"""
|
||||||
user = self.get_user_by_username(username)
|
user = self.get_user_by_username(username)
|
||||||
|
|
||||||
|
# Always perform password verification to prevent timing attacks
|
||||||
|
# Use a dummy hash if user doesn't exist
|
||||||
|
dummy_hash = "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYA1vQ9S9sXa"
|
||||||
|
password_hash = user.password_hash if user and user.password_hash else dummy_hash
|
||||||
|
|
||||||
|
# Verify password (constant time even if user doesn't exist)
|
||||||
|
password_valid = verify_password(password, password_hash)
|
||||||
|
|
||||||
|
# Check all conditions
|
||||||
if not user:
|
if not user:
|
||||||
return None
|
return None
|
||||||
if not user.password_hash:
|
if not user.password_hash:
|
||||||
return None # OIDC-only user
|
return None # OIDC-only user
|
||||||
if not user.is_active:
|
if not user.is_active:
|
||||||
return None
|
return None
|
||||||
if not verify_password(password, user.password_hash):
|
if not password_valid:
|
||||||
return None
|
return None
|
||||||
return user
|
return user
|
||||||
|
|
||||||
def change_password(self, user: User, new_password: str) -> None:
|
def change_password(self, user: User, new_password: str) -> None:
|
||||||
"""Change a user's password."""
|
"""Change a user's password and invalidate all existing sessions."""
|
||||||
|
validate_password_strength(new_password)
|
||||||
user.password_hash = hash_password(new_password)
|
user.password_hash = hash_password(new_password)
|
||||||
user.must_change_password = False
|
user.must_change_password = False
|
||||||
self.db.commit()
|
self.db.commit()
|
||||||
|
# Invalidate all existing sessions for security
|
||||||
|
self.delete_user_sessions(user)
|
||||||
|
|
||||||
def update_last_login(self, user: User) -> None:
|
def update_last_login(self, user: User) -> None:
|
||||||
"""Update the user's last login timestamp."""
|
"""Update the user's last login timestamp."""
|
||||||
@@ -120,7 +154,7 @@ class AuthService:
|
|||||||
"""List all users."""
|
"""List all users."""
|
||||||
query = self.db.query(User)
|
query = self.db.query(User)
|
||||||
if not include_inactive:
|
if not include_inactive:
|
||||||
query = query.filter(User.is_active == True)
|
query = query.filter(User.is_active.is_(True))
|
||||||
return query.order_by(User.username).all()
|
return query.order_by(User.username).all()
|
||||||
|
|
||||||
def set_user_active(self, user: User, is_active: bool) -> None:
|
def set_user_active(self, user: User, is_active: bool) -> None:
|
||||||
@@ -134,10 +168,13 @@ class AuthService:
|
|||||||
self.db.commit()
|
self.db.commit()
|
||||||
|
|
||||||
def reset_user_password(self, user: User, new_password: str) -> None:
|
def reset_user_password(self, user: User, new_password: str) -> None:
|
||||||
"""Reset a user's password (admin action)."""
|
"""Reset a user's password (admin action) and invalidate all sessions."""
|
||||||
|
validate_password_strength(new_password)
|
||||||
user.password_hash = hash_password(new_password)
|
user.password_hash = hash_password(new_password)
|
||||||
user.must_change_password = True
|
user.must_change_password = True
|
||||||
self.db.commit()
|
self.db.commit()
|
||||||
|
# Invalidate all existing sessions for security
|
||||||
|
self.delete_user_sessions(user)
|
||||||
|
|
||||||
# --- Session Operations ---
|
# --- Session Operations ---
|
||||||
|
|
||||||
@@ -159,7 +196,8 @@ class AuthService:
|
|||||||
session = UserSession(
|
session = UserSession(
|
||||||
user_id=user.id,
|
user_id=user.id,
|
||||||
token_hash=token_hash,
|
token_hash=token_hash,
|
||||||
expires_at=datetime.now(timezone.utc) + timedelta(hours=SESSION_DURATION_HOURS),
|
expires_at=datetime.now(timezone.utc)
|
||||||
|
+ timedelta(hours=SESSION_DURATION_HOURS),
|
||||||
user_agent=user_agent,
|
user_agent=user_agent,
|
||||||
ip_address=ip_address,
|
ip_address=ip_address,
|
||||||
)
|
)
|
||||||
@@ -328,7 +366,7 @@ def create_default_admin(db: Session) -> Optional[User]:
|
|||||||
auth_service = AuthService(db)
|
auth_service = AuthService(db)
|
||||||
admin = auth_service.create_user(
|
admin = auth_service.create_user(
|
||||||
username="admin",
|
username="admin",
|
||||||
password="admin",
|
password="changeme123",
|
||||||
is_admin=True,
|
is_admin=True,
|
||||||
must_change_password=True,
|
must_change_password=True,
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -27,7 +27,7 @@ async def lifespan(app: FastAPI):
|
|||||||
admin = create_default_admin(db)
|
admin = create_default_admin(db)
|
||||||
if admin:
|
if admin:
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Default admin user created with username 'admin' and password 'admin'. "
|
"Default admin user created with username 'admin' and password 'changeme123'. "
|
||||||
"CHANGE THIS PASSWORD IMMEDIATELY!"
|
"CHANGE THIS PASSWORD IMMEDIATELY!"
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
@@ -368,6 +368,9 @@ from .auth import (
|
|||||||
get_auth_service,
|
get_auth_service,
|
||||||
SESSION_COOKIE_NAME,
|
SESSION_COOKIE_NAME,
|
||||||
verify_password,
|
verify_password,
|
||||||
|
validate_password_strength,
|
||||||
|
PasswordTooShortError,
|
||||||
|
MIN_PASSWORD_LENGTH,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@@ -491,8 +494,14 @@ def change_password(
|
|||||||
detail="Current password is incorrect",
|
detail="Current password is incorrect",
|
||||||
)
|
)
|
||||||
|
|
||||||
# Change password
|
# Validate and change password
|
||||||
auth_service.change_password(current_user, password_request.new_password)
|
try:
|
||||||
|
auth_service.change_password(current_user, password_request.new_password)
|
||||||
|
except PasswordTooShortError:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters",
|
||||||
|
)
|
||||||
|
|
||||||
# Log audit
|
# Log audit
|
||||||
_log_audit(
|
_log_audit(
|
||||||
@@ -659,6 +668,15 @@ def create_user(
|
|||||||
detail="Username already exists",
|
detail="Username already exists",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Validate password strength
|
||||||
|
try:
|
||||||
|
validate_password_strength(user_create.password)
|
||||||
|
except PasswordTooShortError:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters",
|
||||||
|
)
|
||||||
|
|
||||||
user = auth_service.create_user(
|
user = auth_service.create_user(
|
||||||
username=user_create.username,
|
username=user_create.username,
|
||||||
password=user_create.password,
|
password=user_create.password,
|
||||||
@@ -738,7 +756,7 @@ def update_user(
|
|||||||
if user_update.is_admin is False and user.is_admin:
|
if user_update.is_admin is False and user.is_admin:
|
||||||
admin_count = (
|
admin_count = (
|
||||||
auth_service.db.query(User)
|
auth_service.db.query(User)
|
||||||
.filter(User.is_admin == True, User.is_active == True)
|
.filter(User.is_admin.is_(True), User.is_active.is_(True))
|
||||||
.count()
|
.count()
|
||||||
)
|
)
|
||||||
if admin_count <= 1:
|
if admin_count <= 1:
|
||||||
@@ -798,7 +816,13 @@ def reset_user_password(
|
|||||||
detail="User not found",
|
detail="User not found",
|
||||||
)
|
)
|
||||||
|
|
||||||
auth_service.reset_user_password(user, reset_request.new_password)
|
try:
|
||||||
|
auth_service.reset_user_password(user, reset_request.new_password)
|
||||||
|
except PasswordTooShortError:
|
||||||
|
raise HTTPException(
|
||||||
|
status_code=status.HTTP_400_BAD_REQUEST,
|
||||||
|
detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters",
|
||||||
|
)
|
||||||
|
|
||||||
# Log audit
|
# Log audit
|
||||||
_log_audit(
|
_log_audit(
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ class TestAuthLogin:
|
|||||||
"""Test successful login with default admin credentials."""
|
"""Test successful login with default admin credentials."""
|
||||||
response = integration_client.post(
|
response = integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
data = response.json()
|
data = response.json()
|
||||||
@@ -49,7 +49,7 @@ class TestAuthLogout:
|
|||||||
# First login
|
# First login
|
||||||
login_response = integration_client.post(
|
login_response = integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
assert login_response.status_code == 200
|
assert login_response.status_code == 200
|
||||||
|
|
||||||
@@ -75,7 +75,7 @@ class TestAuthMe:
|
|||||||
# Login first
|
# Login first
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
|
||||||
response = integration_client.get("/api/v1/auth/me")
|
response = integration_client.get("/api/v1/auth/me")
|
||||||
@@ -106,13 +106,13 @@ class TestAuthChangePassword:
|
|||||||
# Login first
|
# Login first
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Change password
|
# Change password
|
||||||
response = integration_client.post(
|
response = integration_client.post(
|
||||||
"/api/v1/auth/change-password",
|
"/api/v1/auth/change-password",
|
||||||
json={"current_password": "admin", "new_password": "newpassword123"},
|
json={"current_password": "changeme123", "new_password": "newpassword123"},
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
||||||
@@ -120,7 +120,7 @@ class TestAuthChangePassword:
|
|||||||
integration_client.cookies.clear()
|
integration_client.cookies.clear()
|
||||||
response = integration_client.post(
|
response = integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
assert response.status_code == 401
|
assert response.status_code == 401
|
||||||
|
|
||||||
@@ -132,10 +132,11 @@ class TestAuthChangePassword:
|
|||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
|
|
||||||
# Reset password back to original for other tests
|
# Reset password back to original for other tests
|
||||||
integration_client.post(
|
reset_response = integration_client.post(
|
||||||
"/api/v1/auth/change-password",
|
"/api/v1/auth/change-password",
|
||||||
json={"current_password": "newpassword123", "new_password": "admin"},
|
json={"current_password": "newpassword123", "new_password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
assert reset_response.status_code == 200, "Failed to reset admin password back to default"
|
||||||
|
|
||||||
@pytest.mark.integration
|
@pytest.mark.integration
|
||||||
def test_change_password_wrong_current(self, integration_client):
|
def test_change_password_wrong_current(self, integration_client):
|
||||||
@@ -143,7 +144,7 @@ class TestAuthChangePassword:
|
|||||||
# Login first
|
# Login first
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
|
||||||
response = integration_client.post(
|
response = integration_client.post(
|
||||||
@@ -163,7 +164,7 @@ class TestAPIKeys:
|
|||||||
# Login first
|
# Login first
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create API key
|
# Create API key
|
||||||
@@ -195,7 +196,7 @@ class TestAPIKeys:
|
|||||||
# Login and create API key
|
# Login and create API key
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
create_response = integration_client.post(
|
create_response = integration_client.post(
|
||||||
"/api/v1/auth/keys",
|
"/api/v1/auth/keys",
|
||||||
@@ -216,7 +217,7 @@ class TestAPIKeys:
|
|||||||
# Clean up
|
# Clean up
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
integration_client.delete(f"/api/v1/auth/keys/{key_id}")
|
integration_client.delete(f"/api/v1/auth/keys/{key_id}")
|
||||||
|
|
||||||
@@ -226,7 +227,7 @@ class TestAPIKeys:
|
|||||||
# Login and create API key
|
# Login and create API key
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
create_response = integration_client.post(
|
create_response = integration_client.post(
|
||||||
"/api/v1/auth/keys",
|
"/api/v1/auth/keys",
|
||||||
@@ -257,7 +258,7 @@ class TestAdminUserManagement:
|
|||||||
# Login as admin
|
# Login as admin
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
|
||||||
response = integration_client.get("/api/v1/admin/users")
|
response = integration_client.get("/api/v1/admin/users")
|
||||||
@@ -272,7 +273,7 @@ class TestAdminUserManagement:
|
|||||||
# Login as admin
|
# Login as admin
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create new user
|
# Create new user
|
||||||
@@ -305,7 +306,7 @@ class TestAdminUserManagement:
|
|||||||
# Login as admin
|
# Login as admin
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create a test user
|
# Create a test user
|
||||||
@@ -331,7 +332,7 @@ class TestAdminUserManagement:
|
|||||||
# Login as admin
|
# Login as admin
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
|
|
||||||
# Create a test user
|
# Create a test user
|
||||||
@@ -362,7 +363,7 @@ class TestAdminUserManagement:
|
|||||||
# Login as admin and create non-admin user
|
# Login as admin and create non-admin user
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
"/api/v1/auth/login",
|
"/api/v1/auth/login",
|
||||||
json={"username": "admin", "password": "admin"},
|
json={"username": "admin", "password": "changeme123"},
|
||||||
)
|
)
|
||||||
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
||||||
integration_client.post(
|
integration_client.post(
|
||||||
@@ -381,3 +382,379 @@ class TestAdminUserManagement:
|
|||||||
response = integration_client.get("/api/v1/admin/users")
|
response = integration_client.get("/api/v1/admin/users")
|
||||||
assert response.status_code == 403
|
assert response.status_code == 403
|
||||||
assert "Admin privileges required" in response.json()["detail"]
|
assert "Admin privileges required" in response.json()["detail"]
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecurityEdgeCases:
|
||||||
|
"""Tests for security edge cases and validation."""
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_login_inactive_user(self, integration_client):
|
||||||
|
"""Test that inactive users cannot login."""
|
||||||
|
# Login as admin and create a user
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
test_username = f"inactive_{uuid4().hex[:8]}"
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Deactivate the user
|
||||||
|
integration_client.put(
|
||||||
|
f"/api/v1/admin/users/{test_username}",
|
||||||
|
json={"is_active": False},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Try to login as inactive user
|
||||||
|
integration_client.cookies.clear()
|
||||||
|
response = integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert "Invalid username or password" in response.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_password_too_short_on_create(self, integration_client):
|
||||||
|
"""Test that short passwords are rejected when creating users."""
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
response = integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": f"shortpw_{uuid4().hex[:8]}", "password": "short"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert "at least 8 characters" in response.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_password_too_short_on_change(self, integration_client):
|
||||||
|
"""Test that short passwords are rejected when changing password."""
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
response = integration_client.post(
|
||||||
|
"/api/v1/auth/change-password",
|
||||||
|
json={"current_password": "changeme123", "new_password": "short"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert "at least 8 characters" in response.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_password_too_short_on_reset(self, integration_client):
|
||||||
|
"""Test that short passwords are rejected when resetting password."""
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a test user first
|
||||||
|
test_username = f"resetshort_{uuid4().hex[:8]}"
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
response = integration_client.post(
|
||||||
|
f"/api/v1/admin/users/{test_username}/reset-password",
|
||||||
|
json={"new_password": "short"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert "at least 8 characters" in response.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_duplicate_username_rejected(self, integration_client):
|
||||||
|
"""Test that duplicate usernames are rejected."""
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
test_username = f"duplicate_{uuid4().hex[:8]}"
|
||||||
|
# Create user first time
|
||||||
|
response1 = integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
assert response1.status_code == 200
|
||||||
|
|
||||||
|
# Try to create same username again
|
||||||
|
response2 = integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password456"},
|
||||||
|
)
|
||||||
|
assert response2.status_code == 409
|
||||||
|
assert "already exists" in response2.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_cannot_delete_other_users_api_key(self, integration_client):
|
||||||
|
"""Test that users cannot delete API keys owned by other users."""
|
||||||
|
# Login as admin and create an API key
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
create_response = integration_client.post(
|
||||||
|
"/api/v1/auth/keys",
|
||||||
|
json={"name": "admin-key"},
|
||||||
|
)
|
||||||
|
admin_key_id = create_response.json()["id"]
|
||||||
|
|
||||||
|
# Create a non-admin user
|
||||||
|
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Login as non-admin
|
||||||
|
integration_client.cookies.clear()
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Try to delete admin's API key
|
||||||
|
response = integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
||||||
|
assert response.status_code == 403
|
||||||
|
assert "Cannot delete another user's API key" in response.json()["detail"]
|
||||||
|
|
||||||
|
# Cleanup: login as admin and delete the key
|
||||||
|
integration_client.cookies.clear()
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_sessions_invalidated_on_password_change(self, integration_client):
|
||||||
|
"""Test that all sessions are invalidated when password is changed."""
|
||||||
|
# Create a test user
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
test_username = f"sessiontest_{uuid4().hex[:8]}"
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Login as test user
|
||||||
|
integration_client.cookies.clear()
|
||||||
|
login_response = integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
assert login_response.status_code == 200
|
||||||
|
|
||||||
|
# Verify session works
|
||||||
|
me_response = integration_client.get("/api/v1/auth/me")
|
||||||
|
assert me_response.status_code == 200
|
||||||
|
|
||||||
|
# Change password
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/change-password",
|
||||||
|
json={"current_password": "password123", "new_password": "newpassword123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Old session should be invalidated - try to access /me
|
||||||
|
# (note: the change-password call itself may have cleared the session cookie)
|
||||||
|
me_response2 = integration_client.get("/api/v1/auth/me")
|
||||||
|
# This should fail because all sessions were invalidated
|
||||||
|
assert me_response2.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
|
class TestSecurityEdgeCases:
|
||||||
|
"""Tests for security edge cases and validation."""
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_login_inactive_user(self, integration_client):
|
||||||
|
"""Test that inactive users cannot login."""
|
||||||
|
# Login as admin and create a user
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
test_username = f"inactive_{uuid4().hex[:8]}"
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Deactivate the user
|
||||||
|
integration_client.put(
|
||||||
|
f"/api/v1/admin/users/{test_username}",
|
||||||
|
json={"is_active": False},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Try to login as inactive user
|
||||||
|
integration_client.cookies.clear()
|
||||||
|
response = integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert "Invalid username or password" in response.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_password_too_short_on_create(self, integration_client):
|
||||||
|
"""Test that short passwords are rejected when creating users."""
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
response = integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": f"shortpw_{uuid4().hex[:8]}", "password": "short"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert "at least 8 characters" in response.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_password_too_short_on_change(self, integration_client):
|
||||||
|
"""Test that short passwords are rejected when changing password."""
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
response = integration_client.post(
|
||||||
|
"/api/v1/auth/change-password",
|
||||||
|
json={"current_password": "changeme123", "new_password": "short"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert "at least 8 characters" in response.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_password_too_short_on_reset(self, integration_client):
|
||||||
|
"""Test that short passwords are rejected when resetting password."""
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Create a test user first
|
||||||
|
test_username = f"resetshort_{uuid4().hex[:8]}"
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
response = integration_client.post(
|
||||||
|
f"/api/v1/admin/users/{test_username}/reset-password",
|
||||||
|
json={"new_password": "short"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert "at least 8 characters" in response.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_duplicate_username_rejected(self, integration_client):
|
||||||
|
"""Test that duplicate usernames are rejected."""
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
test_username = f"duplicate_{uuid4().hex[:8]}"
|
||||||
|
# Create user first time
|
||||||
|
response1 = integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
assert response1.status_code == 200
|
||||||
|
|
||||||
|
# Try to create same username again
|
||||||
|
response2 = integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password456"},
|
||||||
|
)
|
||||||
|
assert response2.status_code == 409
|
||||||
|
assert "already exists" in response2.json()["detail"]
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_cannot_delete_other_users_api_key(self, integration_client):
|
||||||
|
"""Test that users cannot delete API keys owned by other users."""
|
||||||
|
# Login as admin and create an API key
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
create_response = integration_client.post(
|
||||||
|
"/api/v1/auth/keys",
|
||||||
|
json={"name": "admin-key"},
|
||||||
|
)
|
||||||
|
admin_key_id = create_response.json()["id"]
|
||||||
|
|
||||||
|
# Create a non-admin user
|
||||||
|
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Login as non-admin
|
||||||
|
integration_client.cookies.clear()
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Try to delete admin's API key
|
||||||
|
response = integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
||||||
|
assert response.status_code == 403
|
||||||
|
assert "Cannot delete another user's API key" in response.json()["detail"]
|
||||||
|
|
||||||
|
# Cleanup: login as admin and delete the key
|
||||||
|
integration_client.cookies.clear()
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
||||||
|
|
||||||
|
@pytest.mark.integration
|
||||||
|
def test_sessions_invalidated_on_password_change(self, integration_client):
|
||||||
|
"""Test that all sessions are invalidated when password is changed."""
|
||||||
|
# Create a test user
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": "admin", "password": "changeme123"},
|
||||||
|
)
|
||||||
|
test_username = f"sessiontest_{uuid4().hex[:8]}"
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/admin/users",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Login as test user
|
||||||
|
integration_client.cookies.clear()
|
||||||
|
login_response = integration_client.post(
|
||||||
|
"/api/v1/auth/login",
|
||||||
|
json={"username": test_username, "password": "password123"},
|
||||||
|
)
|
||||||
|
assert login_response.status_code == 200
|
||||||
|
|
||||||
|
# Verify session works
|
||||||
|
me_response = integration_client.get("/api/v1/auth/me")
|
||||||
|
assert me_response.status_code == 200
|
||||||
|
|
||||||
|
# Change password
|
||||||
|
integration_client.post(
|
||||||
|
"/api/v1/auth/change-password",
|
||||||
|
json={"current_password": "password123", "new_password": "newpassword123"},
|
||||||
|
)
|
||||||
|
|
||||||
|
# Old session should be invalidated - try to access /me
|
||||||
|
# (note: the change-password call itself may have cleared the session cookie)
|
||||||
|
me_response2 = integration_client.get("/api/v1/auth/me")
|
||||||
|
# This should fail because all sessions were invalidated
|
||||||
|
assert me_response2.status_code == 401
|
||||||
|
|||||||
Reference in New Issue
Block a user