"""Integration tests for authentication API endpoints."""

from uuid import uuid4

import pytest

# Endpoints and default credentials shared by most tests in this module.
_LOGIN_URL = "/api/v1/auth/login"
_LOGOUT_URL = "/api/v1/auth/logout"
_ME_URL = "/api/v1/auth/me"
_CHANGE_PASSWORD_URL = "/api/v1/auth/change-password"
_KEYS_URL = "/api/v1/auth/keys"
_USERS_URL = "/api/v1/admin/users"
_ADMIN_USERNAME = "admin"
_ADMIN_PASSWORD = "changeme123"


def _login(client, username, password):
    """POST the given credentials to the login endpoint; return the response.

    No assertion is made, so this is usable both for negative tests and for
    best-effort cleanup.
    """
    return client.post(
        _LOGIN_URL,
        json={"username": username, "password": password},
    )


def _login_as_admin(client):
    """Log in with the default admin credentials as a test precondition.

    Fails immediately when the login is rejected, so a broken setup is
    reported as such instead of as a confusing 401/403 further down the test.
    """
    response = _login(client, _ADMIN_USERNAME, _ADMIN_PASSWORD)
    assert response.status_code == 200, "Admin login failed during test setup"
    return response


def _unique_username(prefix):
    """Return ``<prefix>_<8 hex chars>`` so repeated runs do not collide."""
    return f"{prefix}_{uuid4().hex[:8]}"


def _create_user(client, username, password, **extra_fields):
    """Create a user via the admin API as a test precondition.

    The client must already hold an admin session. ``extra_fields`` are sent
    alongside username/password (for example ``email`` or ``is_admin``).
    """
    response = client.post(
        _USERS_URL,
        json={"username": username, "password": password, **extra_fields},
    )
    assert response.status_code == 200, "User creation failed during test setup"
    return response


class TestAuthLogin:
    """Tests for login endpoint."""

    @pytest.mark.integration
    def test_login_success(self, integration_client):
        """Test successful login with default admin credentials."""
        response = _login(integration_client, _ADMIN_USERNAME, _ADMIN_PASSWORD)
        assert response.status_code == 200

        data = response.json()
        assert data["username"] == "admin"
        assert data["is_admin"] is True
        assert "orchard_session" in response.cookies

    @pytest.mark.integration
    def test_login_invalid_password(self, integration_client):
        """Test login with wrong password."""
        response = _login(integration_client, _ADMIN_USERNAME, "wrongpassword")
        assert response.status_code == 401
        assert "Invalid username or password" in response.json()["detail"]

    @pytest.mark.integration
    def test_login_nonexistent_user(self, integration_client):
        """Test login with non-existent user."""
        response = _login(integration_client, "nonexistent", "password")
        assert response.status_code == 401


class TestAuthLogout:
    """Tests for logout endpoint."""

    @pytest.mark.integration
    def test_logout_success(self, integration_client):
        """Test successful logout."""
        # First login
        _login_as_admin(integration_client)

        # Then logout
        logout_response = integration_client.post(_LOGOUT_URL)
        assert logout_response.status_code == 200
        assert "Logged out successfully" in logout_response.json()["message"]

    @pytest.mark.integration
    def test_logout_without_session(self, integration_client):
        """Test logout without being logged in."""
        response = integration_client.post(_LOGOUT_URL)
        # Should succeed even without session
        assert response.status_code == 200


class TestAuthMe:
    """Tests for get current user endpoint."""

    @pytest.mark.integration
    def test_get_me_authenticated(self, integration_client):
        """Test getting current user when authenticated."""
        _login_as_admin(integration_client)

        response = integration_client.get(_ME_URL)
        assert response.status_code == 200

        data = response.json()
        assert data["username"] == "admin"
        assert data["is_admin"] is True
        assert "id" in data
        assert "created_at" in data

    @pytest.mark.integration
    def test_get_me_unauthenticated(self, integration_client):
        """Test getting current user without authentication."""
        # Clear any existing cookies
        integration_client.cookies.clear()

        response = integration_client.get(_ME_URL)
        assert response.status_code == 401
        assert "Not authenticated" in response.json()["detail"]


