diff --git a/backend/app/auth.py b/backend/app/auth.py index ce8169f..82607b3 100644 --- a/backend/app/auth.py +++ b/backend/app/auth.py @@ -22,6 +22,26 @@ API_KEY_PREFIX = "orch_" # Session duration (24 hours default) SESSION_DURATION_HOURS = 24 +# Password requirements +MIN_PASSWORD_LENGTH = 8 + + +class PasswordTooShortError(ValueError): + """Raised when password doesn't meet minimum length requirement.""" + + pass + + +def validate_password_strength(password: str) -> None: + """Validate password meets minimum requirements. + + Raises PasswordTooShortError if password is too short. + """ + if not password or len(password) < MIN_PASSWORD_LENGTH: + raise PasswordTooShortError( + f"Password must be at least {MIN_PASSWORD_LENGTH} characters" + ) + def hash_password(password: str) -> str: """Hash a password using bcrypt.""" @@ -93,23 +113,37 @@ class AuthService: """Authenticate a user with username and password. Returns the user if authentication succeeds, None otherwise. + Uses constant-time comparison to prevent timing-based user enumeration. """ user = self.get_user_by_username(username) + + # Always perform password verification to prevent timing attacks + # Use a dummy hash if user doesn't exist + dummy_hash = "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.VTtYA1vQ9S9sXa" + password_hash = user.password_hash if user and user.password_hash else dummy_hash + + # Verify password (constant time even if user doesn't exist) + password_valid = verify_password(password, password_hash) + + # Check all conditions if not user: return None if not user.password_hash: return None # OIDC-only user if not user.is_active: return None - if not verify_password(password, user.password_hash): + if not password_valid: return None return user def change_password(self, user: User, new_password: str) -> None: - """Change a user's password.""" + """Change a user's password and invalidate all existing sessions.""" + validate_password_strength(new_password) user.password_hash = hash_password(new_password) user.must_change_password = False self.db.commit() + # Invalidate all existing sessions for security + self.delete_user_sessions(user) def update_last_login(self, user: User) -> None: """Update the user's last login timestamp.""" @@ -120,7 +154,7 @@ class AuthService: """List all users.""" query = self.db.query(User) if not include_inactive: - query = query.filter(User.is_active == True) + query = query.filter(User.is_active.is_(True)) return query.order_by(User.username).all() def set_user_active(self, user: User, is_active: bool) -> None: @@ -134,10 +168,13 @@ class AuthService: self.db.commit() def reset_user_password(self, user: User, new_password: str) -> None: - """Reset a user's password (admin action).""" + """Reset a user's password (admin action) and invalidate all sessions.""" + validate_password_strength(new_password) user.password_hash = hash_password(new_password) user.must_change_password = True self.db.commit() + # Invalidate all existing sessions for security + self.delete_user_sessions(user) # --- Session Operations --- @@ -159,7 +196,8 @@ class AuthService: session = UserSession( user_id=user.id, token_hash=token_hash, - expires_at=datetime.now(timezone.utc) + timedelta(hours=SESSION_DURATION_HOURS), + expires_at=datetime.now(timezone.utc) + + timedelta(hours=SESSION_DURATION_HOURS), user_agent=user_agent, ip_address=ip_address, ) @@ -328,7 +366,7 @@ def create_default_admin(db: Session) -> Optional[User]: auth_service = AuthService(db) admin = auth_service.create_user( username="admin", - password="admin", + password="changeme123", is_admin=True, must_change_password=True, ) diff --git a/backend/app/main.py b/backend/app/main.py index 10fdccb..7616acc 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -27,7 +27,7 @@ async def lifespan(app: FastAPI): admin = create_default_admin(db) if admin: logger.warning( - "Default admin user created with username 'admin' and password 'admin'. " + "Default admin user created with username 'admin' and password 'changeme123'. " "CHANGE THIS PASSWORD IMMEDIATELY!" ) finally: diff --git a/backend/app/routes.py b/backend/app/routes.py index 48f6750..d95c7bb 100644 --- a/backend/app/routes.py +++ b/backend/app/routes.py @@ -368,6 +368,9 @@ from .auth import ( get_auth_service, SESSION_COOKIE_NAME, verify_password, + validate_password_strength, + PasswordTooShortError, + MIN_PASSWORD_LENGTH, ) @@ -491,8 +494,14 @@ def change_password( detail="Current password is incorrect", ) - # Change password - auth_service.change_password(current_user, password_request.new_password) + # Validate and change password + try: + auth_service.change_password(current_user, password_request.new_password) + except PasswordTooShortError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters", + ) # Log audit _log_audit( @@ -659,6 +668,15 @@ def create_user( detail="Username already exists", ) + # Validate password strength + try: + validate_password_strength(user_create.password) + except PasswordTooShortError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters", + ) + user = auth_service.create_user( username=user_create.username, password=user_create.password, @@ -738,7 +756,7 @@ def update_user( if user_update.is_admin is False and user.is_admin: admin_count = ( auth_service.db.query(User) - .filter(User.is_admin == True, User.is_active == True) + .filter(User.is_admin.is_(True), User.is_active.is_(True)) .count() ) if admin_count <= 1: @@ -798,7 +816,13 @@ def reset_user_password( detail="User not found", ) - auth_service.reset_user_password(user, reset_request.new_password) + try: + auth_service.reset_user_password(user, reset_request.new_password) + except PasswordTooShortError: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, + detail=f"Password must be at least {MIN_PASSWORD_LENGTH} characters", + ) # Log audit _log_audit( diff --git a/backend/tests/integration/test_auth_api.py b/backend/tests/integration/test_auth_api.py index 49824ea..817694a 100644 --- a/backend/tests/integration/test_auth_api.py +++ b/backend/tests/integration/test_auth_api.py @@ -12,7 +12,7 @@ class TestAuthLogin: """Test successful login with default admin credentials.""" response = integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) assert response.status_code == 200 data = response.json() @@ -49,7 +49,7 @@ class TestAuthLogout: # First login login_response = integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) assert login_response.status_code == 200 @@ -75,7 +75,7 @@ class TestAuthMe: # Login first integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) response = integration_client.get("/api/v1/auth/me") @@ -106,13 +106,13 @@ class TestAuthChangePassword: # Login first integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) # Change password response = integration_client.post( "/api/v1/auth/change-password", - json={"current_password": "admin", "new_password": "newpassword123"}, + json={"current_password": "changeme123", "new_password": "newpassword123"}, ) assert response.status_code == 200 @@ -120,7 +120,7 @@ class TestAuthChangePassword: integration_client.cookies.clear() response = integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) assert response.status_code == 401 @@ -132,10 +132,11 @@ class TestAuthChangePassword: assert response.status_code == 200 # Reset password back to original for other tests - integration_client.post( + reset_response = integration_client.post( "/api/v1/auth/change-password", - json={"current_password": "newpassword123", "new_password": "admin"}, + json={"current_password": "newpassword123", "new_password": "changeme123"}, ) + assert reset_response.status_code == 200, "Failed to reset admin password back to default" @pytest.mark.integration def test_change_password_wrong_current(self, integration_client): @@ -143,7 +144,7 @@ class TestAuthChangePassword: # Login first integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) response = integration_client.post( @@ -163,7 +164,7 @@ class TestAPIKeys: # Login first integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) # Create API key @@ -195,7 +196,7 @@ class TestAPIKeys: # Login and create API key integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) create_response = integration_client.post( "/api/v1/auth/keys", @@ -216,7 +217,7 @@ class TestAPIKeys: # Clean up integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) integration_client.delete(f"/api/v1/auth/keys/{key_id}") @@ -226,7 +227,7 @@ class TestAPIKeys: # Login and create API key integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) create_response = integration_client.post( "/api/v1/auth/keys", @@ -257,7 +258,7 @@ class TestAdminUserManagement: # Login as admin integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) response = integration_client.get("/api/v1/admin/users") @@ -272,7 +273,7 @@ class TestAdminUserManagement: # Login as admin integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) # Create new user @@ -305,7 +306,7 @@ class TestAdminUserManagement: # Login as admin integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) # Create a test user @@ -331,7 +332,7 @@ class TestAdminUserManagement: # Login as admin integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) # Create a test user @@ -362,7 +363,7 @@ class TestAdminUserManagement: # Login as admin and create non-admin user integration_client.post( "/api/v1/auth/login", - json={"username": "admin", "password": "admin"}, + json={"username": "admin", "password": "changeme123"}, ) test_username = f"nonadmin_{uuid4().hex[:8]}" integration_client.post( @@ -381,3 +382,379 @@ class TestAdminUserManagement: response = integration_client.get("/api/v1/admin/users") assert response.status_code == 403 assert "Admin privileges required" in response.json()["detail"] + + +class TestSecurityEdgeCases: + """Tests for security edge cases and validation.""" + + @pytest.mark.integration + def test_login_inactive_user(self, integration_client): + """Test that inactive users cannot login.""" + # Login as admin and create a user + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + test_username = f"inactive_{uuid4().hex[:8]}" + integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + + # Deactivate the user + integration_client.put( + f"/api/v1/admin/users/{test_username}", + json={"is_active": False}, + ) + + # Try to login as inactive user + integration_client.cookies.clear() + response = integration_client.post( + "/api/v1/auth/login", + json={"username": test_username, "password": "password123"}, + ) + assert response.status_code == 401 + assert "Invalid username or password" in response.json()["detail"] + + @pytest.mark.integration + def test_password_too_short_on_create(self, integration_client): + """Test that short passwords are rejected when creating users.""" + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + + response = integration_client.post( + "/api/v1/admin/users", + json={"username": f"shortpw_{uuid4().hex[:8]}", "password": "short"}, + ) + assert response.status_code == 400 + assert "at least 8 characters" in response.json()["detail"] + + @pytest.mark.integration + def test_password_too_short_on_change(self, integration_client): + """Test that short passwords are rejected when changing password.""" + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + + response = integration_client.post( + "/api/v1/auth/change-password", + json={"current_password": "changeme123", "new_password": "short"}, + ) + assert response.status_code == 400 + assert "at least 8 characters" in response.json()["detail"] + + @pytest.mark.integration + def test_password_too_short_on_reset(self, integration_client): + """Test that short passwords are rejected when resetting password.""" + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + + # Create a test user first + test_username = f"resetshort_{uuid4().hex[:8]}" + integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + + response = integration_client.post( + f"/api/v1/admin/users/{test_username}/reset-password", + json={"new_password": "short"}, + ) + assert response.status_code == 400 + assert "at least 8 characters" in response.json()["detail"] + + @pytest.mark.integration + def test_duplicate_username_rejected(self, integration_client): + """Test that duplicate usernames are rejected.""" + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + + test_username = f"duplicate_{uuid4().hex[:8]}" + # Create user first time + response1 = integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + assert response1.status_code == 200 + + # Try to create same username again + response2 = integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password456"}, + ) + assert response2.status_code == 409 + assert "already exists" in response2.json()["detail"] + + @pytest.mark.integration + def test_cannot_delete_other_users_api_key(self, integration_client): + """Test that users cannot delete API keys owned by other users.""" + # Login as admin and create an API key + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + create_response = integration_client.post( + "/api/v1/auth/keys", + json={"name": "admin-key"}, + ) + admin_key_id = create_response.json()["id"] + + # Create a non-admin user + test_username = f"nonadmin_{uuid4().hex[:8]}" + integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + + # Login as non-admin + integration_client.cookies.clear() + integration_client.post( + "/api/v1/auth/login", + json={"username": test_username, "password": "password123"}, + ) + + # Try to delete admin's API key + response = integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}") + assert response.status_code == 403 + assert "Cannot delete another user's API key" in response.json()["detail"] + + # Cleanup: login as admin and delete the key + integration_client.cookies.clear() + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}") + + @pytest.mark.integration + def test_sessions_invalidated_on_password_change(self, integration_client): + """Test that all sessions are invalidated when password is changed.""" + # Create a test user + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + test_username = f"sessiontest_{uuid4().hex[:8]}" + integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + + # Login as test user + integration_client.cookies.clear() + login_response = integration_client.post( + "/api/v1/auth/login", + json={"username": test_username, "password": "password123"}, + ) + assert login_response.status_code == 200 + + # Verify session works + me_response = integration_client.get("/api/v1/auth/me") + assert me_response.status_code == 200 + + # Change password + integration_client.post( + "/api/v1/auth/change-password", + json={"current_password": "password123", "new_password": "newpassword123"}, + ) + + # Old session should be invalidated - try to access /me + # (note: the change-password call itself may have cleared the session cookie) + me_response2 = integration_client.get("/api/v1/auth/me") + # This should fail because all sessions were invalidated + assert me_response2.status_code == 401 + + +class TestSecurityEdgeCases: + """Tests for security edge cases and validation.""" + + @pytest.mark.integration + def test_login_inactive_user(self, integration_client): + """Test that inactive users cannot login.""" + # Login as admin and create a user + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + test_username = f"inactive_{uuid4().hex[:8]}" + integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + + # Deactivate the user + integration_client.put( + f"/api/v1/admin/users/{test_username}", + json={"is_active": False}, + ) + + # Try to login as inactive user + integration_client.cookies.clear() + response = integration_client.post( + "/api/v1/auth/login", + json={"username": test_username, "password": "password123"}, + ) + assert response.status_code == 401 + assert "Invalid username or password" in response.json()["detail"] + + @pytest.mark.integration + def test_password_too_short_on_create(self, integration_client): + """Test that short passwords are rejected when creating users.""" + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + + response = integration_client.post( + "/api/v1/admin/users", + json={"username": f"shortpw_{uuid4().hex[:8]}", "password": "short"}, + ) + assert response.status_code == 400 + assert "at least 8 characters" in response.json()["detail"] + + @pytest.mark.integration + def test_password_too_short_on_change(self, integration_client): + """Test that short passwords are rejected when changing password.""" + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + + response = integration_client.post( + "/api/v1/auth/change-password", + json={"current_password": "changeme123", "new_password": "short"}, + ) + assert response.status_code == 400 + assert "at least 8 characters" in response.json()["detail"] + + @pytest.mark.integration + def test_password_too_short_on_reset(self, integration_client): + """Test that short passwords are rejected when resetting password.""" + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + + # Create a test user first + test_username = f"resetshort_{uuid4().hex[:8]}" + integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + + response = integration_client.post( + f"/api/v1/admin/users/{test_username}/reset-password", + json={"new_password": "short"}, + ) + assert response.status_code == 400 + assert "at least 8 characters" in response.json()["detail"] + + @pytest.mark.integration + def test_duplicate_username_rejected(self, integration_client): + """Test that duplicate usernames are rejected.""" + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + + test_username = f"duplicate_{uuid4().hex[:8]}" + # Create user first time + response1 = integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + assert response1.status_code == 200 + + # Try to create same username again + response2 = integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password456"}, + ) + assert response2.status_code == 409 + assert "already exists" in response2.json()["detail"] + + @pytest.mark.integration + def test_cannot_delete_other_users_api_key(self, integration_client): + """Test that users cannot delete API keys owned by other users.""" + # Login as admin and create an API key + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + create_response = integration_client.post( + "/api/v1/auth/keys", + json={"name": "admin-key"}, + ) + admin_key_id = create_response.json()["id"] + + # Create a non-admin user + test_username = f"nonadmin_{uuid4().hex[:8]}" + integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + + # Login as non-admin + integration_client.cookies.clear() + integration_client.post( + "/api/v1/auth/login", + json={"username": test_username, "password": "password123"}, + ) + + # Try to delete admin's API key + response = integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}") + assert response.status_code == 403 + assert "Cannot delete another user's API key" in response.json()["detail"] + + # Cleanup: login as admin and delete the key + integration_client.cookies.clear() + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}") + + @pytest.mark.integration + def test_sessions_invalidated_on_password_change(self, integration_client): + """Test that all sessions are invalidated when password is changed.""" + # Create a test user + integration_client.post( + "/api/v1/auth/login", + json={"username": "admin", "password": "changeme123"}, + ) + test_username = f"sessiontest_{uuid4().hex[:8]}" + integration_client.post( + "/api/v1/admin/users", + json={"username": test_username, "password": "password123"}, + ) + + # Login as test user + integration_client.cookies.clear() + login_response = integration_client.post( + "/api/v1/auth/login", + json={"username": test_username, "password": "password123"}, + ) + assert login_response.status_code == 200 + + # Verify session works + me_response = integration_client.get("/api/v1/auth/me") + assert me_response.status_code == 200 + + # Change password + integration_client.post( + "/api/v1/auth/change-password", + json={"current_password": "password123", "new_password": "newpassword123"}, + ) + + # Old session should be invalidated - try to access /me + # (note: the change-password call itself may have cleared the session cookie) + me_response2 = integration_client.get("/api/v1/auth/me") + # This should fail because all sessions were invalidated + assert me_response2.status_code == 401