When the cache worker downloaded a package through the proxy, dependencies were always queued with depth=0 instead of depth+1. This meant depth limits weren't properly enforced for nested dependencies.

Changes:

- Add `cache-depth` query parameter to the `pypi_download_file` endpoint
- Worker now passes its current depth when fetching packages
- Dependencies are queued at `cache_depth + 1` instead of a hardcoded 0
- Add tests for depth tracking behavior
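In sketch form, the fix means dependency discovery inherits the parent task's depth instead of resetting it. Below is a minimal sketch of the queueing side, assuming a hypothetical `queue_discovered_dependencies` helper; only `enqueue_cache_task` and its keyword arguments are taken from the real code, everything else is illustrative:

```python
from app.pypi_cache_worker import enqueue_cache_task


# Hypothetical helper for illustration only - not the actual implementation.
def queue_discovered_dependencies(db, dependencies, current_depth, artifact_id, max_depth=10):
    """Queue each discovered dependency one level deeper than the current task."""
    for name, constraint in dependencies:
        child_depth = current_depth + 1  # previously hardcoded to 0
        if child_depth > max_depth:
            continue  # depth limit is now actually enforced for nested deps
        enqueue_cache_task(
            db,
            package_name=name,
            version_constraint=constraint,
            parent_task_id=None,
            depth=child_depth,
            triggered_by_artifact=artifact_id,
        )
```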
"""Tests for PyPI cache worker module."""
|
|
|
|
import os
|
|
import pytest
|
|
import re
|
|
from datetime import datetime, timedelta
|
|
from unittest.mock import MagicMock, patch
|
|
from uuid import uuid4
|
|
|
|
import httpx
|
|
|
|
|
|
def get_base_url():
|
|
"""Get the base URL for the Orchard server from environment."""
|
|
return os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
|
|
|
|
|
|


class TestPyPICacheTaskModel:
    """Tests for PyPICacheTask model."""

    def test_model_creation(self):
        """Test that PyPICacheTask model can be instantiated with explicit values."""
        from app.models import PyPICacheTask

        task = PyPICacheTask(
            package_name="requests",
            version_constraint=">=2.25.0",
            depth=0,
            status="pending",
            attempts=0,
            max_attempts=3,
        )

        assert task.package_name == "requests"
        assert task.version_constraint == ">=2.25.0"
        assert task.depth == 0
        assert task.status == "pending"
        assert task.attempts == 0
        assert task.max_attempts == 3

    def test_model_fields_exist(self):
        """Test that PyPICacheTask has all expected fields."""
        from app.models import PyPICacheTask

        # Create with minimal required field
        task = PyPICacheTask(package_name="urllib3")

        # Verify all expected attributes exist (SQLAlchemy defaults apply on flush)
        assert hasattr(task, "status")
        assert hasattr(task, "depth")
        assert hasattr(task, "attempts")
        assert hasattr(task, "max_attempts")
        assert hasattr(task, "version_constraint")
        assert hasattr(task, "parent_task_id")
        assert hasattr(task, "triggered_by_artifact")


class TestEnqueueCacheTask:
    """Tests for enqueue_cache_task function."""

    def test_normalize_package_name(self):
        """Test that package names are normalized per PEP 503."""
        # Test the normalization pattern used in the worker
        test_cases = [
            ("Requests", "requests"),
            ("typing_extensions", "typing-extensions"),
            ("some.package", "some-package"),
            ("UPPER_CASE", "upper-case"),
            ("mixed-Case_name", "mixed-case-name"),
        ]

        for input_name, expected in test_cases:
            normalized = re.sub(r"[-_.]+", "-", input_name).lower()
            assert normalized == expected, f"Failed for {input_name}"


class TestCacheWorkerFunctions:
    """Tests for cache worker helper functions."""

    def test_exponential_backoff_calculation(self):
        """Test that exponential backoff is calculated correctly."""
        # The formula is: 30 * (2 ** (attempts - 1))
        # Attempt 1 failed → 30s
        # Attempt 2 failed → 60s
        # Attempt 3 failed → 120s

        def calc_backoff(attempts):
            return 30 * (2 ** (attempts - 1))

        assert calc_backoff(1) == 30
        assert calc_backoff(2) == 60
        assert calc_backoff(3) == 120


class TestPyPICacheAPIEndpoints:
    """Integration tests for PyPI cache API endpoints."""

    @pytest.mark.integration
    def test_cache_status_endpoint(self):
        """Test GET /pypi/cache/status returns queue statistics."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            response = client.get("/pypi/cache/status")
            assert response.status_code == 200

            data = response.json()
            # All four queue states should be reported as non-negative integers
            for state in ("pending", "in_progress", "completed", "failed"):
                assert state in data
                assert isinstance(data[state], int)
                assert data[state] >= 0

    @pytest.mark.integration
    def test_cache_failed_endpoint(self):
        """Test GET /pypi/cache/failed returns list of failed tasks."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            response = client.get("/pypi/cache/failed")
            assert response.status_code == 200

            data = response.json()
            assert isinstance(data, list)

            # If there are failed tasks, verify their structure
            if data:
                task = data[0]
                for field in ("id", "package", "error", "attempts", "depth"):
                    assert field in task

    @pytest.mark.integration
    def test_cache_failed_with_limit(self):
        """Test GET /pypi/cache/failed respects the limit parameter."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            response = client.get("/pypi/cache/failed?limit=5")
            assert response.status_code == 200

            data = response.json()
            assert isinstance(data, list)
            assert len(data) <= 5

    @pytest.mark.integration
    def test_cache_retry_nonexistent_package(self):
        """Test POST /pypi/cache/retry/{package} returns 404 for an unknown package."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            # Use a random package name that definitely doesn't exist
            response = client.post(f"/pypi/cache/retry/nonexistent-package-{uuid4().hex[:8]}")
            assert response.status_code == 404
            # The error message should say "no failed" or "not found"
            detail = response.json()["detail"].lower()
            assert "no failed" in detail or "not found" in detail

    @pytest.mark.integration
    def test_cache_retry_all_endpoint(self):
        """Test POST /pypi/cache/retry-all returns success."""
        with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
            response = client.post("/pypi/cache/retry-all")
            assert response.status_code == 200

            data = response.json()
            assert "count" in data
            assert "message" in data
            assert isinstance(data["count"], int)
            assert data["count"] >= 0


class TestCacheTaskDeduplication:
    """Tests for cache task deduplication logic."""

    def test_find_cached_package_returns_none_for_uncached(self):
        """Test that _find_cached_package returns None for uncached packages."""
        # Unit test pattern: mock the database session
        mock_db = MagicMock()
        mock_db.query.return_value.filter.return_value.first.return_value = None

        from app.pypi_cache_worker import _find_cached_package

        result = _find_cached_package(mock_db, "nonexistent-package")
        assert result is None


class TestCacheWorkerConfiguration:
    """Tests for cache worker configuration."""

    def test_config_settings_exist(self):
        """Test that PyPI cache config settings are available."""
        from app.config import get_settings

        settings = get_settings()

        # Check that settings exist and have reasonable defaults
        assert hasattr(settings, "pypi_cache_workers")
        assert hasattr(settings, "pypi_cache_max_depth")
        assert hasattr(settings, "pypi_cache_max_attempts")

        # Check aliases work
        assert settings.PYPI_CACHE_WORKERS == settings.pypi_cache_workers
        assert settings.PYPI_CACHE_MAX_DEPTH == settings.pypi_cache_max_depth
        assert settings.PYPI_CACHE_MAX_ATTEMPTS == settings.pypi_cache_max_attempts

    def test_config_default_values(self):
        """Test that PyPI cache config has sensible defaults."""
        from app.config import get_settings

        settings = get_settings()

        # These are the defaults from our implementation
        assert settings.pypi_cache_workers == 5
        assert settings.pypi_cache_max_depth == 10
        assert settings.pypi_cache_max_attempts == 3


class TestFetchAndCachePackage:
    """Tests for _fetch_and_cache_package function."""

    def test_result_structure_success(self):
        """Test that a success result has the correct structure."""
        # Mock a successful result
        result = {"success": True, "artifact_id": "abc123"}

        assert result["success"] is True
        assert "artifact_id" in result

    def test_result_structure_failure(self):
        """Test that a failure result has the correct structure."""
        # Mock a failure result
        result = {"success": False, "error": "Package not found"}

        assert result["success"] is False
        assert "error" in result


class TestWorkerPoolLifecycle:
    """Tests for worker pool initialization and shutdown."""

    def test_init_shutdown_cycle(self):
        """Test that the worker pool lifecycle functions exist and are callable."""
        # Importing the private globals alongside the functions verifies they exist.
        from app.pypi_cache_worker import (  # noqa: F401
            init_cache_worker_pool,
            shutdown_cache_worker_pool,
            _cache_worker_pool,
            _cache_worker_running,
        )

        # We can't fully exercise init/shutdown in isolation: the module has
        # global state and may conflict with the running server. The pool is
        # initialized by main.py on startup, so we only verify callability here.
        assert callable(init_cache_worker_pool)
        assert callable(shutdown_cache_worker_pool)


class TestNestedDependencyDepthTracking:
    """Tests for nested dependency depth tracking.

    When the cache worker downloads a package, its dependencies should be
    queued with depth = current_task_depth + 1, not depth = 0.
    """

    def test_enqueue_with_depth_increments_for_nested_deps(self):
        """Test that enqueue_cache_task properly tracks depth for nested dependencies.

        When a task at depth=2 discovers a new dependency, that dependency
        should be queued at depth=3.
        """
        from app.pypi_cache_worker import enqueue_cache_task

        mock_db = MagicMock()

        # No existing task for this package
        mock_db.query.return_value.filter.return_value.first.return_value = None

        # Mock _find_cached_package to return None (not cached)
        with patch("app.pypi_cache_worker._find_cached_package", return_value=None):
            enqueue_cache_task(
                mock_db,
                package_name="nested-dep",
                version_constraint=">=1.0",
                parent_task_id=None,
                depth=3,  # Parent task was at depth 2, so this dep is at depth 3
                triggered_by_artifact="abc123",
            )

        # Verify db.add was called and inspect the task that was added
        mock_db.add.assert_called_once()
        added_task = mock_db.add.call_args[0][0]

        # The task should have been queued at the caller-supplied depth
        assert added_task.depth == 3, f"Expected depth=3, got depth={added_task.depth}"
        assert added_task.package_name == "nested-dep"

    def test_proxy_download_accepts_cache_depth_param(self):
        """Test that the proxy download endpoint accepts a cache-depth query parameter.

        The cache worker should pass its current depth via query param so the
        proxy can queue dependencies at the correct depth.
        """
        # Verify that pypi_download_file has a cache_depth parameter
        import inspect

        from app.pypi_proxy import pypi_download_file

        sig = inspect.signature(pypi_download_file)
        params = list(sig.parameters.keys())

        # The endpoint should accept a cache_depth parameter
        assert "cache_depth" in params, (
            f"pypi_download_file should accept a cache_depth parameter. Got params: {params}"
        )

    def test_worker_sends_depth_in_url_when_fetching(self):
        """Test that _fetch_and_cache_package_with_depth includes depth in the download URL.

        When the worker fetches a package, it should include its current depth
        in the URL query params so nested dependencies get queued at depth+1.
        """
        # Verify that the httpx.Client.get call includes the depth in the URL
        with patch("app.pypi_cache_worker.httpx.Client") as mock_client_class:
            mock_client = MagicMock()
            mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client)
            mock_client_class.return_value.__exit__ = MagicMock(return_value=False)

            # Mock a successful index response followed by a successful download
            mock_response_index = MagicMock()
            mock_response_index.status_code = 200
            mock_response_index.text = '''
            <html><body>
            <a href="/pypi/simple/test-pkg/test_pkg-1.0.0-py3-none-any.whl?upstream=http%3A%2F%2Fexample.com">test_pkg-1.0.0-py3-none-any.whl</a>
            </body></html>
            '''

            mock_response_download = MagicMock()
            mock_response_download.status_code = 200
            mock_response_download.headers = {"X-Checksum-SHA256": "abc123"}

            mock_client.get.side_effect = [mock_response_index, mock_response_download]

            from app.pypi_cache_worker import _fetch_and_cache_package_with_depth

            # The function should exist and accept a depth parameter
            _fetch_and_cache_package_with_depth("test-pkg", None, depth=2)

            # Verify the download request included the cache-depth query param
            download_call = mock_client.get.call_args_list[1]
            download_url = download_call[0][0]  # First positional arg is the URL
            assert "cache-depth=2" in download_url, (
                f"Expected cache-depth=2 in URL, got: {download_url}"
            )