class TestAuthChangePassword:
    """Tests for change password endpoint."""

    @pytest.mark.integration
    def test_change_password_success(self, integration_client):
        """Test successful password change.

        The admin password is always restored afterwards, even when one of
        the intermediate assertions fails; otherwise every later test that
        logs in with the default credentials would fail as well.
        """
        new_password = "newpassword123"
        _login_as_admin(integration_client)

        # Change password
        response = integration_client.post(
            _CHANGE_PASSWORD_URL,
            json={
                "current_password": _ADMIN_PASSWORD,
                "new_password": new_password,
            },
        )
        assert response.status_code == 200

        has_new_password_session = False
        try:
            # Verify old password no longer works
            integration_client.cookies.clear()
            response = _login(
                integration_client, _ADMIN_USERNAME, _ADMIN_PASSWORD
            )
            assert response.status_code == 401

            # Verify new password works
            response = _login(integration_client, _ADMIN_USERNAME, new_password)
            assert response.status_code == 200
            has_new_password_session = True
        finally:
            # Reset password back to original for other tests. On the failure
            # path we may not hold a usable session yet, so log in first.
            if not has_new_password_session:
                _login(integration_client, _ADMIN_USERNAME, new_password)
            reset_response = integration_client.post(
                _CHANGE_PASSWORD_URL,
                json={
                    "current_password": new_password,
                    "new_password": _ADMIN_PASSWORD,
                },
            )

        # Checked outside ``finally`` so it never masks an earlier failure.
        assert (
            reset_response.status_code == 200
        ), "Failed to reset admin password back to default"

    @pytest.mark.integration
    def test_change_password_wrong_current(self, integration_client):
        """Test password change with wrong current password."""
        _login_as_admin(integration_client)

        response = integration_client.post(
            _CHANGE_PASSWORD_URL,
            json={"current_password": "wrongpassword", "new_password": "newpassword"},
        )
        assert response.status_code == 400
        assert "Current password is incorrect" in response.json()["detail"]


class TestAPIKeys:
    """Tests for API key management endpoints."""

    @pytest.mark.integration
    def test_create_and_list_api_key(self, integration_client):
        """Test creating and listing API keys."""
        _login_as_admin(integration_client)

        # Create API key
        create_response = integration_client.post(
            _KEYS_URL,
            json={"name": "test-key", "description": "Test API key"},
        )
        assert create_response.status_code == 200

        data = create_response.json()
        key_id = data["id"]
        try:
            assert data["name"] == "test-key"
            assert data["description"] == "Test API key"
            assert "key" in data
            assert data["key"].startswith("orch_")

            # List API keys
            list_response = integration_client.get(_KEYS_URL)
            assert list_response.status_code == 200
            keys = list_response.json()
            assert any(k["id"] == key_id for k in keys)
        finally:
            # Clean up - delete the key even if an assertion above failed
            integration_client.delete(f"{_KEYS_URL}/{key_id}")

    @pytest.mark.integration
    def test_use_api_key_for_auth(self, integration_client):
        """Test using API key for authentication."""
        # Login and create API key
        _login_as_admin(integration_client)
        create_response = integration_client.post(
            _KEYS_URL,
            json={"name": "auth-test-key"},
        )
        assert create_response.status_code == 200
        created = create_response.json()
        api_key = created["key"]
        key_id = created["id"]

        try:
            # Clear cookies and use API key
            integration_client.cookies.clear()
            response = integration_client.get(
                _ME_URL,
                headers={"Authorization": f"Bearer {api_key}"},
            )
            assert response.status_code == 200
            assert response.json()["username"] == "admin"
        finally:
            # Clean up: the session cookie was cleared above, so log in again
            # (best effort) before deleting the key.
            _login(integration_client, _ADMIN_USERNAME, _ADMIN_PASSWORD)
            integration_client.delete(f"{_KEYS_URL}/{key_id}")

    @pytest.mark.integration
    def test_delete_api_key(self, integration_client):
        """Test revoking an API key."""
        # Login and create API key
        _login_as_admin(integration_client)
        create_response = integration_client.post(
            _KEYS_URL,
            json={"name": "delete-test-key"},
        )
        assert create_response.status_code == 200
        created = create_response.json()
        key_id = created["id"]
        api_key = created["key"]

        # Delete the key
        delete_response = integration_client.delete(f"{_KEYS_URL}/{key_id}")
        assert delete_response.status_code == 200

        # Verify key no longer works
        integration_client.cookies.clear()
        response = integration_client.get(
            _ME_URL,
            headers={"Authorization": f"Bearer {api_key}"},
        )
        assert response.status_code == 401


