"""Integration tests for authentication API endpoints. Note: These tests are marked as auth_intensive because they make many login requests which can trigger rate limiting on deployed environments. They are excluded from CI integration tests but run in local and unit test suites. """ import pytest from uuid import uuid4 # Mark all tests in this module as auth_intensive pytestmark = pytest.mark.auth_intensive class TestAuthLogin: """Tests for login endpoint.""" @pytest.mark.integration def test_login_success(self, integration_client): """Test successful login with default admin credentials.""" response = integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) assert response.status_code == 200 data = response.json() assert data["username"] == "admin" assert data["is_admin"] is True assert "orchard_session" in response.cookies @pytest.mark.integration def test_login_invalid_password(self, integration_client): """Test login with wrong password.""" response = integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "wrongpassword"}, ) assert response.status_code == 401 assert "Invalid username or password" in response.json()["detail"] @pytest.mark.integration def test_login_nonexistent_user(self, integration_client): """Test login with non-existent user.""" response = integration_client.post( "/api/v1/auth/login", json={"username": "nonexistent", "password": "password"}, ) assert response.status_code == 401 class TestAuthLogout: """Tests for logout endpoint.""" @pytest.mark.integration def test_logout_success(self, integration_client): """Test successful logout.""" # First login login_response = integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) assert login_response.status_code == 200 # Then logout logout_response = integration_client.post("/api/v1/auth/logout") assert logout_response.status_code == 200 assert "Logged out successfully" in logout_response.json()["message"] @pytest.mark.integration def test_logout_without_session(self, integration_client): """Test logout without being logged in.""" response = integration_client.post("/api/v1/auth/logout") # Should succeed even without session assert response.status_code == 200 class TestAuthMe: """Tests for get current user endpoint.""" @pytest.mark.integration def test_get_me_authenticated(self, integration_client): """Test getting current user when authenticated.""" # Login first integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) response = integration_client.get("/api/v1/auth/me") assert response.status_code == 200 data = response.json() assert data["username"] == "admin" assert data["is_admin"] is True assert "id" in data assert "created_at" in data @pytest.mark.integration def test_get_me_unauthenticated(self, integration_client): """Test getting current user without authentication.""" # Clear any existing cookies integration_client.cookies.clear() response = integration_client.get("/api/v1/auth/me") assert response.status_code == 401 assert "Not authenticated" in response.json()["detail"] class TestAuthChangePassword: """Tests for change password endpoint.""" @pytest.mark.integration def test_change_password_success(self, integration_client): """Test successful password change.""" # Login first integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) # Change password response = integration_client.post( "/api/v1/auth/change-password", json={"current_password": "changeme123", "new_password": "newpassword123"}, ) assert response.status_code == 200 # Verify old password no longer works integration_client.cookies.clear() response = integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) assert response.status_code == 401 # Verify new password works response = integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "newpassword123"}, ) assert response.status_code == 200 # Reset password back to original for other tests reset_response = integration_client.post( "/api/v1/auth/change-password", json={"current_password": "newpassword123", "new_password": "changeme123"}, ) assert reset_response.status_code == 200, "Failed to reset admin password back to default" @pytest.mark.integration def test_change_password_wrong_current(self, integration_client): """Test password change with wrong current password.""" # Login first integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) response = integration_client.post( "/api/v1/auth/change-password", json={"current_password": "wrongpassword", "new_password": "newpassword"}, ) assert response.status_code == 400 assert "Current password is incorrect" in response.json()["detail"] class TestAPIKeys: """Tests for API key management endpoints.""" @pytest.mark.integration def test_create_and_list_api_key(self, integration_client): """Test creating and listing API keys.""" # Login first integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) # Create API key create_response = integration_client.post( "/api/v1/auth/keys", json={"name": "test-key", "description": "Test API key"}, ) assert create_response.status_code == 200 data = create_response.json() assert data["name"] == "test-key" assert data["description"] == "Test API key" assert "key" in data assert data["key"].startswith("orch_") key_id = data["id"] api_key = data["key"] # List API keys list_response = integration_client.get("/api/v1/auth/keys") assert list_response.status_code == 200 keys = list_response.json() assert any(k["id"] == key_id for k in keys) # Clean up - delete the key integration_client.delete(f"/api/v1/auth/keys/{key_id}") @pytest.mark.integration def test_use_api_key_for_auth(self, integration_client): """Test using API key for authentication.""" # Login and create API key integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) create_response = integration_client.post( "/api/v1/auth/keys", json={"name": "auth-test-key"}, ) api_key = create_response.json()["key"] key_id = create_response.json()["id"] # Clear cookies and use API key integration_client.cookies.clear() response = integration_client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {api_key}"}, ) assert response.status_code == 200 assert response.json()["username"] == "admin" # Clean up integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) integration_client.delete(f"/api/v1/auth/keys/{key_id}") @pytest.mark.integration def test_delete_api_key(self, integration_client): """Test revoking an API key.""" # Login and create API key integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) create_response = integration_client.post( "/api/v1/auth/keys", json={"name": "delete-test-key"}, ) key_id = create_response.json()["id"] api_key = create_response.json()["key"] # Delete the key delete_response = integration_client.delete(f"/api/v1/auth/keys/{key_id}") assert delete_response.status_code == 200 # Verify key no longer works integration_client.cookies.clear() response = integration_client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {api_key}"}, ) assert response.status_code == 401 class TestAdminUserManagement: """Tests for admin user management endpoints.""" @pytest.mark.integration def test_list_users(self, integration_client): """Test listing users as admin.""" # Login as admin integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) response = integration_client.get("/api/v1/admin/users") assert response.status_code == 200 users = response.json() assert len(users) >= 1 assert any(u["username"] == "admin" for u in users) @pytest.mark.integration def test_create_user(self, integration_client): """Test creating a new user as admin.""" # Login as admin integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) # Create new user test_username = f"testuser_{uuid4().hex[:8]}" response = integration_client.post( "/api/v1/admin/users", json={ "username": test_username, "password": "testpassword", "email": "test@example.com", }, ) assert response.status_code == 200 data = response.json() assert data["username"] == test_username assert data["email"] == "test@example.com" assert data["is_admin"] is False # Verify new user can login integration_client.cookies.clear() login_response = integration_client.post( "/api/v1/auth/login", json={"username": test_username, "password": "testpassword"}, ) assert login_response.status_code == 200 @pytest.mark.integration def test_update_user(self, integration_client): """Test updating a user as admin.""" # Login as admin integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) # Create a test user test_username = f"updateuser_{uuid4().hex[:8]}" integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "password"}, ) # Update the user response = integration_client.put( f"/api/v1/admin/users/{test_username}", json={"email": "updated@example.com", "is_admin": True}, ) assert response.status_code == 200 data = response.json() assert data["email"] == "updated@example.com" assert data["is_admin"] is True @pytest.mark.integration def test_reset_user_password(self, integration_client): """Test resetting a user's password as admin.""" # Login as admin integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) # Create a test user test_username = f"resetuser_{uuid4().hex[:8]}" integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "oldpassword"}, ) # Reset password response = integration_client.post( f"/api/v1/admin/users/{test_username}/reset-password", json={"new_password": "newpassword"}, ) assert response.status_code == 200 # Verify new password works integration_client.cookies.clear() login_response = integration_client.post( "/api/v1/auth/login", json={"username": test_username, "password": "newpassword"}, ) assert login_response.status_code == 200 @pytest.mark.integration def test_non_admin_cannot_access_admin_endpoints(self, integration_client): """Test that non-admin users cannot access admin endpoints.""" # Login as admin and create non-admin user integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) test_username = f"nonadmin_{uuid4().hex[:8]}" integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "password", "is_admin": False}, ) # Login as non-admin integration_client.cookies.clear() integration_client.post( "/api/v1/auth/login", json={"username": test_username, "password": "password"}, ) # Try to access admin endpoints response = integration_client.get("/api/v1/admin/users") assert response.status_code == 403 assert "Admin privileges required" in response.json()["detail"] class TestSecurityEdgeCases: """Tests for security edge cases and validation.""" @pytest.mark.integration def test_login_inactive_user(self, integration_client): """Test that inactive users cannot login.""" # Login as admin and create a user integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) test_username = f"inactive_{uuid4().hex[:8]}" integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "password123"}, ) # Deactivate the user integration_client.put( f"/api/v1/admin/users/{test_username}", json={"is_active": False}, ) # Try to login as inactive user integration_client.cookies.clear() response = integration_client.post( "/api/v1/auth/login", json={"username": test_username, "password": "password123"}, ) assert response.status_code == 401 assert "Invalid username or password" in response.json()["detail"] @pytest.mark.integration def test_password_too_short_on_create(self, integration_client): """Test that short passwords are rejected when creating users.""" integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) response = integration_client.post( "/api/v1/admin/users", json={"username": f"shortpw_{uuid4().hex[:8]}", "password": "short"}, ) assert response.status_code == 400 assert "at least 8 characters" in response.json()["detail"] @pytest.mark.integration def test_password_too_short_on_change(self, integration_client): """Test that short passwords are rejected when changing password.""" integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) response = integration_client.post( "/api/v1/auth/change-password", json={"current_password": "changeme123", "new_password": "short"}, ) assert response.status_code == 400 assert "at least 8 characters" in response.json()["detail"] @pytest.mark.integration def test_password_too_short_on_reset(self, integration_client): """Test that short passwords are rejected when resetting password.""" integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) # Create a test user first test_username = f"resetshort_{uuid4().hex[:8]}" integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "password123"}, ) response = integration_client.post( f"/api/v1/admin/users/{test_username}/reset-password", json={"new_password": "short"}, ) assert response.status_code == 400 assert "at least 8 characters" in response.json()["detail"] @pytest.mark.integration def test_duplicate_username_rejected(self, integration_client): """Test that duplicate usernames are rejected.""" integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) test_username = f"duplicate_{uuid4().hex[:8]}" # Create user first time response1 = integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "password123"}, ) assert response1.status_code == 200 # Try to create same username again response2 = integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "password456"}, ) assert response2.status_code == 409 assert "already exists" in response2.json()["detail"] @pytest.mark.integration def test_cannot_delete_other_users_api_key(self, integration_client): """Test that users cannot delete API keys owned by other users.""" # Login as admin and create an API key integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) create_response = integration_client.post( "/api/v1/auth/keys", json={"name": "admin-key"}, ) admin_key_id = create_response.json()["id"] # Create a non-admin user test_username = f"nonadmin_{uuid4().hex[:8]}" integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "password123"}, ) # Login as non-admin integration_client.cookies.clear() integration_client.post( "/api/v1/auth/login", json={"username": test_username, "password": "password123"}, ) # Try to delete admin's API key response = integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}") assert response.status_code == 403 assert "Cannot delete another user's API key" in response.json()["detail"] # Cleanup: login as admin and delete the key integration_client.cookies.clear() integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) integration_client.delete(f"/api/v1/auth/keys/{admin_key_id}") @pytest.mark.integration def test_sessions_invalidated_on_password_change(self, integration_client): """Test that all sessions are invalidated when password is changed.""" # Create a test user integration_client.post( "/api/v1/auth/login", json={"username": "admin", "password": "changeme123"}, ) test_username = f"sessiontest_{uuid4().hex[:8]}" integration_client.post( "/api/v1/admin/users", json={"username": test_username, "password": "password123"}, ) # Login as test user integration_client.cookies.clear() login_response = integration_client.post( "/api/v1/auth/login", json={"username": test_username, "password": "password123"}, ) assert login_response.status_code == 200 # Verify session works me_response = integration_client.get("/api/v1/auth/me") assert me_response.status_code == 200 # Change password integration_client.post( "/api/v1/auth/change-password", json={"current_password": "password123", "new_password": "newpassword123"}, ) # Old session should be invalidated - try to access /me # (note: the change-password call itself may have cleared the session cookie) me_response2 = integration_client.get("/api/v1/auth/me") # This should fail because all sessions were invalidated assert me_response2.status_code == 401