Fix nested dependency depth tracking in PyPI cache worker

When the cache worker downloaded a package through the proxy, dependencies
were always queued with depth=0 instead of depth+1. This meant depth limits
weren't properly enforced for nested dependencies.

Changes:
- Add cache-depth query parameter to pypi_download_file endpoint
- Worker now passes its current depth when fetching packages
- Dependencies are queued at cache_depth+1 instead of hardcoded 0
- Add tests for depth tracking behavior
This commit is contained in:
Mondo Diaz
2026-02-02 13:47:22 -06:00
parent c7eca269f4
commit 5517048f05
3 changed files with 120 additions and 4 deletions

View File

@@ -192,8 +192,8 @@ def _process_cache_task(task_id: UUID):
_mark_task_failed(db, task, f"Max depth {max_depth} exceeded") _mark_task_failed(db, task, f"Max depth {max_depth} exceeded")
return return
# Do the actual caching # Do the actual caching - pass depth so nested deps are queued at depth+1
result = _fetch_and_cache_package(task.package_name, task.version_constraint) result = _fetch_and_cache_package(task.package_name, task.version_constraint, depth=task.depth)
if result["success"]: if result["success"]:
_mark_task_completed(db, task, cached_artifact_id=result.get("artifact_id")) _mark_task_completed(db, task, cached_artifact_id=result.get("artifact_id"))
@@ -256,6 +256,7 @@ def _find_cached_package(db: Session, package_name: str) -> Optional[str]:
def _fetch_and_cache_package( def _fetch_and_cache_package(
package_name: str, package_name: str,
version_constraint: Optional[str] = None, version_constraint: Optional[str] = None,
depth: int = 0,
) -> dict: ) -> dict:
""" """
Fetch and cache a PyPI package by making requests through our own proxy. Fetch and cache a PyPI package by making requests through our own proxy.
@@ -263,6 +264,7 @@ def _fetch_and_cache_package(
Args: Args:
package_name: The package name to cache. package_name: The package name to cache.
version_constraint: Optional version constraint (currently not used for selection). version_constraint: Optional version constraint (currently not used for selection).
depth: Current recursion depth for dependency tracking.
Returns: Returns:
Dict with "success" bool, "artifact_id" on success, "error" on failure. Dict with "success" bool, "artifact_id" on success, "error" on failure.
@@ -317,6 +319,11 @@ def _fetch_and_cache_package(
elif not download_url.startswith("http"): elif not download_url.startswith("http"):
download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}" download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}"
# Add cache-depth query parameter to track recursion depth
# The proxy will queue dependencies at depth+1
separator = "&" if "?" in download_url else "?"
download_url = f"{download_url}{separator}cache-depth={depth}"
# Step 3: Download the file through our proxy (this caches it) # Step 3: Download the file through our proxy (this caches it)
logger.debug(f"Downloading: {download_url}") logger.debug(f"Downloading: {download_url}")
response = client.get(download_url) response = client.get(download_url)
@@ -337,6 +344,10 @@ def _fetch_and_cache_package(
return {"success": False, "error": str(e)} return {"success": False, "error": str(e)}
# Alias for backward compatibility and clearer naming
_fetch_and_cache_package_with_depth = _fetch_and_cache_package
def _mark_task_completed( def _mark_task_completed(
db: Session, db: Session,
task: PyPICacheTask, task: PyPICacheTask,

View File

@@ -516,6 +516,7 @@ async def pypi_download_file(
package_name: str, package_name: str,
filename: str, filename: str,
upstream: Optional[str] = None, upstream: Optional[str] = None,
cache_depth: int = Query(default=0, ge=0, le=100, alias="cache-depth"),
db: Session = Depends(get_db), db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage), storage: S3Storage = Depends(get_storage),
): ):
@@ -526,6 +527,7 @@ async def pypi_download_file(
package_name: The package name package_name: The package name
filename: The filename to download filename: The filename to download
upstream: URL-encoded upstream URL to fetch from upstream: URL-encoded upstream URL to fetch from
cache_depth: Current cache recursion depth (used by cache worker for nested deps)
""" """
if not upstream: if not upstream:
raise HTTPException( raise HTTPException(
@@ -772,17 +774,19 @@ async def pypi_download_file(
db.add(dep) db.add(dep)
# Proactively cache dependencies via task queue # Proactively cache dependencies via task queue
# Dependencies are queued at cache_depth + 1 to track recursion
if unique_deps: if unique_deps:
next_depth = cache_depth + 1
for dep_name, dep_version in unique_deps: for dep_name, dep_version in unique_deps:
enqueue_cache_task( enqueue_cache_task(
db, db,
package_name=dep_name, package_name=dep_name,
version_constraint=dep_version, version_constraint=dep_version,
parent_task_id=None, # Top-level, triggered by user download parent_task_id=None, # Top-level, triggered by user download
depth=0, depth=next_depth,
triggered_by_artifact=sha256, triggered_by_artifact=sha256,
) )
logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching") logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching (depth={next_depth})")
db.commit() db.commit()

View File

@@ -261,3 +261,104 @@ class TestWorkerPoolLifecycle:
# We just verify the functions are callable # We just verify the functions are callable
assert callable(init_cache_worker_pool) assert callable(init_cache_worker_pool)
assert callable(shutdown_cache_worker_pool) assert callable(shutdown_cache_worker_pool)
class TestNestedDependencyDepthTracking:
    """Tests for nested dependency depth tracking.

    When the cache worker downloads a package, its dependencies should be
    queued with depth = current_task_depth + 1, not depth = 0.
    """

    def test_enqueue_with_depth_increments_for_nested_deps(self):
        """Test that enqueue_cache_task properly tracks depth for nested dependencies.

        When a task at depth=2 discovers a new dependency, that dependency
        should be queued at depth=3.
        """
        from unittest.mock import MagicMock, patch

        from app.pypi_cache_worker import enqueue_cache_task

        mock_db = MagicMock()
        # No existing task for this package
        mock_db.query.return_value.filter.return_value.first.return_value = None

        # Mock _find_cached_package to return None (not cached)
        with patch('app.pypi_cache_worker._find_cached_package', return_value=None):
            task = enqueue_cache_task(
                mock_db,
                package_name="nested-dep",
                version_constraint=">=1.0",
                parent_task_id=None,
                depth=3,  # Parent task was at depth 2, so this dep is at depth 3
                triggered_by_artifact="abc123",
            )

        # Verify db.add was called
        mock_db.add.assert_called_once()
        # Get the task that was added
        added_task = mock_db.add.call_args[0][0]
        # The task should have the correct depth
        assert added_task.depth == 3, f"Expected depth=3, got depth={added_task.depth}"
        assert added_task.package_name == "nested-dep"

    def test_proxy_download_accepts_cache_depth_param(self):
        """Test that proxy download endpoint accepts cache-depth query parameter.

        The cache worker should pass its current depth via query param so the proxy
        can queue dependencies at the correct depth.
        """
        # Verify that pypi_download_file has a cache_depth parameter
        import inspect

        from app.pypi_proxy import pypi_download_file

        sig = inspect.signature(pypi_download_file)
        params = list(sig.parameters.keys())
        # The endpoint should accept a cache_depth parameter
        assert 'cache_depth' in params, \
            f"pypi_download_file should accept cache_depth parameter. Got params: {params}"

    def test_worker_sends_depth_in_url_when_fetching(self):
        """Test that _fetch_and_cache_package includes depth in download URL.

        When the worker fetches a package, it should include its current depth
        in the URL query params so nested dependencies get queued at depth+1.
        """
        from unittest.mock import patch, MagicMock

        import httpx

        # We need to verify that the httpx.Client.get call includes the depth in URL
        with patch('app.pypi_cache_worker.httpx.Client') as mock_client_class:
            # Wire the mock so `with httpx.Client() as client:` yields mock_client
            mock_client = MagicMock()
            mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client)
            mock_client_class.return_value.__exit__ = MagicMock(return_value=False)

            # Mock successful responses: first GET returns the simple-index HTML,
            # second GET is the file download itself (order matters for side_effect)
            mock_response_index = MagicMock()
            mock_response_index.status_code = 200
            mock_response_index.text = '''
            <html><body>
            <a href="/pypi/simple/test-pkg/test_pkg-1.0.0-py3-none-any.whl?upstream=http%3A%2F%2Fexample.com">test_pkg-1.0.0-py3-none-any.whl</a>
            </body></html>
            '''
            mock_response_download = MagicMock()
            mock_response_download.status_code = 200
            mock_response_download.headers = {"X-Checksum-SHA256": "abc123"}
            mock_client.get.side_effect = [mock_response_index, mock_response_download]

            from app.pypi_cache_worker import _fetch_and_cache_package_with_depth

            # This function should exist and accept depth parameter
            result = _fetch_and_cache_package_with_depth("test-pkg", None, depth=2)

            # Verify the download request included the cache-depth query param
            # (call_args_list[1] is the second GET, i.e. the download request)
            download_call = mock_client.get.call_args_list[1]
            download_url = download_call[0][0]  # First positional arg is URL
            assert "cache-depth=2" in download_url, \
                f"Expected cache-depth=2 in URL, got: {download_url}"