- Add get_admin_password() and get_admin_username() helpers to conftest.py - Update test_auth_api.py to use helpers instead of hardcoded credentials - Allows tests to work with different passwords in feature/stage/prod envs The ORCHARD_TEST_PASSWORD environment variable is set by CI jobs to match the deployed environment's admin password.
618 lines
22 KiB
Python
618 lines
22 KiB
Python
"""Integration tests for authentication API endpoints.
|
|
|
|
Note: These tests are marked as auth_intensive because they make many login
|
|
requests. Dev/stage deployments have relaxed rate limits (1000/minute) to
|
|
allow these tests to run. Production uses strict rate limits (5/minute).
|
|
"""
|
|
|
|
import pytest
|
|
from uuid import uuid4
|
|
|
|
from tests.conftest import get_admin_password, get_admin_username
|
|
|
|
|
|
# Mark all tests in this module as auth_intensive (informational, not excluded from CI)
|
|
pytestmark = pytest.mark.auth_intensive
|
|
|
|
|
|
class TestAuthLogin:
|
|
"""Tests for login endpoint."""
|
|
|
|
@pytest.mark.integration
|
|
def test_login_success(self, auth_client):
|
|
"""Test successful login with default admin credentials."""
|
|
response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["username"] == get_admin_username()
|
|
assert data["is_admin"] is True
|
|
assert "orchard_session" in response.cookies
|
|
|
|
@pytest.mark.integration
|
|
def test_login_invalid_password(self, auth_client):
|
|
"""Test login with wrong password."""
|
|
response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": "wrongpassword"},
|
|
)
|
|
assert response.status_code == 401
|
|
assert "Invalid username or password" in response.json()["detail"]
|
|
|
|
@pytest.mark.integration
|
|
def test_login_nonexistent_user(self, auth_client):
|
|
"""Test login with non-existent user."""
|
|
response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": "nonexistent", "password": "password"},
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
|
|
class TestAuthLogout:
|
|
"""Tests for logout endpoint."""
|
|
|
|
@pytest.mark.integration
|
|
def test_logout_success(self, auth_client):
|
|
"""Test successful logout."""
|
|
# First login
|
|
login_response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
assert login_response.status_code == 200
|
|
|
|
# Then logout
|
|
logout_response = auth_client.post("/api/v1/auth/logout")
|
|
assert logout_response.status_code == 200
|
|
assert "Logged out successfully" in logout_response.json()["message"]
|
|
|
|
@pytest.mark.integration
|
|
def test_logout_without_session(self, auth_client):
|
|
"""Test logout without being logged in."""
|
|
response = auth_client.post("/api/v1/auth/logout")
|
|
# Should succeed even without session
|
|
assert response.status_code == 200
|
|
|
|
|
|
class TestAuthMe:
|
|
"""Tests for get current user endpoint."""
|
|
|
|
@pytest.mark.integration
|
|
def test_get_me_authenticated(self, auth_client):
|
|
"""Test getting current user when authenticated."""
|
|
# Login first
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
response = auth_client.get("/api/v1/auth/me")
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["username"] == get_admin_username()
|
|
assert data["is_admin"] is True
|
|
assert "id" in data
|
|
assert "created_at" in data
|
|
|
|
@pytest.mark.integration
|
|
def test_get_me_unauthenticated(self, auth_client):
|
|
"""Test getting current user without authentication."""
|
|
# Clear any existing cookies
|
|
auth_client.cookies.clear()
|
|
|
|
response = auth_client.get("/api/v1/auth/me")
|
|
assert response.status_code == 401
|
|
assert "Not authenticated" in response.json()["detail"]
|
|
|
|
|
|
class TestAuthChangePassword:
|
|
"""Tests for change password endpoint.
|
|
|
|
Note: These tests use dedicated test users instead of admin to avoid
|
|
invalidating the integration_client session (which uses admin).
|
|
"""
|
|
|
|
@pytest.mark.integration
|
|
def test_change_password_success(self, auth_client):
|
|
"""Test successful password change."""
|
|
# Login as admin to create a test user
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
test_username = f"pwchange_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "oldpassword123"},
|
|
)
|
|
|
|
# Login as test user
|
|
auth_client.cookies.clear()
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "oldpassword123"},
|
|
)
|
|
|
|
# Change password
|
|
response = auth_client.post(
|
|
"/api/v1/auth/change-password",
|
|
json={"current_password": "oldpassword123", "new_password": "newpassword123"},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
# Verify old password no longer works
|
|
auth_client.cookies.clear()
|
|
response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "oldpassword123"},
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
# Verify new password works
|
|
response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "newpassword123"},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
@pytest.mark.integration
|
|
def test_change_password_wrong_current(self, auth_client):
|
|
"""Test password change with wrong current password."""
|
|
# Login as admin to create a test user
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
test_username = f"pwwrong_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
# Login as test user
|
|
auth_client.cookies.clear()
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
response = auth_client.post(
|
|
"/api/v1/auth/change-password",
|
|
json={"current_password": "wrongpassword", "new_password": "newpassword"},
|
|
)
|
|
assert response.status_code == 400
|
|
assert "Current password is incorrect" in response.json()["detail"]
|
|
|
|
|
|
class TestAPIKeys:
|
|
"""Tests for API key management endpoints."""
|
|
|
|
@pytest.mark.integration
|
|
def test_create_and_list_api_key(self, auth_client):
|
|
"""Test creating and listing API keys."""
|
|
# Login first
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
# Create API key
|
|
create_response = auth_client.post(
|
|
"/api/v1/auth/keys",
|
|
json={"name": "test-key", "description": "Test API key"},
|
|
)
|
|
assert create_response.status_code == 200
|
|
data = create_response.json()
|
|
assert data["name"] == "test-key"
|
|
assert data["description"] == "Test API key"
|
|
assert "key" in data
|
|
assert data["key"].startswith("orch_")
|
|
key_id = data["id"]
|
|
api_key = data["key"]
|
|
|
|
# List API keys
|
|
list_response = auth_client.get("/api/v1/auth/keys")
|
|
assert list_response.status_code == 200
|
|
keys = list_response.json()
|
|
assert any(k["id"] == key_id for k in keys)
|
|
|
|
# Clean up - delete the key
|
|
auth_client.delete(f"/api/v1/auth/keys/{key_id}")
|
|
|
|
@pytest.mark.integration
|
|
def test_use_api_key_for_auth(self, auth_client):
|
|
"""Test using API key for authentication."""
|
|
# Login and create API key
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
create_response = auth_client.post(
|
|
"/api/v1/auth/keys",
|
|
json={"name": "auth-test-key"},
|
|
)
|
|
api_key = create_response.json()["key"]
|
|
key_id = create_response.json()["id"]
|
|
|
|
# Clear cookies and use API key
|
|
auth_client.cookies.clear()
|
|
response = auth_client.get(
|
|
"/api/v1/auth/me",
|
|
headers={"Authorization": f"Bearer {api_key}"},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json()["username"] == get_admin_username()
|
|
|
|
# Clean up
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
auth_client.delete(f"/api/v1/auth/keys/{key_id}")
|
|
|
|
@pytest.mark.integration
|
|
def test_delete_api_key(self, auth_client):
|
|
"""Test revoking an API key."""
|
|
# Login and create API key
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
create_response = auth_client.post(
|
|
"/api/v1/auth/keys",
|
|
json={"name": "delete-test-key"},
|
|
)
|
|
key_id = create_response.json()["id"]
|
|
api_key = create_response.json()["key"]
|
|
|
|
# Delete the key
|
|
delete_response = auth_client.delete(f"/api/v1/auth/keys/{key_id}")
|
|
assert delete_response.status_code == 200
|
|
|
|
# Verify key no longer works
|
|
auth_client.cookies.clear()
|
|
response = auth_client.get(
|
|
"/api/v1/auth/me",
|
|
headers={"Authorization": f"Bearer {api_key}"},
|
|
)
|
|
assert response.status_code == 401
|
|
|
|
|
|
class TestAdminUserManagement:
|
|
"""Tests for admin user management endpoints."""
|
|
|
|
@pytest.mark.integration
|
|
def test_list_users(self, auth_client):
|
|
"""Test listing users as admin."""
|
|
# Login as admin
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
response = auth_client.get("/api/v1/admin/users")
|
|
assert response.status_code == 200
|
|
users = response.json()
|
|
assert len(users) >= 1
|
|
assert any(u["username"] == get_admin_username() for u in users)
|
|
|
|
@pytest.mark.integration
|
|
def test_create_user(self, auth_client):
|
|
"""Test creating a new user as admin."""
|
|
# Login as admin
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
# Create new user
|
|
test_username = f"testuser_{uuid4().hex[:8]}"
|
|
response = auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={
|
|
"username": test_username,
|
|
"password": "testpassword",
|
|
"email": "test@example.com",
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["username"] == test_username
|
|
assert data["email"] == "test@example.com"
|
|
assert data["is_admin"] is False
|
|
|
|
# Verify new user can login
|
|
auth_client.cookies.clear()
|
|
login_response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "testpassword"},
|
|
)
|
|
assert login_response.status_code == 200
|
|
|
|
@pytest.mark.integration
|
|
def test_update_user(self, auth_client):
|
|
"""Test updating a user as admin."""
|
|
# Login as admin
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
# Create a test user
|
|
test_username = f"updateuser_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password"},
|
|
)
|
|
|
|
# Update the user
|
|
response = auth_client.put(
|
|
f"/api/v1/admin/users/{test_username}",
|
|
json={"email": "updated@example.com", "is_admin": True},
|
|
)
|
|
assert response.status_code == 200
|
|
data = response.json()
|
|
assert data["email"] == "updated@example.com"
|
|
assert data["is_admin"] is True
|
|
|
|
@pytest.mark.integration
|
|
def test_reset_user_password(self, auth_client):
|
|
"""Test resetting a user's password as admin."""
|
|
# Login as admin
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
# Create a test user
|
|
test_username = f"resetuser_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "oldpassword"},
|
|
)
|
|
|
|
# Reset password
|
|
response = auth_client.post(
|
|
f"/api/v1/admin/users/{test_username}/reset-password",
|
|
json={"new_password": "newpassword"},
|
|
)
|
|
assert response.status_code == 200
|
|
|
|
# Verify new password works
|
|
auth_client.cookies.clear()
|
|
login_response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "newpassword"},
|
|
)
|
|
assert login_response.status_code == 200
|
|
|
|
@pytest.mark.integration
|
|
def test_non_admin_cannot_access_admin_endpoints(self, auth_client):
|
|
"""Test that non-admin users cannot access admin endpoints."""
|
|
# Login as admin and create non-admin user
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password", "is_admin": False},
|
|
)
|
|
|
|
# Login as non-admin
|
|
auth_client.cookies.clear()
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "password"},
|
|
)
|
|
|
|
# Try to access admin endpoints
|
|
response = auth_client.get("/api/v1/admin/users")
|
|
assert response.status_code == 403
|
|
assert "Admin privileges required" in response.json()["detail"]
|
|
|
|
|
|
class TestSecurityEdgeCases:
|
|
"""Tests for security edge cases and validation."""
|
|
|
|
@pytest.mark.integration
|
|
def test_login_inactive_user(self, auth_client):
|
|
"""Test that inactive users cannot login."""
|
|
# Login as admin and create a user
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
test_username = f"inactive_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
# Deactivate the user
|
|
auth_client.put(
|
|
f"/api/v1/admin/users/{test_username}",
|
|
json={"is_active": False},
|
|
)
|
|
|
|
# Try to login as inactive user
|
|
auth_client.cookies.clear()
|
|
response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
assert response.status_code == 401
|
|
assert "Invalid username or password" in response.json()["detail"]
|
|
|
|
@pytest.mark.integration
|
|
def test_password_too_short_on_create(self, auth_client):
|
|
"""Test that short passwords are rejected when creating users."""
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
response = auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": f"shortpw_{uuid4().hex[:8]}", "password": "short"},
|
|
)
|
|
assert response.status_code == 400
|
|
assert "at least 8 characters" in response.json()["detail"]
|
|
|
|
@pytest.mark.integration
|
|
def test_password_too_short_on_change(self, auth_client):
|
|
"""Test that short passwords are rejected when changing password."""
|
|
# Create test user
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
test_username = f"shortchange_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
# Login as test user
|
|
auth_client.cookies.clear()
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
response = auth_client.post(
|
|
"/api/v1/auth/change-password",
|
|
json={"current_password": "password123", "new_password": "short"},
|
|
)
|
|
assert response.status_code == 400
|
|
assert "at least 8 characters" in response.json()["detail"]
|
|
|
|
@pytest.mark.integration
|
|
def test_password_too_short_on_reset(self, auth_client):
|
|
"""Test that short passwords are rejected when resetting password."""
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
# Create a test user first
|
|
test_username = f"resetshort_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
response = auth_client.post(
|
|
f"/api/v1/admin/users/{test_username}/reset-password",
|
|
json={"new_password": "short"},
|
|
)
|
|
assert response.status_code == 400
|
|
assert "at least 8 characters" in response.json()["detail"]
|
|
|
|
@pytest.mark.integration
|
|
def test_duplicate_username_rejected(self, auth_client):
|
|
"""Test that duplicate usernames are rejected."""
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
|
|
test_username = f"duplicate_{uuid4().hex[:8]}"
|
|
# Create user first time
|
|
response1 = auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
assert response1.status_code == 200
|
|
|
|
# Try to create same username again
|
|
response2 = auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password456"},
|
|
)
|
|
assert response2.status_code == 409
|
|
assert "already exists" in response2.json()["detail"]
|
|
|
|
@pytest.mark.integration
|
|
def test_cannot_delete_other_users_api_key(self, auth_client):
|
|
"""Test that users cannot delete API keys owned by other users."""
|
|
# Login as admin and create an API key
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
create_response = auth_client.post(
|
|
"/api/v1/auth/keys",
|
|
json={"name": "admin-key"},
|
|
)
|
|
admin_key_id = create_response.json()["id"]
|
|
|
|
# Create a non-admin user
|
|
test_username = f"nonadmin_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
# Login as non-admin
|
|
auth_client.cookies.clear()
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
# Try to delete admin's API key
|
|
response = auth_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
|
assert response.status_code == 403
|
|
assert "Cannot delete another user's API key" in response.json()["detail"]
|
|
|
|
# Cleanup: login as admin and delete the key
|
|
auth_client.cookies.clear()
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
auth_client.delete(f"/api/v1/auth/keys/{admin_key_id}")
|
|
|
|
@pytest.mark.integration
|
|
def test_sessions_invalidated_on_password_change(self, auth_client):
|
|
"""Test that all sessions are invalidated when password is changed."""
|
|
# Create a test user
|
|
auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": get_admin_username(), "password": get_admin_password()},
|
|
)
|
|
test_username = f"sessiontest_{uuid4().hex[:8]}"
|
|
auth_client.post(
|
|
"/api/v1/admin/users",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
|
|
# Login as test user
|
|
auth_client.cookies.clear()
|
|
login_response = auth_client.post(
|
|
"/api/v1/auth/login",
|
|
json={"username": test_username, "password": "password123"},
|
|
)
|
|
assert login_response.status_code == 200
|
|
|
|
# Verify session works
|
|
me_response = auth_client.get("/api/v1/auth/me")
|
|
assert me_response.status_code == 200
|
|
|
|
# Change password
|
|
auth_client.post(
|
|
"/api/v1/auth/change-password",
|
|
json={"current_password": "password123", "new_password": "newpassword123"},
|
|
)
|
|
|
|
# Old session should be invalidated - try to access /me
|
|
# (note: the change-password call itself may have cleared the session cookie)
|
|
me_response2 = auth_client.get("/api/v1/auth/me")
|
|
# This should fail because all sessions were invalidated
|
|
assert me_response2.status_code == 401
|