Files
orchard/backend/tests/test_pypi_cache_worker.py
Mondo Diaz 5517048f05 Fix nested dependency depth tracking in PyPI cache worker
When the cache worker downloaded a package through the proxy, dependencies
were always queued with depth=0 instead of depth+1. This meant depth limits
weren't properly enforced for nested dependencies.

Changes:
- Add cache-depth query parameter to pypi_download_file endpoint
- Worker now passes its current depth when fetching packages
- Dependencies are queued at cache_depth+1 instead of hardcoded 0
- Add tests for depth tracking behavior
2026-02-05 09:15:08 -06:00

365 lines
14 KiB
Python

"""Tests for PyPI cache worker module."""
import os
import pytest
import re
from datetime import datetime, timedelta
from unittest.mock import MagicMock, patch
from uuid import uuid4
import httpx
def get_base_url():
"""Get the base URL for the Orchard server from environment."""
return os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
class TestPyPICacheTaskModel:
"""Tests for PyPICacheTask model."""
def test_model_creation(self):
"""Test that PyPICacheTask model can be instantiated with explicit values."""
from app.models import PyPICacheTask
task = PyPICacheTask(
package_name="requests",
version_constraint=">=2.25.0",
depth=0,
status="pending",
attempts=0,
max_attempts=3,
)
assert task.package_name == "requests"
assert task.version_constraint == ">=2.25.0"
assert task.depth == 0
assert task.status == "pending"
assert task.attempts == 0
assert task.max_attempts == 3
def test_model_fields_exist(self):
"""Test that PyPICacheTask has all expected fields."""
from app.models import PyPICacheTask
# Create with minimal required field
task = PyPICacheTask(package_name="urllib3")
# Verify all expected attributes exist (SQLAlchemy defaults apply on flush)
assert hasattr(task, "status")
assert hasattr(task, "depth")
assert hasattr(task, "attempts")
assert hasattr(task, "max_attempts")
assert hasattr(task, "version_constraint")
assert hasattr(task, "parent_task_id")
assert hasattr(task, "triggered_by_artifact")
class TestEnqueueCacheTask:
"""Tests for enqueue_cache_task function."""
def test_normalize_package_name(self):
"""Test that package names are normalized per PEP 503."""
# Test the normalization pattern used in the worker
test_cases = [
("Requests", "requests"),
("typing_extensions", "typing-extensions"),
("some.package", "some-package"),
("UPPER_CASE", "upper-case"),
("mixed-Case_name", "mixed-case-name"),
]
for input_name, expected in test_cases:
normalized = re.sub(r"[-_.]+", "-", input_name).lower()
assert normalized == expected, f"Failed for {input_name}"
class TestCacheWorkerFunctions:
"""Tests for cache worker helper functions."""
def test_exponential_backoff_calculation(self):
"""Test that exponential backoff is calculated correctly."""
# The formula is: 30 * (2 ** (attempts - 1))
# Attempt 1 failed → 30s
# Attempt 2 failed → 60s
# Attempt 3 failed → 120s
def calc_backoff(attempts):
return 30 * (2 ** (attempts - 1))
assert calc_backoff(1) == 30
assert calc_backoff(2) == 60
assert calc_backoff(3) == 120
class TestPyPICacheAPIEndpoints:
"""Integration tests for PyPI cache API endpoints."""
@pytest.mark.integration
def test_cache_status_endpoint(self):
"""Test GET /pypi/cache/status returns queue statistics."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
response = client.get("/pypi/cache/status")
assert response.status_code == 200
data = response.json()
assert "pending" in data
assert "in_progress" in data
assert "completed" in data
assert "failed" in data
# All values should be non-negative integers
assert isinstance(data["pending"], int)
assert isinstance(data["in_progress"], int)
assert isinstance(data["completed"], int)
assert isinstance(data["failed"], int)
assert data["pending"] >= 0
assert data["in_progress"] >= 0
assert data["completed"] >= 0
assert data["failed"] >= 0
@pytest.mark.integration
def test_cache_failed_endpoint(self):
"""Test GET /pypi/cache/failed returns list of failed tasks."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
response = client.get("/pypi/cache/failed")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
# If there are failed tasks, verify structure
if data:
task = data[0]
assert "id" in task
assert "package" in task
assert "error" in task
assert "attempts" in task
assert "depth" in task
@pytest.mark.integration
def test_cache_failed_with_limit(self):
"""Test GET /pypi/cache/failed respects limit parameter."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
response = client.get("/pypi/cache/failed?limit=5")
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) <= 5
@pytest.mark.integration
def test_cache_retry_nonexistent_package(self):
"""Test POST /pypi/cache/retry/{package} returns 404 for unknown package."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
# Use a random package name that definitely doesn't exist
response = client.post(f"/pypi/cache/retry/nonexistent-package-{uuid4().hex[:8]}")
assert response.status_code == 404
# Check for "no failed" or "not found" in error message
detail = response.json()["detail"].lower()
assert "no failed" in detail or "not found" in detail
@pytest.mark.integration
def test_cache_retry_all_endpoint(self):
"""Test POST /pypi/cache/retry-all returns success."""
with httpx.Client(base_url=get_base_url(), timeout=30.0) as client:
response = client.post("/pypi/cache/retry-all")
assert response.status_code == 200
data = response.json()
assert "count" in data
assert "message" in data
assert isinstance(data["count"], int)
assert data["count"] >= 0
class TestCacheTaskDeduplication:
"""Tests for cache task deduplication logic."""
def test_find_cached_package_returns_none_for_uncached(self):
"""Test that _find_cached_package returns None for uncached packages."""
# This is a unit test pattern - mock the database
from unittest.mock import MagicMock
mock_db = MagicMock()
mock_db.query.return_value.filter.return_value.first.return_value = None
from app.pypi_cache_worker import _find_cached_package
result = _find_cached_package(mock_db, "nonexistent-package")
assert result is None
class TestCacheWorkerConfiguration:
"""Tests for cache worker configuration."""
def test_config_settings_exist(self):
"""Test that PyPI cache config settings are available."""
from app.config import get_settings
settings = get_settings()
# Check that settings exist and have reasonable defaults
assert hasattr(settings, "pypi_cache_workers")
assert hasattr(settings, "pypi_cache_max_depth")
assert hasattr(settings, "pypi_cache_max_attempts")
# Check aliases work
assert settings.PYPI_CACHE_WORKERS == settings.pypi_cache_workers
assert settings.PYPI_CACHE_MAX_DEPTH == settings.pypi_cache_max_depth
assert settings.PYPI_CACHE_MAX_ATTEMPTS == settings.pypi_cache_max_attempts
def test_config_default_values(self):
"""Test that PyPI cache config has sensible defaults."""
from app.config import get_settings
settings = get_settings()
# These are the defaults from our implementation
assert settings.pypi_cache_workers == 5
assert settings.pypi_cache_max_depth == 10
assert settings.pypi_cache_max_attempts == 3
class TestFetchAndCachePackage:
"""Tests for _fetch_and_cache_package function."""
def test_result_structure_success(self):
"""Test that success result has correct structure."""
# Mock a successful result
result = {"success": True, "artifact_id": "abc123"}
assert result["success"] is True
assert "artifact_id" in result
def test_result_structure_failure(self):
"""Test that failure result has correct structure."""
# Mock a failure result
result = {"success": False, "error": "Package not found"}
assert result["success"] is False
assert "error" in result
class TestWorkerPoolLifecycle:
"""Tests for worker pool initialization and shutdown."""
def test_init_shutdown_cycle(self):
"""Test that worker pool can be initialized and shut down cleanly."""
from app.pypi_cache_worker import (
init_cache_worker_pool,
shutdown_cache_worker_pool,
_cache_worker_pool,
_cache_worker_running,
)
# Note: We can't fully test this in isolation because the module
# has global state and may conflict with the running server.
# These tests verify the function signatures work.
# The pool should be initialized by main.py on startup
# We just verify the functions are callable
assert callable(init_cache_worker_pool)
assert callable(shutdown_cache_worker_pool)
class TestNestedDependencyDepthTracking:
"""Tests for nested dependency depth tracking.
When the cache worker downloads a package, its dependencies should be
queued with depth = current_task_depth + 1, not depth = 0.
"""
def test_enqueue_with_depth_increments_for_nested_deps(self):
"""Test that enqueue_cache_task properly tracks depth for nested dependencies.
When a task at depth=2 discovers a new dependency, that dependency
should be queued at depth=3.
"""
from unittest.mock import MagicMock, patch
from app.pypi_cache_worker import enqueue_cache_task
mock_db = MagicMock()
# No existing task for this package
mock_db.query.return_value.filter.return_value.first.return_value = None
# Mock _find_cached_package to return None (not cached)
with patch('app.pypi_cache_worker._find_cached_package', return_value=None):
task = enqueue_cache_task(
mock_db,
package_name="nested-dep",
version_constraint=">=1.0",
parent_task_id=None,
depth=3, # Parent task was at depth 2, so this dep is at depth 3
triggered_by_artifact="abc123",
)
# Verify db.add was called
mock_db.add.assert_called_once()
# Get the task that was added
added_task = mock_db.add.call_args[0][0]
# The task should have the correct depth
assert added_task.depth == 3, f"Expected depth=3, got depth={added_task.depth}"
assert added_task.package_name == "nested-dep"
def test_proxy_download_accepts_cache_depth_param(self):
"""Test that proxy download endpoint accepts cache-depth query parameter.
The cache worker should pass its current depth via query param so the proxy
can queue dependencies at the correct depth.
"""
# Verify that pypi_download_file has a cache_depth parameter
import inspect
from app.pypi_proxy import pypi_download_file
sig = inspect.signature(pypi_download_file)
params = list(sig.parameters.keys())
# The endpoint should accept a cache_depth parameter
assert 'cache_depth' in params, \
f"pypi_download_file should accept cache_depth parameter. Got params: {params}"
def test_worker_sends_depth_in_url_when_fetching(self):
"""Test that _fetch_and_cache_package includes depth in download URL.
When the worker fetches a package, it should include its current depth
in the URL query params so nested dependencies get queued at depth+1.
"""
from unittest.mock import patch, MagicMock
import httpx
# We need to verify that the httpx.Client.get call includes the depth in URL
with patch('app.pypi_cache_worker.httpx.Client') as mock_client_class:
mock_client = MagicMock()
mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client)
mock_client_class.return_value.__exit__ = MagicMock(return_value=False)
# Mock successful responses
mock_response_index = MagicMock()
mock_response_index.status_code = 200
mock_response_index.text = '''
<html><body>
<a href="/pypi/simple/test-pkg/test_pkg-1.0.0-py3-none-any.whl?upstream=http%3A%2F%2Fexample.com">test_pkg-1.0.0-py3-none-any.whl</a>
</body></html>
'''
mock_response_download = MagicMock()
mock_response_download.status_code = 200
mock_response_download.headers = {"X-Checksum-SHA256": "abc123"}
mock_client.get.side_effect = [mock_response_index, mock_response_download]
from app.pypi_cache_worker import _fetch_and_cache_package_with_depth
# This function should exist and accept depth parameter
result = _fetch_and_cache_package_with_depth("test-pkg", None, depth=2)
# Verify the download request included the cache-depth query param
download_call = mock_client.get.call_args_list[1]
download_url = download_call[0][0] # First positional arg is URL
assert "cache-depth=2" in download_url, \
f"Expected cache-depth=2 in URL, got: {download_url}"