Compare commits
2 Commits
8edb45879f
...
1138309aaa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1138309aaa | ||
|
|
3bdeade7ca |
@@ -192,8 +192,8 @@ def _process_cache_task(task_id: UUID):
|
||||
_mark_task_failed(db, task, f"Max depth {max_depth} exceeded")
|
||||
return
|
||||
|
||||
# Do the actual caching
|
||||
result = _fetch_and_cache_package(task.package_name, task.version_constraint)
|
||||
# Do the actual caching - pass depth so nested deps are queued at depth+1
|
||||
result = _fetch_and_cache_package(task.package_name, task.version_constraint, depth=task.depth)
|
||||
|
||||
if result["success"]:
|
||||
_mark_task_completed(db, task, cached_artifact_id=result.get("artifact_id"))
|
||||
@@ -256,6 +256,7 @@ def _find_cached_package(db: Session, package_name: str) -> Optional[str]:
|
||||
def _fetch_and_cache_package(
|
||||
package_name: str,
|
||||
version_constraint: Optional[str] = None,
|
||||
depth: int = 0,
|
||||
) -> dict:
|
||||
"""
|
||||
Fetch and cache a PyPI package by making requests through our own proxy.
|
||||
@@ -263,6 +264,7 @@ def _fetch_and_cache_package(
|
||||
Args:
|
||||
package_name: The package name to cache.
|
||||
version_constraint: Optional version constraint (currently not used for selection).
|
||||
depth: Current recursion depth for dependency tracking.
|
||||
|
||||
Returns:
|
||||
Dict with "success" bool, "artifact_id" on success, "error" on failure.
|
||||
@@ -317,6 +319,11 @@ def _fetch_and_cache_package(
|
||||
elif not download_url.startswith("http"):
|
||||
download_url = f"{base_url}/pypi/simple/{normalized_name}/{download_url}"
|
||||
|
||||
# Add cache-depth query parameter to track recursion depth
|
||||
# The proxy will queue dependencies at depth+1
|
||||
separator = "&" if "?" in download_url else "?"
|
||||
download_url = f"{download_url}{separator}cache-depth={depth}"
|
||||
|
||||
# Step 3: Download the file through our proxy (this caches it)
|
||||
logger.debug(f"Downloading: {download_url}")
|
||||
response = client.get(download_url)
|
||||
@@ -337,6 +344,10 @@ def _fetch_and_cache_package(
|
||||
return {"success": False, "error": str(e)}
|
||||
|
||||
|
||||
# Alias for backward compatibility and clearer naming
|
||||
_fetch_and_cache_package_with_depth = _fetch_and_cache_package
|
||||
|
||||
|
||||
def _mark_task_completed(
|
||||
db: Session,
|
||||
task: PyPICacheTask,
|
||||
@@ -515,6 +526,38 @@ def get_failed_tasks(db: Session, limit: int = 50) -> List[dict]:
|
||||
]
|
||||
|
||||
|
||||
def get_active_tasks(db: Session, limit: int = 50) -> List[dict]:
    """
    Return the cache tasks that are being processed right now.

    Selects tasks whose status is "in_progress", most recently started
    first, and flattens each one into a plain dict.

    Args:
        db: Database session.
        limit: Maximum number of tasks to return.

    Returns:
        List of dicts with the keys "id", "package", "version_constraint",
        "depth", "attempts" and "started_at". "started_at" is the result of
        ``isoformat()`` on the task's start timestamp, or None when the task
        has no start timestamp.
    """
    in_progress = db.query(PyPICacheTask).filter(
        PyPICacheTask.status == "in_progress"
    )
    newest_first = in_progress.order_by(PyPICacheTask.started_at.desc())

    summaries = []
    for row in newest_first.limit(limit).all():
        summaries.append(
            {
                "id": str(row.id),
                "package": row.package_name,
                "version_constraint": row.version_constraint,
                "depth": row.depth,
                "attempts": row.attempts,
                # Guard against tasks without a start timestamp.
                "started_at": row.started_at.isoformat() if row.started_at else None,
            }
        )
    return summaries
|
||||
|
||||
|
||||
def retry_failed_task(db: Session, package_name: str) -> Optional[PyPICacheTask]:
|
||||
"""
|
||||
Reset a failed task to retry.
|
||||
|
||||
@@ -28,6 +28,7 @@ from .pypi_cache_worker import (
|
||||
enqueue_cache_task,
|
||||
get_cache_status,
|
||||
get_failed_tasks,
|
||||
get_active_tasks,
|
||||
retry_failed_task,
|
||||
retry_all_failed_tasks,
|
||||
)
|
||||
@@ -516,6 +517,7 @@ async def pypi_download_file(
|
||||
package_name: str,
|
||||
filename: str,
|
||||
upstream: Optional[str] = None,
|
||||
cache_depth: int = Query(default=0, ge=0, le=100, alias="cache-depth"),
|
||||
db: Session = Depends(get_db),
|
||||
storage: S3Storage = Depends(get_storage),
|
||||
):
|
||||
@@ -526,6 +528,7 @@ async def pypi_download_file(
|
||||
package_name: The package name
|
||||
filename: The filename to download
|
||||
upstream: URL-encoded upstream URL to fetch from
|
||||
cache_depth: Current cache recursion depth (used by cache worker for nested deps)
|
||||
"""
|
||||
if not upstream:
|
||||
raise HTTPException(
|
||||
@@ -772,17 +775,19 @@ async def pypi_download_file(
|
||||
db.add(dep)
|
||||
|
||||
# Proactively cache dependencies via task queue
|
||||
# Dependencies are queued at cache_depth + 1 to track recursion
|
||||
if unique_deps:
|
||||
next_depth = cache_depth + 1
|
||||
for dep_name, dep_version in unique_deps:
|
||||
enqueue_cache_task(
|
||||
db,
|
||||
package_name=dep_name,
|
||||
version_constraint=dep_version,
|
||||
parent_task_id=None, # Top-level, triggered by user download
|
||||
depth=0,
|
||||
depth=next_depth,
|
||||
triggered_by_artifact=sha256,
|
||||
)
|
||||
logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching")
|
||||
logger.info(f"PyPI proxy: queued {len(unique_deps)} dependencies for caching (depth={next_depth})")
|
||||
|
||||
db.commit()
|
||||
|
||||
@@ -845,6 +850,25 @@ async def pypi_cache_failed(
|
||||
return get_failed_tasks(db, limit=limit)
|
||||
|
||||
|
||||
@router.get("/cache/active")
async def pypi_cache_active(
    limit: int = Query(default=50, ge=1, le=500),
    db: Session = Depends(get_db),
    _current_user: User = Depends(require_admin),
):
    """
    List the cache tasks that are currently active (in_progress).

    Gives a view of what the cache workers are processing at this moment.

    Args:
        limit: Maximum number of tasks to return (default 50, max 500).

    Requires admin privileges.
    """
    # The admin dependency only gates access; its value is not needed here.
    active_tasks = get_active_tasks(db, limit=limit)
    return active_tasks
|
||||
|
||||
|
||||
@router.post("/cache/retry/{package_name}")
|
||||
async def pypi_cache_retry(
|
||||
package_name: str,
|
||||
|
||||
@@ -261,3 +261,104 @@ class TestWorkerPoolLifecycle:
|
||||
# We just verify the functions are callable
|
||||
assert callable(init_cache_worker_pool)
|
||||
assert callable(shutdown_cache_worker_pool)
|
||||
|
||||
|
||||
class TestNestedDependencyDepthTracking:
    """Tests for nested dependency depth tracking.

    When the cache worker downloads a package, its dependencies should be
    queued with depth = current_task_depth + 1, not depth = 0.
    """

    def test_enqueue_with_depth_increments_for_nested_deps(self):
        """Test that enqueue_cache_task properly tracks depth for nested dependencies.

        When a task at depth=2 discovers a new dependency, that dependency
        should be queued at depth=3.
        """
        from unittest.mock import MagicMock, patch
        from app.pypi_cache_worker import enqueue_cache_task

        mock_db = MagicMock()

        # No existing task for this package
        mock_db.query.return_value.filter.return_value.first.return_value = None

        # Mock _find_cached_package to return None (not cached)
        with patch('app.pypi_cache_worker._find_cached_package', return_value=None):
            enqueue_cache_task(
                mock_db,
                package_name="nested-dep",
                version_constraint=">=1.0",
                parent_task_id=None,
                depth=3,  # Parent task was at depth 2, so this dep is at depth 3
                triggered_by_artifact="abc123",
            )

        # Exactly one task should have been handed to the session
        mock_db.add.assert_called_once()

        # Inspect the task object that was passed to db.add
        added_task = mock_db.add.call_args[0][0]

        # The task should carry the depth it was enqueued with
        assert added_task.depth == 3, f"Expected depth=3, got depth={added_task.depth}"
        assert added_task.package_name == "nested-dep"

    def test_proxy_download_accepts_cache_depth_param(self):
        """Test that proxy download endpoint accepts cache-depth query parameter.

        The cache worker should pass its current depth via query param so the proxy
        can queue dependencies at the correct depth.
        """
        import inspect
        from app.pypi_proxy import pypi_download_file

        # Only the signature is checked here; the endpoint is not invoked.
        params = list(inspect.signature(pypi_download_file).parameters.keys())

        assert 'cache_depth' in params, \
            f"pypi_download_file should accept cache_depth parameter. Got params: {params}"

    def test_worker_sends_depth_in_url_when_fetching(self):
        """Test that _fetch_and_cache_package includes depth in download URL.

        When the worker fetches a package, it should include its current depth
        in the URL query params so nested dependencies get queued at depth+1.
        """
        from unittest.mock import patch, MagicMock

        # Replace httpx.Client so the GET requests made by the worker can be inspected
        with patch('app.pypi_cache_worker.httpx.Client') as mock_client_class:
            mock_client = MagicMock()
            mock_client_class.return_value.__enter__ = MagicMock(return_value=mock_client)
            mock_client_class.return_value.__exit__ = MagicMock(return_value=False)

            # First response: the simple-index page listing one wheel
            mock_response_index = MagicMock()
            mock_response_index.status_code = 200
            mock_response_index.text = '''
            <html><body>
            <a href="/pypi/simple/test-pkg/test_pkg-1.0.0-py3-none-any.whl?upstream=http%3A%2F%2Fexample.com">test_pkg-1.0.0-py3-none-any.whl</a>
            </body></html>
            '''

            # Second response: the file download itself
            mock_response_download = MagicMock()
            mock_response_download.status_code = 200
            mock_response_download.headers = {"X-Checksum-SHA256": "abc123"}

            mock_client.get.side_effect = [mock_response_index, mock_response_download]

            from app.pypi_cache_worker import _fetch_and_cache_package_with_depth

            # This function should exist and accept depth parameter
            result = _fetch_and_cache_package_with_depth("test-pkg", None, depth=2)

            # Fail with a readable message (instead of an IndexError below) when
            # the worker never reached the download request.
            get_calls = mock_client.get.call_args_list
            assert len(get_calls) >= 2, (
                f"Expected an index request and a download request, "
                f"got {len(get_calls)} call(s); result={result}"
            )

            # Verify the download request included the cache-depth query param
            download_url = get_calls[1][0][0]  # First positional arg is URL
            assert "cache-depth=2" in download_url, \
                f"Expected cache-depth=2 in URL, got: {download_url}"
|
||||
|
||||
@@ -754,6 +754,7 @@ export async function testUpstreamSource(id: string): Promise<UpstreamSourceTest
|
||||
import {
|
||||
PyPICacheStatus,
|
||||
PyPICacheTask,
|
||||
PyPICacheActiveTask,
|
||||
PyPICacheRetryResponse,
|
||||
} from './types';
|
||||
|
||||
@@ -771,6 +772,13 @@ export async function getPyPICacheFailedTasks(limit: number = 50): Promise<PyPIC
|
||||
return handleResponse<PyPICacheTask[]>(response);
|
||||
}
|
||||
|
||||
/**
 * Fetch the PyPI cache tasks that are currently in progress.
 *
 * Sends a GET request to `/pypi/cache/active` with credentials included
 * and passes the response to `handleResponse`.
 *
 * @param limit Maximum number of tasks to request (defaults to 50).
 * @returns The active tasks as parsed by `handleResponse`.
 */
export async function getPyPICacheActiveTasks(limit: number = 50): Promise<PyPICacheActiveTask[]> {
  const url = `/pypi/cache/active?limit=${limit}`;
  const requestOptions: RequestInit = { credentials: 'include' };
  const response = await fetch(url, requestOptions);
  return handleResponse<PyPICacheActiveTask[]>(response);
}
|
||||
|
||||
export async function retryPyPICacheTask(packageName: string): Promise<PyPICacheRetryResponse> {
|
||||
const response = await fetch(`/pypi/cache/retry/${encodeURIComponent(packageName)}`, {
|
||||
method: 'POST',
|
||||
|
||||
@@ -271,6 +271,62 @@
|
||||
font-size: 0.8rem;
|
||||
}
|
||||
|
||||
/* Active Workers Section */
.active-workers-section {
  margin-bottom: 1.5rem;
}

/* Heading: lays out the pulse dot and the title on one line */
.active-workers-section h3 {
  margin-top: 1rem;
  color: #3b82f6;
  display: flex;
  align-items: center;
  gap: 0.5rem;
}

/* Small round dot animated by the "pulse" keyframes below */
.pulse-indicator {
  width: 8px;
  height: 8px;
  border-radius: 50%;
  background-color: #3b82f6;
  animation: pulse 1.5s ease-in-out infinite;
}

/* Fades and slightly enlarges the dot at the midpoint of each cycle */
@keyframes pulse {
  0%, 100% {
    transform: scale(1);
    opacity: 1;
  }
  50% {
    transform: scale(1.2);
    opacity: 0.5;
  }
}

/* Blue-accented variant of the jobs table used for active tasks */
.jobs-table.active-table {
  border-radius: 4px;
  border: 1px solid #3b82f6;
}

.jobs-table.active-table th {
  color: #1d4ed8;
  background: #eff6ff;
}

/* Rows for active tasks, with a darker shade on hover */
.jobs-table .active-row {
  background-color: #f0f9ff;
}

.jobs-table .active-row:hover {
  background-color: #e0f2fe;
}

/* Version constraint cell */
.jobs-table .version-constraint {
  color: var(--text-secondary);
  font-family: monospace;
  font-size: 0.85rem;
}
|
||||
|
||||
/* Responsive */
|
||||
@media (max-width: 768px) {
|
||||
.status-cards {
|
||||
|
||||
@@ -4,10 +4,11 @@ import { useAuth } from '../contexts/AuthContext';
|
||||
import {
|
||||
getPyPICacheStatus,
|
||||
getPyPICacheFailedTasks,
|
||||
getPyPICacheActiveTasks,
|
||||
retryPyPICacheTask,
|
||||
retryAllPyPICacheTasks,
|
||||
} from '../api';
|
||||
import { PyPICacheStatus, PyPICacheTask } from '../types';
|
||||
import { PyPICacheStatus, PyPICacheTask, PyPICacheActiveTask } from '../types';
|
||||
import './AdminJobsPage.css';
|
||||
|
||||
function AdminJobsPage() {
|
||||
@@ -17,6 +18,7 @@ function AdminJobsPage() {
|
||||
// PyPI cache status
|
||||
const [cacheStatus, setCacheStatus] = useState<PyPICacheStatus | null>(null);
|
||||
const [failedTasks, setFailedTasks] = useState<PyPICacheTask[]>([]);
|
||||
const [activeTasks, setActiveTasks] = useState<PyPICacheActiveTask[]>([]);
|
||||
const [loadingStatus, setLoadingStatus] = useState(true);
|
||||
const [statusError, setStatusError] = useState<string | null>(null);
|
||||
|
||||
@@ -39,12 +41,14 @@ function AdminJobsPage() {
|
||||
|
||||
setStatusError(null);
|
||||
try {
|
||||
const [status, failed] = await Promise.all([
|
||||
const [status, failed, active] = await Promise.all([
|
||||
getPyPICacheStatus(),
|
||||
getPyPICacheFailedTasks(100),
|
||||
getPyPICacheActiveTasks(50),
|
||||
]);
|
||||
setCacheStatus(status);
|
||||
setFailedTasks(failed);
|
||||
setActiveTasks(active);
|
||||
} catch (err) {
|
||||
setStatusError(err instanceof Error ? err.message : 'Failed to load status');
|
||||
} finally {
|
||||
@@ -192,52 +196,101 @@ function AdminJobsPage() {
|
||||
|
||||
{totalJobs === 0 ? (
|
||||
<p className="empty-message">No cache jobs yet. Jobs are created when packages are downloaded through the PyPI proxy.</p>
|
||||
) : failedTasks.length === 0 && cacheStatus?.pending === 0 && cacheStatus?.in_progress === 0 ? (
|
||||
<p className="success-text">All jobs completed successfully.</p>
|
||||
) : failedTasks.length > 0 ? (
|
||||
<>
|
||||
<h3>Failed Tasks</h3>
|
||||
<table className="jobs-table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Package</th>
|
||||
<th>Error</th>
|
||||
<th>Attempts</th>
|
||||
<th>Depth</th>
|
||||
<th>Failed At</th>
|
||||
<th>Actions</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{failedTasks.map((task) => (
|
||||
<tr key={task.id}>
|
||||
<td className="package-name">{task.package}</td>
|
||||
<td className="error-cell" title={task.error || ''}>
|
||||
{task.error || 'Unknown error'}
|
||||
</td>
|
||||
<td>{task.attempts}</td>
|
||||
<td>{task.depth}</td>
|
||||
<td className="timestamp">
|
||||
{task.failed_at
|
||||
? new Date(task.failed_at).toLocaleString()
|
||||
: '-'}
|
||||
</td>
|
||||
<td className="actions-cell">
|
||||
<button
|
||||
className="btn btn-sm btn-secondary"
|
||||
onClick={() => handleRetryPackage(task.package)}
|
||||
disabled={retryingPackage === task.package}
|
||||
>
|
||||
{retryingPackage === task.package ? 'Retrying...' : 'Retry'}
|
||||
</button>
|
||||
</td>
|
||||
</tr>
|
||||
))}
|
||||
</tbody>
|
||||
</table>
|
||||
</>
|
||||
) : (
|
||||
<p className="empty-message">Jobs are processing. No failures yet.</p>
|
||||
<>
|
||||
{/* Active Workers Table */}
|
||||
{activeTasks.length > 0 && (
|
||||
<div className="active-workers-section">
|
||||
<h3>
|
||||
<span className="pulse-indicator"></span>
|
||||
Active Workers ({activeTasks.length})
|
||||
</h3>
|
||||
<table className="jobs-table active-table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Package</th>
|
||||
<th>Version</th>
|
||||
<th>Depth</th>
|
||||
<th>Attempt</th>
|
||||
<th>Started</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{activeTasks.map((task) => (
|
||||
<tr key={task.id} className="active-row">
|
||||
<td className="package-name">{task.package}</td>
|
||||
<td className="version-constraint">
|
||||
{task.version_constraint || '*'}
|
||||
</td>
|
||||
<td>{task.depth}</td>
|
||||
<td>{task.attempts + 1}</td>
|
||||
<td className="timestamp">
|
||||
{task.started_at
|
||||
? new Date(task.started_at).toLocaleTimeString()
|
||||
: '-'}
|
||||
</td>
|
||||
</tr>
|
||||
))}
|
||||
</tbody>
|
||||
</table>
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Failed Tasks Table */}
|
||||
{failedTasks.length > 0 && (
|
||||
<>
|
||||
<h3>Failed Tasks</h3>
|
||||
<table className="jobs-table">
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Package</th>
|
||||
<th>Error</th>
|
||||
<th>Attempts</th>
|
||||
<th>Depth</th>
|
||||
<th>Failed At</th>
|
||||
<th>Actions</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{failedTasks.map((task) => (
|
||||
<tr key={task.id}>
|
||||
<td className="package-name">{task.package}</td>
|
||||
<td className="error-cell" title={task.error || ''}>
|
||||
{task.error || 'Unknown error'}
|
||||
</td>
|
||||
<td>{task.attempts}</td>
|
||||
<td>{task.depth}</td>
|
||||
<td className="timestamp">
|
||||
{task.failed_at
|
||||
? new Date(task.failed_at).toLocaleString()
|
||||
: '-'}
|
||||
</td>
|
||||
<td className="actions-cell">
|
||||
<button
|
||||
className="btn btn-sm btn-secondary"
|
||||
onClick={() => handleRetryPackage(task.package)}
|
||||
disabled={retryingPackage === task.package}
|
||||
>
|
||||
{retryingPackage === task.package ? 'Retrying...' : 'Retry'}
|
||||
</button>
|
||||
</td>
|
||||
</tr>
|
||||
))}
|
||||
</tbody>
|
||||
</table>
|
||||
</>
|
||||
)}
|
||||
|
||||
{/* Success message when nothing is pending/in-progress/failed */}
|
||||
{failedTasks.length === 0 && activeTasks.length === 0 && cacheStatus?.pending === 0 && (
|
||||
<p className="success-text">All jobs completed successfully.</p>
|
||||
)}
|
||||
|
||||
{/* In progress message */}
|
||||
{activeTasks.length === 0 && failedTasks.length === 0 && (cacheStatus?.pending ?? 0) > 0 && (
|
||||
<p className="empty-message">Jobs queued for processing...</p>
|
||||
)}
|
||||
</>
|
||||
)}
|
||||
</>
|
||||
)}
|
||||
|
||||
@@ -575,6 +575,15 @@ export interface PyPICacheTask {
|
||||
failed_at: string | null;
|
||||
}
|
||||
|
||||
/**
 * One in-progress PyPI cache task, as returned by `/pypi/cache/active`.
 */
export interface PyPICacheActiveTask {
  /** Task identifier. */
  id: string;
  /** Name of the package being cached. */
  package: string;
  /** Version constraint attached to the task, or null when there is none. */
  version_constraint: string | null;
  /** Dependency recursion depth recorded for the task. */
  depth: number;
  /** Attempt counter recorded for the task. */
  attempts: number;
  /** Start timestamp string, or null when the task has no start time. */
  started_at: string | null;
}
|
||||
|
||||
export interface PyPICacheRetryResponse {
|
||||
message: string;
|
||||
task_id?: string;
|
||||
|
||||
Reference in New Issue
Block a user