class TestAdminUserManagement:
    """Tests for admin user management endpoints."""

    @pytest.mark.integration
    def test_list_users(self, integration_client):
        """Test listing users as admin."""
        _login_as_admin(integration_client)

        response = integration_client.get(_USERS_URL)
        assert response.status_code == 200

        users = response.json()
        assert len(users) >= 1
        assert any(u["username"] == "admin" for u in users)

    @pytest.mark.integration
    def test_create_user(self, integration_client):
        """Test creating a new user as admin."""
        _login_as_admin(integration_client)

        # Create new user
        test_username = _unique_username("testuser")
        response = integration_client.post(
            _USERS_URL,
            json={
                "username": test_username,
                "password": "testpassword",
                "email": "test@example.com",
            },
        )
        assert response.status_code == 200

        data = response.json()
        assert data["username"] == test_username
        assert data["email"] == "test@example.com"
        assert data["is_admin"] is False

        # Verify new user can login
        integration_client.cookies.clear()
        login_response = _login(integration_client, test_username, "testpassword")
        assert login_response.status_code == 200

    @pytest.mark.integration
    def test_update_user(self, integration_client):
        """Test updating a user as admin."""
        _login_as_admin(integration_client)

        # Create a test user
        test_username = _unique_username("updateuser")
        _create_user(integration_client, test_username, "password")

        # Update the user
        response = integration_client.put(
            f"{_USERS_URL}/{test_username}",
            json={"email": "updated@example.com", "is_admin": True},
        )
        assert response.status_code == 200

        data = response.json()
        assert data["email"] == "updated@example.com"
        assert data["is_admin"] is True

    @pytest.mark.integration
    def test_reset_user_password(self, integration_client):
        """Test resetting a user's password as admin."""
        _login_as_admin(integration_client)

        # Create a test user
        test_username = _unique_username("resetuser")
        _create_user(integration_client, test_username, "oldpassword")

        # Reset password
        response = integration_client.post(
            f"{_USERS_URL}/{test_username}/reset-password",
            json={"new_password": "newpassword"},
        )
        assert response.status_code == 200

        # Verify new password works
        integration_client.cookies.clear()
        login_response = _login(integration_client, test_username, "newpassword")
        assert login_response.status_code == 200

    @pytest.mark.integration
    def test_non_admin_cannot_access_admin_endpoints(self, integration_client):
        """Test that non-admin users cannot access admin endpoints."""
        # Login as admin and create non-admin user
        _login_as_admin(integration_client)
        test_username = _unique_username("nonadmin")
        _create_user(integration_client, test_username, "password", is_admin=False)

        # Login as non-admin
        integration_client.cookies.clear()
        login_response = _login(integration_client, test_username, "password")
        assert login_response.status_code == 200

        # Try to access admin endpoints
        response = integration_client.get(_USERS_URL)
        assert response.status_code == 403
        assert "Admin privileges required" in response.json()["detail"]


