"""Tests for PyPI cache worker module.""" import os import pytest import re from datetime import datetime, timedelta from unittest.mock import MagicMock, patch from uuid import uuid4 import httpx def get_base_url(): """Get the base URL for the Orchard server from environment.""" return os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") class TestPyPICacheTaskModel: """Tests for PyPICacheTask model.""" def test_model_creation(self): """Test that PyPICacheTask model can be instantiated with explicit values.""" from app.models import PyPICacheTask task = PyPICacheTask( package_name="requests", version_constraint=">=2.25.0", depth=0, status="pending", attempts=0, max_attempts=3, ) assert task.package_name == "requests" assert task.version_constraint == ">=2.25.0" assert task.depth == 0 assert task.status == "pending" assert task.attempts == 0 assert task.max_attempts == 3 def test_model_fields_exist(self): """Test that PyPICacheTask has all expected fields.""" from app.models import PyPICacheTask # Create with minimal required field task = PyPICacheTask(package_name="urllib3") # Verify all expected attributes exist (SQLAlchemy defaults apply on flush) assert hasattr(task, "status") assert hasattr(task, "depth") assert hasattr(task, "attempts") assert hasattr(task, "max_attempts") assert hasattr(task, "version_constraint") assert hasattr(task, "parent_task_id") assert hasattr(task, "triggered_by_artifact") class TestEnqueueCacheTask: """Tests for enqueue_cache_task function.""" def test_normalize_package_name(self): """Test that package names are normalized per PEP 503.""" # Test the normalization pattern used in the worker test_cases = [ ("Requests", "requests"), ("typing_extensions", "typing-extensions"), ("some.package", "some-package"), ("UPPER_CASE", "upper-case"), ("mixed-Case_name", "mixed-case-name"), ] for input_name, expected in test_cases: normalized = re.sub(r"[-_.]+", "-", input_name).lower() assert normalized == expected, f"Failed for {input_name}" class TestCacheWorkerFunctions: """Tests for cache worker helper functions.""" def test_exponential_backoff_calculation(self): """Test that exponential backoff is calculated correctly.""" # The formula is: 30 * (2 ** (attempts - 1)) # Attempt 1 failed → 30s # Attempt 2 failed → 60s # Attempt 3 failed → 120s def calc_backoff(attempts): return 30 * (2 ** (attempts - 1)) assert calc_backoff(1) == 30 assert calc_backoff(2) == 60 assert calc_backoff(3) == 120 class TestPyPICacheAPIEndpoints: """Integration tests for PyPI cache API endpoints.""" @pytest.mark.integration def test_cache_status_endpoint(self): """Test GET /pypi/cache/status returns queue statistics.""" with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: response = client.get("/pypi/cache/status") assert response.status_code == 200 data = response.json() assert "pending" in data assert "in_progress" in data assert "completed" in data assert "failed" in data # All values should be non-negative integers assert isinstance(data["pending"], int) assert isinstance(data["in_progress"], int) assert isinstance(data["completed"], int) assert isinstance(data["failed"], int) assert data["pending"] >= 0 assert data["in_progress"] >= 0 assert data["completed"] >= 0 assert data["failed"] >= 0 @pytest.mark.integration def test_cache_failed_endpoint(self): """Test GET /pypi/cache/failed returns list of failed tasks.""" with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: response = client.get("/pypi/cache/failed") assert response.status_code == 200 data = response.json() assert isinstance(data, list) # If there are failed tasks, verify structure if data: task = data[0] assert "id" in task assert "package" in task assert "error" in task assert "attempts" in task assert "depth" in task @pytest.mark.integration def test_cache_failed_with_limit(self): """Test GET /pypi/cache/failed respects limit parameter.""" with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: response = client.get("/pypi/cache/failed?limit=5") assert response.status_code == 200 data = response.json() assert isinstance(data, list) assert len(data) <= 5 @pytest.mark.integration def test_cache_retry_nonexistent_package(self): """Test POST /pypi/cache/retry/{package} returns 404 for unknown package.""" with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: # Use a random package name that definitely doesn't exist response = client.post(f"/pypi/cache/retry/nonexistent-package-{uuid4().hex[:8]}") assert response.status_code == 404 # Check for "no failed" or "not found" in error message detail = response.json()["detail"].lower() assert "no failed" in detail or "not found" in detail @pytest.mark.integration def test_cache_retry_all_endpoint(self): """Test POST /pypi/cache/retry-all returns success.""" with httpx.Client(base_url=get_base_url(), timeout=30.0) as client: response = client.post("/pypi/cache/retry-all") assert response.status_code == 200 data = response.json() assert "count" in data assert "message" in data assert isinstance(data["count"], int) assert data["count"] >= 0 class TestCacheTaskDeduplication: """Tests for cache task deduplication logic.""" def test_find_cached_package_returns_none_for_uncached(self): """Test that _find_cached_package returns None for uncached packages.""" # This is a unit test pattern - mock the database from unittest.mock import MagicMock mock_db = MagicMock() mock_db.query.return_value.filter.return_value.first.return_value = None from app.pypi_cache_worker import _find_cached_package result = _find_cached_package(mock_db, "nonexistent-package") assert result is None class TestCacheWorkerConfiguration: """Tests for cache worker configuration.""" def test_config_settings_exist(self): """Test that PyPI cache config settings are available.""" from app.config import get_settings settings = get_settings() # Check that settings exist and have reasonable defaults assert hasattr(settings, "pypi_cache_workers") assert hasattr(settings, "pypi_cache_max_depth") assert hasattr(settings, "pypi_cache_max_attempts") # Check aliases work assert settings.PYPI_CACHE_WORKERS == settings.pypi_cache_workers assert settings.PYPI_CACHE_MAX_DEPTH == settings.pypi_cache_max_depth assert settings.PYPI_CACHE_MAX_ATTEMPTS == settings.pypi_cache_max_attempts def test_config_default_values(self): """Test that PyPI cache config has sensible defaults.""" from app.config import get_settings settings = get_settings() # These are the defaults from our implementation assert settings.pypi_cache_workers == 5 assert settings.pypi_cache_max_depth == 10 assert settings.pypi_cache_max_attempts == 3 class TestFetchAndCachePackage: """Tests for _fetch_and_cache_package function.""" def test_result_structure_success(self): """Test that success result has correct structure.""" # Mock a successful result result = {"success": True, "artifact_id": "abc123"} assert result["success"] is True assert "artifact_id" in result def test_result_structure_failure(self): """Test that failure result has correct structure.""" # Mock a failure result result = {"success": False, "error": "Package not found"} assert result["success"] is False assert "error" in result class TestWorkerPoolLifecycle: """Tests for worker pool initialization and shutdown.""" def test_init_shutdown_cycle(self): """Test that worker pool can be initialized and shut down cleanly.""" from app.pypi_cache_worker import ( init_cache_worker_pool, shutdown_cache_worker_pool, _cache_worker_pool, _cache_worker_running, ) # Note: We can't fully test this in isolation because the module # has global state and may conflict with the running server. # These tests verify the function signatures work. # The pool should be initialized by main.py on startup # We just verify the functions are callable assert callable(init_cache_worker_pool) assert callable(shutdown_cache_worker_pool) class TestNestedDependencyDepthTracking: """Tests for nested dependency depth tracking. When the cache worker downloads a package, its dependencies should be queued with depth = current_task_depth + 1, not depth = 0. """ def test_enqueue_with_depth_increments_for_nested_deps(self): """Test that enqueue_cache_task properly tracks depth for nested dependencies. When a task at depth=2 discovers a new dependency, that dependency should be queued at depth=3. """ from unittest.mock import MagicMock, patch from app.pypi_cache_worker import enqueue_cache_task mock_db = MagicMock() # No existing task for this package mock_db.query.return_value.filter.return_value.first.return_value = None # Mock _find_cached_package to return None (not cached) with patch('app.pypi_cache_worker._find_cached_package', return_value=None): task = enqueue_cache_task( mock_db, package_name="nested-dep", version_constraint=">=1.0", parent_task_id=None, depth=3, # Parent task was at depth 2, so this dep is at depth 3 triggered_by_artifact="abc123", ) # Verify db.add was called mock_db.add.assert_called_once() # Get the task that was added added_task = mock_db.add.call_args[0][0] # The task should have the correct depth assert added_task.depth == 3, f"Expected depth=3, got depth={added_task.depth}" assert added_task.package_name == "nested-dep" def test_proxy_download_accepts_cache_depth_param(self): """Test that proxy download endpoint accepts cache-depth query parameter. The cache worker should pass its current depth via query param so the proxy can queue dependencies at the correct depth. """ # Verify that pypi_download_file has a cache_depth parameter import inspect from app.pypi_proxy import pypi_download_file sig = inspect.signature(pypi_download_file) params = list(sig.parameters.keys()) # The endpoint should accept a cache_depth parameter assert 'cache_depth' in params, \ f"pypi_download_file should accept cache_depth parameter. Got params: {params}" def test_worker_sends_depth_in_url_when_fetching(self): """Test that _fetch_and_cache_package includes depth in download URL. When the worker fetches a package, it should include its current depth in the URL query params so nested dependencies get queued at depth+1. """ from unittest.mock import patch, MagicMock import httpx # We need to verify that the httpx.Client.get call includes the depth in URL with patch('app.pypi_cache_worker.httpx.Client') as mock_client_class: mock_client = MagicMock() mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client) mock_client_class.return_value.__exit__ = MagicMock(return_value=False) # Mock successful responses mock_response_index = MagicMock() mock_response_index.status_code = 200 mock_response_index.text = ''' test_pkg-1.0.0-py3-none-any.whl ''' mock_response_download = MagicMock() mock_response_download.status_code = 200 mock_response_download.headers = {"X-Checksum-SHA256": "abc123"} mock_client.get.side_effect = [mock_response_index, mock_response_download] from app.pypi_cache_worker import _fetch_and_cache_package_with_depth # This function should exist and accept depth parameter result = _fetch_and_cache_package_with_depth("test-pkg", None, depth=2) # Verify the download request included the cache-depth query param download_call = mock_client.get.call_args_list[1] download_url = download_call[0][0] # First positional arg is URL assert "cache-depth=2" in download_url, \ f"Expected cache-depth=2 in URL, got: {download_url}"