2 Commits

Author SHA1 Message Date
Mondo Diaz
1138309aaa Add Active Workers table to Background Jobs dashboard
Shows the cache tasks currently being processed in a dynamic table with:
- Package name and version constraint being cached
- Recursion depth and attempt number
- Start timestamp
- Pulsing indicator to show live activity

Backend changes:
- Add get_active_tasks() function to pypi_cache_worker.py
- Add GET /pypi/cache/active endpoint to pypi_proxy.py

Frontend changes:
- Add PyPICacheActiveTask type
- Add getPyPICacheActiveTasks() API function
- Add Active Workers section with animated table
- Auto-refreshes every 5 seconds as part of the existing data refresh
2026-02-02 13:50:45 -06:00
Mondo Diaz
3bdeade7ca Fix nested dependency depth tracking in PyPI cache worker
When the cache worker downloaded a package through the proxy, dependencies
were always queued with depth=0 instead of depth+1. This meant depth limits
weren't properly enforced for nested dependencies.

Changes:
- Add cache-depth query parameter to pypi_download_file endpoint
- Worker now passes its current depth when fetching packages
- Dependencies are queued at cache_depth+1 instead of hardcoded 0
- Add tests for depth tracking behavior
2026-02-02 13:47:22 -06:00
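
In effect, the fix makes depth round-trip through the proxy: the worker advertises its current depth when it downloads a file, and the proxy queues any dependencies it discovers one level deeper. A minimal sketch of that invariant (toy dependency data and a stand-in MAX_DEPTH, not the production code):

# Toy model of the depth round-trip; MAX_DEPTH and DEPS are stand-ins,
# not values from the codebase.
MAX_DEPTH = 5
DEPS = {"requests": ["urllib3", "idna"], "urllib3": [], "idna": []}

queue = [("requests", 0)]  # (package, depth) - the user-triggered download
while queue:
    package, depth = queue.pop()
    if depth > MAX_DEPTH:
        continue  # mirrors the "Max depth exceeded" guard in _process_cache_task
    # The worker fetches through the proxy with ?cache-depth={depth};
    # the proxy then enqueues each dependency at depth + 1 (previously hardcoded 0).
    for dep in DEPS[package]:
        queue.append((dep, depth + 1))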
7 changed files with 345 additions and 51 deletions

View File: app/pypi_cache_worker.py

@@ -192,8 +192,8 @@ def _process_cache_task(task_id: UUID):
         _mark_task_failed(db, task, f"Max depth {max_depth} exceeded")
         return

-    # Do the actual caching
-    result = _fetch_and_cache_package(task.package_name, task.version_constraint)
+    # Do the actual caching - pass depth so nested deps are queued at depth+1
+    result = _fetch_and_cache_package(task.package_name, task.version_constraint, depth=task.depth)

     if result["success"]:
         _mark_task_completed(db, task, cached_artifact_id=result.get("artifact_id"))
@@ -256,6 +256,7 @@ def _find_cached_package(db: Session, package_name: str) -> Optional[str]:
 def _fetch_and_cache_package(
     package_name: str,
     version_constraint: Optional[str] = None,
+    depth: int = 0,
 ) -> dict:
     """
     Fetch and cache a PyPI package by making requests through our own proxy.
@@ -263,6 +264,7 @@ def _fetch_and_cache_package(
     Args:
         package_name: The package name to cache.
         version_constraint: Optional version constraint (currently not used for selection).
+        depth: Current recursion depth for dependency tracking.

     Returns:
         Dict with "success" bool, "artifact_id" on success, "error" on failure.
@@ -317,6 +319,11 @@ def _fetch_and_cache_package(
         elif not download_url.startswith("http"):
             download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}"
+
+        # Add cache-depth query parameter to track recursion depth
+        # The proxy will queue dependencies at depth+1
+        separator = "&" if "?" in download_url else "?"
+        download_url = f"{download_url}{separator}cache-depth={depth}"

         # Step 3: Download the file through our proxy (this caches it)
         logger.debug(f"Downloading: {download_url}")
         response = client.get(download_url)
@@ -337,6 +344,10 @@ def _fetch_and_cache_package(
         return {"success": False, "error": str(e)}

+
+# Alias for backward compatibility and clearer naming
+_fetch_and_cache_package_with_depth = _fetch_and_cache_package
+

 def _mark_task_completed(
     db: Session,
     task: PyPICacheTask,
@@ -515,6 +526,38 @@ def get_failed_tasks(db: Session, limit: int = 50) -> List[dict]:
     ]

+
+def get_active_tasks(db: Session, limit: int = 50) -> List[dict]:
+    """
+    Get list of currently active (in_progress) tasks.
+
+    Args:
+        db: Database session.
+        limit: Maximum number of tasks to return.
+
+    Returns:
+        List of active task info dicts.
+    """
+    tasks = (
+        db.query(PyPICacheTask)
+        .filter(PyPICacheTask.status == "in_progress")
+        .order_by(PyPICacheTask.started_at.desc())
+        .limit(limit)
+        .all()
+    )
+    return [
+        {
+            "id": str(task.id),
+            "package": task.package_name,
+            "version_constraint": task.version_constraint,
+            "depth": task.depth,
+            "attempts": task.attempts,
+            "started_at": task.started_at.isoformat() if task.started_at else None,
+        }
+        for task in tasks
+    ]
+

 def retry_failed_task(db: Session, package_name: str) -> Optional[PyPICacheTask]:
     """
     Reset a failed task to retry.
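
The separator logic above matters because proxied download URLs often already carry an upstream query parameter. A quick illustration of the resulting URLs (hypothetical host and filenames, not values from this diff):

# Illustration only: how the worker appends the cache-depth parameter.
def with_cache_depth(download_url: str, depth: int) -> str:
    separator = "&" if "?" in download_url else "?"
    return f"{download_url}{separator}cache-depth={depth}"

# A URL that already has a query string keeps it intact:
print(with_cache_depth(
    "http://localhost/pypi/simple/idna/idna-3.6-py3-none-any.whl?upstream=http%3A%2F%2Fpypi.org",
    depth=2,
))  # -> ...whl?upstream=http%3A%2F%2Fpypi.org&cache-depth=2

# A bare URL gets a fresh query string:
print(with_cache_depth("http://localhost/pypi/simple/idna/idna-3.6.tar.gz", depth=0))
# -> http://localhost/pypi/simple/idna/idna-3.6.tar.gz?cache-depth=0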

View File: app/pypi_proxy.py

@@ -28,6 +28,7 @@ from .pypi_cache_worker import (
     enqueue_cache_task,
     get_cache_status,
     get_failed_tasks,
+    get_active_tasks,
     retry_failed_task,
     retry_all_failed_tasks,
 )
@@ -516,6 +517,7 @@ async def pypi_download_file(
     package_name: str,
     filename: str,
     upstream: Optional[str] = None,
+    cache_depth: int = Query(default=0, ge=0, le=100, alias="cache-depth"),
     db: Session = Depends(get_db),
     storage: S3Storage = Depends(get_storage),
 ):
@@ -526,6 +528,7 @@ async def pypi_download_file(
         package_name: The package name
         filename: The filename to download
         upstream: URL-encoded upstream URL to fetch from
+        cache_depth: Current cache recursion depth (used by cache worker for nested deps)
     """
     if not upstream:
         raise HTTPException(
@@ -772,17 +775,19 @@ async def pypi_download_file(
                 db.add(dep)

         # Proactively cache dependencies via task queue
+        # Dependencies are queued at cache_depth + 1 to track recursion
         if unique_deps:
+            next_depth = cache_depth + 1
             for dep_name, dep_version in unique_deps:
                 enqueue_cache_task(
                     db,
                     package_name=dep_name,
                     version_constraint=dep_version,
                     parent_task_id=None,  # Top-level, triggered by user download
-                    depth=0,
+                    depth=next_depth,
                     triggered_by_artifact=sha256,
                 )
-            logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching")
+            logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching (depth={next_depth})")

         db.commit()
@@ -845,6 +850,25 @@ async def pypi_cache_failed(
     return get_failed_tasks(db, limit=limit)

+
+@router.get("/cache/active")
+async def pypi_cache_active(
+    limit: int = Query(default=50, ge=1, le=500),
+    db: Session = Depends(get_db),
+    _current_user: User = Depends(require_admin),
+):
+    """
+    Get list of currently active (in_progress) cache tasks.
+
+    Shows what the cache workers are currently processing.
+
+    Args:
+        limit: Maximum number of tasks to return (default 50, max 500).
+
+    Requires admin privileges.
+    """
+    return get_active_tasks(db, limit=limit)
+

 @router.post("/cache/retry/{package_name}")
 async def pypi_cache_retry(
     package_name: str,
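
For reference, the new endpoint can be exercised like this; a sketch that assumes a locally running server and an admin session cookie named `session` (illustrative values, not part of this change). The response shape mirrors what get_active_tasks returns:

# Sketch: polling the active-tasks endpoint as an admin (assumed auth details).
import httpx

client = httpx.Client(
    base_url="http://localhost:8000",
    cookies={"session": "<admin-session-cookie>"},  # hypothetical auth
)
tasks = client.get("/pypi/cache/active", params={"limit": 10}).json()
for t in tasks:
    # Each entry carries: id, package, version_constraint, depth, attempts, started_at
    print(f"{t['package']} depth={t['depth']} attempt={t['attempts'] + 1} since {t['started_at']}")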

View File

@@ -261,3 +261,104 @@ class TestWorkerPoolLifecycle:
         # We just verify the functions are callable
         assert callable(init_cache_worker_pool)
         assert callable(shutdown_cache_worker_pool)
+
+
+class TestNestedDependencyDepthTracking:
+    """Tests for nested dependency depth tracking.
+
+    When the cache worker downloads a package, its dependencies should be
+    queued with depth = current_task_depth + 1, not depth = 0.
+    """
+
+    def test_enqueue_with_depth_increments_for_nested_deps(self):
+        """Test that enqueue_cache_task properly tracks depth for nested dependencies.
+
+        When a task at depth=2 discovers a new dependency, that dependency
+        should be queued at depth=3.
+        """
+        from unittest.mock import MagicMock, patch
+        from app.pypi_cache_worker import enqueue_cache_task
+
+        mock_db = MagicMock()
+        # No existing task for this package
+        mock_db.query.return_value.filter.return_value.first.return_value = None
+
+        # Mock _find_cached_package to return None (not cached)
+        with patch('app.pypi_cache_worker._find_cached_package', return_value=None):
+            task = enqueue_cache_task(
+                mock_db,
+                package_name="nested-dep",
+                version_constraint=">=1.0",
+                parent_task_id=None,
+                depth=3,  # Parent task was at depth 2, so this dep is at depth 3
+                triggered_by_artifact="abc123",
+            )
+
+        # Verify db.add was called
+        mock_db.add.assert_called_once()
+        # Get the task that was added
+        added_task = mock_db.add.call_args[0][0]
+        # The task should have the correct depth
+        assert added_task.depth == 3, f"Expected depth=3, got depth={added_task.depth}"
+        assert added_task.package_name == "nested-dep"
+
+    def test_proxy_download_accepts_cache_depth_param(self):
+        """Test that proxy download endpoint accepts cache-depth query parameter.
+
+        The cache worker should pass its current depth via query param so the proxy
+        can queue dependencies at the correct depth.
+        """
+        # Verify that pypi_download_file has a cache_depth parameter
+        import inspect
+        from app.pypi_proxy import pypi_download_file
+
+        sig = inspect.signature(pypi_download_file)
+        params = list(sig.parameters.keys())
+        # The endpoint should accept a cache_depth parameter
+        assert 'cache_depth' in params, \
+            f"pypi_download_file should accept cache_depth parameter. Got params: {params}"
+
+    def test_worker_sends_depth_in_url_when_fetching(self):
+        """Test that _fetch_and_cache_package includes depth in download URL.
+
+        When the worker fetches a package, it should include its current depth
+        in the URL query params so nested dependencies get queued at depth+1.
+        """
+        from unittest.mock import patch, MagicMock
+        import httpx
+
+        # We need to verify that the httpx.Client.get call includes the depth in URL
+        with patch('app.pypi_cache_worker.httpx.Client') as mock_client_class:
+            mock_client = MagicMock()
+            mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client)
+            mock_client_class.return_value.__exit__ = MagicMock(return_value=False)
+
+            # Mock successful responses
+            mock_response_index = MagicMock()
+            mock_response_index.status_code = 200
+            mock_response_index.text = '''
+            <html><body>
+            <a href="/pypi/simple/test-pkg/test_pkg-1.0.0-py3-none-any.whl?upstream=http%3A%2F%2Fexample.com">test_pkg-1.0.0-py3-none-any.whl</a>
+            </body></html>
+            '''
+
+            mock_response_download = MagicMock()
+            mock_response_download.status_code = 200
+            mock_response_download.headers = {"X-Checksum-SHA256": "abc123"}
+
+            mock_client.get.side_effect = [mock_response_index, mock_response_download]
+
+            from app.pypi_cache_worker import _fetch_and_cache_package_with_depth
+
+            # This function should exist and accept depth parameter
+            result = _fetch_and_cache_package_with_depth("test-pkg", None, depth=2)
+
+            # Verify the download request included the cache-depth query param
+            download_call = mock_client.get.call_args_list[1]
+            download_url = download_call[0][0]  # First positional arg is URL
+            assert "cache-depth=2" in download_url, \
+                f"Expected cache-depth=2 in URL, got: {download_url}"

View File

@@ -754,6 +754,7 @@ export async function testUpstreamSource(id: string): Promise<UpstreamSourceTest
 import {
   PyPICacheStatus,
   PyPICacheTask,
+  PyPICacheActiveTask,
   PyPICacheRetryResponse,
 } from './types';
@@ -771,6 +772,13 @@ export async function getPyPICacheFailedTasks(limit: number = 50): Promise<PyPICacheTask[]> {
   return handleResponse<PyPICacheTask[]>(response);
 }

+export async function getPyPICacheActiveTasks(limit: number = 50): Promise<PyPICacheActiveTask[]> {
+  const response = await fetch(`/pypi/cache/active?limit=${limit}`, {
+    credentials: 'include',
+  });
+  return handleResponse<PyPICacheActiveTask[]>(response);
+}
+
 export async function retryPyPICacheTask(packageName: string): Promise<PyPICacheRetryResponse> {
   const response = await fetch(`/pypi/cache/retry/${encodeURIComponent(packageName)}`, {
     method: 'POST',

View File: AdminJobsPage.css

@@ -271,6 +271,62 @@
   font-size: 0.8rem;
 }

+/* Active Workers Section */
+.active-workers-section {
+  margin-bottom: 1.5rem;
+}
+
+.active-workers-section h3 {
+  display: flex;
+  align-items: center;
+  gap: 0.5rem;
+  color: #3b82f6;
+  margin-top: 1rem;
+}
+
+.pulse-indicator {
+  width: 8px;
+  height: 8px;
+  background-color: #3b82f6;
+  border-radius: 50%;
+  animation: pulse 1.5s ease-in-out infinite;
+}
+
+@keyframes pulse {
+  0%, 100% {
+    opacity: 1;
+    transform: scale(1);
+  }
+  50% {
+    opacity: 0.5;
+    transform: scale(1.2);
+  }
+}
+
+.jobs-table.active-table {
+  border: 1px solid #3b82f6;
+  border-radius: 4px;
+}
+
+.jobs-table.active-table th {
+  background: #eff6ff;
+  color: #1d4ed8;
+}
+
+.jobs-table .active-row {
+  background-color: #f0f9ff;
+}
+
+.jobs-table .active-row:hover {
+  background-color: #e0f2fe;
+}
+
+.jobs-table .version-constraint {
+  font-family: monospace;
+  font-size: 0.85rem;
+  color: var(--text-secondary);
+}

 /* Responsive */
 @media (max-width: 768px) {
   .status-cards {

View File: AdminJobsPage.tsx

@@ -4,10 +4,11 @@ import { useAuth } from '../contexts/AuthContext';
 import {
   getPyPICacheStatus,
   getPyPICacheFailedTasks,
+  getPyPICacheActiveTasks,
   retryPyPICacheTask,
   retryAllPyPICacheTasks,
 } from '../api';
-import { PyPICacheStatus, PyPICacheTask } from '../types';
+import { PyPICacheStatus, PyPICacheTask, PyPICacheActiveTask } from '../types';
 import './AdminJobsPage.css';

 function AdminJobsPage() {
@@ -17,6 +18,7 @@ function AdminJobsPage() {
   // PyPI cache status
   const [cacheStatus, setCacheStatus] = useState<PyPICacheStatus | null>(null);
   const [failedTasks, setFailedTasks] = useState<PyPICacheTask[]>([]);
+  const [activeTasks, setActiveTasks] = useState<PyPICacheActiveTask[]>([]);
   const [loadingStatus, setLoadingStatus] = useState(true);
   const [statusError, setStatusError] = useState<string | null>(null);
@@ -39,12 +41,14 @@ function AdminJobsPage() {
     setStatusError(null);
     try {
-      const [status, failed] = await Promise.all([
+      const [status, failed, active] = await Promise.all([
         getPyPICacheStatus(),
         getPyPICacheFailedTasks(100),
+        getPyPICacheActiveTasks(50),
       ]);
       setCacheStatus(status);
       setFailedTasks(failed);
+      setActiveTasks(active);
     } catch (err) {
       setStatusError(err instanceof Error ? err.message : 'Failed to load status');
     } finally {
@@ -192,52 +196,101 @@ function AdminJobsPage() {
       {totalJobs === 0 ? (
         <p className="empty-message">No cache jobs yet. Jobs are created when packages are downloaded through the PyPI proxy.</p>
-      ) : failedTasks.length === 0 && cacheStatus?.pending === 0 && cacheStatus?.in_progress === 0 ? (
-        <p className="success-text">All jobs completed successfully.</p>
-      ) : failedTasks.length > 0 ? (
-        <>
-          <h3>Failed Tasks</h3>
-          <table className="jobs-table">
-            <thead>
-              <tr>
-                <th>Package</th>
-                <th>Error</th>
-                <th>Attempts</th>
-                <th>Depth</th>
-                <th>Failed At</th>
-                <th>Actions</th>
-              </tr>
-            </thead>
-            <tbody>
-              {failedTasks.map((task) => (
-                <tr key={task.id}>
-                  <td className="package-name">{task.package}</td>
-                  <td className="error-cell" title={task.error || ''}>
-                    {task.error || 'Unknown error'}
-                  </td>
-                  <td>{task.attempts}</td>
-                  <td>{task.depth}</td>
-                  <td className="timestamp">
-                    {task.failed_at
-                      ? new Date(task.failed_at).toLocaleString()
-                      : '-'}
-                  </td>
-                  <td className="actions-cell">
-                    <button
-                      className="btn btn-sm btn-secondary"
-                      onClick={() => handleRetryPackage(task.package)}
-                      disabled={retryingPackage === task.package}
-                    >
-                      {retryingPackage === task.package ? 'Retrying...' : 'Retry'}
-                    </button>
-                  </td>
-                </tr>
-              ))}
-            </tbody>
-          </table>
-        </>
       ) : (
-        <p className="empty-message">Jobs are processing. No failures yet.</p>
+        <>
+          {/* Active Workers Table */}
+          {activeTasks.length > 0 && (
+            <div className="active-workers-section">
+              <h3>
+                <span className="pulse-indicator"></span>
+                Active Workers ({activeTasks.length})
+              </h3>
+              <table className="jobs-table active-table">
+                <thead>
+                  <tr>
+                    <th>Package</th>
+                    <th>Version</th>
+                    <th>Depth</th>
+                    <th>Attempt</th>
+                    <th>Started</th>
+                  </tr>
+                </thead>
+                <tbody>
+                  {activeTasks.map((task) => (
+                    <tr key={task.id} className="active-row">
+                      <td className="package-name">{task.package}</td>
+                      <td className="version-constraint">
+                        {task.version_constraint || '*'}
+                      </td>
+                      <td>{task.depth}</td>
+                      <td>{task.attempts + 1}</td>
+                      <td className="timestamp">
+                        {task.started_at
+                          ? new Date(task.started_at).toLocaleTimeString()
+                          : '-'}
+                      </td>
+                    </tr>
+                  ))}
+                </tbody>
+              </table>
+            </div>
+          )}
+          {/* Failed Tasks Table */}
+          {failedTasks.length > 0 && (
+            <>
+              <h3>Failed Tasks</h3>
+              <table className="jobs-table">
+                <thead>
+                  <tr>
+                    <th>Package</th>
+                    <th>Error</th>
+                    <th>Attempts</th>
+                    <th>Depth</th>
+                    <th>Failed At</th>
+                    <th>Actions</th>
+                  </tr>
+                </thead>
+                <tbody>
+                  {failedTasks.map((task) => (
+                    <tr key={task.id}>
+                      <td className="package-name">{task.package}</td>
+                      <td className="error-cell" title={task.error || ''}>
+                        {task.error || 'Unknown error'}
+                      </td>
+                      <td>{task.attempts}</td>
+                      <td>{task.depth}</td>
+                      <td className="timestamp">
+                        {task.failed_at
+                          ? new Date(task.failed_at).toLocaleString()
+                          : '-'}
+                      </td>
+                      <td className="actions-cell">
+                        <button
+                          className="btn btn-sm btn-secondary"
+                          onClick={() => handleRetryPackage(task.package)}
+                          disabled={retryingPackage === task.package}
+                        >
+                          {retryingPackage === task.package ? 'Retrying...' : 'Retry'}
+                        </button>
+                      </td>
+                    </tr>
+                  ))}
+                </tbody>
+              </table>
+            </>
+          )}
+          {/* Success message when nothing is pending/in-progress/failed */}
+          {failedTasks.length === 0 && activeTasks.length === 0 && cacheStatus?.pending === 0 && (
+            <p className="success-text">All jobs completed successfully.</p>
+          )}
+          {/* In progress message */}
+          {activeTasks.length === 0 && failedTasks.length === 0 && (cacheStatus?.pending ?? 0) > 0 && (
+            <p className="empty-message">Jobs queued for processing...</p>
+          )}
+        </>
       )}
     </>
   )}

View File

@@ -575,6 +575,15 @@ export interface PyPICacheTask {
   failed_at: string | null;
 }

+export interface PyPICacheActiveTask {
+  id: string;
+  package: string;
+  version_constraint: string | null;
+  depth: number;
+  attempts: number;
+  started_at: string | null;
+}
+
 export interface PyPICacheRetryResponse {
   message: string;
   task_id?: string;