class TestSecurityEdgeCases:
    """Tests for security edge cases and validation."""

    @pytest.mark.integration
    def test_login_inactive_user(self, integration_client):
        """Test that inactive users cannot login."""
        # Login as admin and create a user
        _login_as_admin(integration_client)
        test_username = _unique_username("inactive")
        _create_user(integration_client, test_username, "password123")

        # Deactivate the user; check it worked so a failed deactivation is
        # not misreported as "inactive user was able to log in".
        deactivate_response = integration_client.put(
            f"{_USERS_URL}/{test_username}",
            json={"is_active": False},
        )
        assert deactivate_response.status_code == 200

        # Try to login as inactive user
        integration_client.cookies.clear()
        response = _login(integration_client, test_username, "password123")
        assert response.status_code == 401
        assert "Invalid username or password" in response.json()["detail"]

    @pytest.mark.integration
    def test_password_too_short_on_create(self, integration_client):
        """Test that short passwords are rejected when creating users."""
        _login_as_admin(integration_client)

        response = integration_client.post(
            _USERS_URL,
            json={"username": _unique_username("shortpw"), "password": "short"},
        )
        assert response.status_code == 400
        assert "at least 8 characters" in response.json()["detail"]

    @pytest.mark.integration
    def test_password_too_short_on_change(self, integration_client):
        """Test that short passwords are rejected when changing password."""
        _login_as_admin(integration_client)

        response = integration_client.post(
            _CHANGE_PASSWORD_URL,
            json={"current_password": _ADMIN_PASSWORD, "new_password": "short"},
        )
        assert response.status_code == 400
        assert "at least 8 characters" in response.json()["detail"]

    @pytest.mark.integration
    def test_password_too_short_on_reset(self, integration_client):
        """Test that short passwords are rejected when resetting password."""
        _login_as_admin(integration_client)

        # Create a test user first
        test_username = _unique_username("resetshort")
        _create_user(integration_client, test_username, "password123")

        response = integration_client.post(
            f"{_USERS_URL}/{test_username}/reset-password",
            json={"new_password": "short"},
        )
        assert response.status_code == 400
        assert "at least 8 characters" in response.json()["detail"]

    @pytest.mark.integration
    def test_duplicate_username_rejected(self, integration_client):
        """Test that duplicate usernames are rejected."""
        _login_as_admin(integration_client)
        test_username = _unique_username("duplicate")

        # Create user first time
        response1 = integration_client.post(
            _USERS_URL,
            json={"username": test_username, "password": "password123"},
        )
        assert response1.status_code == 200

        # Try to create same username again
        response2 = integration_client.post(
            _USERS_URL,
            json={"username": test_username, "password": "password456"},
        )
        assert response2.status_code == 409
        assert "already exists" in response2.json()["detail"]

    @pytest.mark.integration
    def test_cannot_delete_other_users_api_key(self, integration_client):
        """Test that users cannot delete API keys owned by other users."""
        # Login as admin and create an API key
        _login_as_admin(integration_client)
        create_response = integration_client.post(
            _KEYS_URL,
            json={"name": "admin-key"},
        )
        assert create_response.status_code == 200
        admin_key_id = create_response.json()["id"]

        try:
            # Create a non-admin user
            test_username = _unique_username("nonadmin")
            _create_user(integration_client, test_username, "password123")

            # Login as non-admin
            integration_client.cookies.clear()
            login_response = _login(
                integration_client, test_username, "password123"
            )
            assert login_response.status_code == 200

            # Try to delete admin's API key
            response = integration_client.delete(f"{_KEYS_URL}/{admin_key_id}")
            assert response.status_code == 403
            assert (
                "Cannot delete another user's API key" in response.json()["detail"]
            )
        finally:
            # Cleanup: login as admin and delete the key (best effort, also
            # runs when an assertion above failed).
            integration_client.cookies.clear()
            _login(integration_client, _ADMIN_USERNAME, _ADMIN_PASSWORD)
            integration_client.delete(f"{_KEYS_URL}/{admin_key_id}")

    @pytest.mark.integration
    def test_sessions_invalidated_on_password_change(self, integration_client):
        """Test that all sessions are invalidated when password is changed."""
        # Create a test user
        _login_as_admin(integration_client)
        test_username = _unique_username("sessiontest")
        _create_user(integration_client, test_username, "password123")

        # Login as test user
        integration_client.cookies.clear()
        login_response = _login(integration_client, test_username, "password123")
        assert login_response.status_code == 200

        # Verify session works
        me_response = integration_client.get(_ME_URL)
        assert me_response.status_code == 200

        # Change password; it must succeed, otherwise the check below would
        # be testing a session that was never supposed to be invalidated.
        change_response = integration_client.post(
            _CHANGE_PASSWORD_URL,
            json={
                "current_password": "password123",
                "new_password": "newpassword123",
            },
        )
        assert change_response.status_code == 200

        # Old session should be invalidated - try to access /me
        # (note: the change-password call itself may have cleared the session cookie)
        me_response2 = integration_client.get(_ME_URL)
        # This should fail because all sessions were invalidated
        assert me_response2.status_code == 401