feat: add auto-fetch for missing dependencies from upstream registries

Add auto_fetch parameter to dependency resolution endpoint that fetches
missing dependencies from upstream registries (PyPI) when resolving.

- Add RegistryClient abstraction with PyPIRegistryClient implementation
- Extract fetch_and_cache_pypi_package() for reuse
- Add resolve_dependencies_with_fetch() async function
- Extend MissingDependency schema with fetch_attempted/fetch_error
- Add fetched list to DependencyResolutionResponse
- Add auto_fetch_max_depth config setting (default: 3)
- Remove Usage section from Package page UI
- Add 6 integration tests for auto-fetch functionality
This commit is contained in:
Mondo Diaz
2026-02-04 12:01:49 -06:00
parent 9f233e0d4d
commit cbc2e5e11a
10 changed files with 1348 additions and 65 deletions

View File

@@ -10,6 +10,17 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added S3 bucket provisioning terraform configuration (#59)
- Creates an S3 bucket to be used for anything Orchard
- Creates a log bucket for any logs tracking the S3 bucket
- Added auto-fetch capability to dependency resolution endpoint
- `GET /api/v1/project/{project}/{package}/+/{ref}/resolve?auto_fetch=true` fetches missing dependencies from upstream registries
- PyPI registry client queries PyPI JSON API to resolve version constraints
- Fetched artifacts are cached and included in response `fetched` field
- Missing dependencies show `fetch_attempted` and `fetch_error` status
- Configurable max fetch depth via `ORCHARD_AUTO_FETCH_MAX_DEPTH` (default: 3)
- Added `backend/app/registry_client.py` with extensible registry client abstraction
- `RegistryClient` ABC for implementing upstream registry clients
- `PyPIRegistryClient` implementation using PyPI JSON API
- `get_registry_client()` factory function for future npm/maven support
- Added `fetch_and_cache_pypi_package()` reusable function for PyPI package fetching
- Added HTTP connection pooling infrastructure for improved PyPI proxy performance
- `HttpClientManager` with configurable pool size, timeouts, and thread pool executor
- Eliminates per-request connection overhead (~100-500ms → ~5ms)
@@ -36,6 +47,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added `POST /api/v1/cache/resolve` endpoint to cache packages by coordinates instead of URL (#108)
### Changed
- Removed Usage section from Package page (curl command examples)
- PyPI proxy now uses shared HTTP connection pool instead of per-request clients
- PyPI proxy now caches upstream source configuration in Redis
- Dependency storage now uses batch INSERT instead of individual queries

View File

@@ -89,6 +89,11 @@ class Settings(BaseSettings):
pypi_cache_max_depth: int = 10 # Maximum recursion depth for dependency caching
pypi_cache_max_attempts: int = 3 # Maximum retry attempts for failed cache tasks
# Auto-fetch configuration for dependency resolution
auto_fetch_dependencies: bool = False # Server default for auto_fetch parameter
auto_fetch_max_depth: int = 3 # Maximum fetch recursion depth to prevent runaway fetching
auto_fetch_timeout: int = 300 # Total timeout for auto-fetch resolution in seconds
# JWT Authentication settings (optional, for external identity providers)
jwt_enabled: bool = False # Enable JWT token validation
jwt_secret: str = "" # Secret key for HS256, or leave empty for RS256 with JWKS

View File

@@ -11,11 +11,18 @@ Handles:
"""
import re
import logging
import yaml
from typing import List, Dict, Any, Optional, Set, Tuple
from typing import List, Dict, Any, Optional, Set, Tuple, TYPE_CHECKING
from sqlalchemy.orm import Session
from sqlalchemy import and_
if TYPE_CHECKING:
from .storage import S3Storage
from .registry_client import RegistryClient
logger = logging.getLogger(__name__)
# Import packaging for PEP 440 version matching
try:
from packaging.specifiers import SpecifierSet, InvalidSpecifier
@@ -848,6 +855,334 @@ def resolve_dependencies(
},
resolved=resolved_list,
missing=missing_dependencies,
fetched=[], # No fetching in sync version
total_size=total_size,
artifact_count=len(resolved_list),
)
# System project mapping for auto-fetch
SYSTEM_PROJECT_REGISTRY_MAP = {
"_pypi": "pypi",
"_npm": "npm",
"_maven": "maven",
}
async def resolve_dependencies_with_fetch(
db: Session,
project_name: str,
package_name: str,
ref: str,
base_url: str,
storage: "S3Storage",
registry_clients: Dict[str, "RegistryClient"],
max_fetch_depth: int = 3,
) -> DependencyResolutionResponse:
"""
Resolve all dependencies for an artifact recursively, fetching missing ones from upstream.
This async version extends the basic resolution with auto-fetch capability:
when a missing dependency is from a system project (e.g., _pypi), it attempts
to fetch the package from the corresponding upstream registry.
Args:
db: Database session
project_name: Project name
package_name: Package name
ref: Version reference (or artifact:hash)
base_url: Base URL for download URLs
storage: S3 storage for caching fetched artifacts
registry_clients: Map of system project to registry client {"_pypi": PyPIRegistryClient}
max_fetch_depth: Maximum depth for auto-fetching (prevents runaway fetching)
Returns:
DependencyResolutionResponse with all resolved artifacts and fetch status
Raises:
DependencyNotFoundError: If the root artifact cannot be found
CircularDependencyError: If circular dependencies are detected
DependencyConflictError: If conflicting versions are required
"""
# Resolve the initial artifact (same as sync version)
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise DependencyNotFoundError(project_name, package_name, ref)
package = db.query(Package).filter(
Package.project_id == project.id,
Package.name == package_name,
).first()
if not package:
raise DependencyNotFoundError(project_name, package_name, ref)
# Handle artifact: prefix for direct artifact ID references
if ref.startswith("artifact:"):
artifact_id = ref[9:]
artifact = db.query(Artifact).filter(Artifact.id == artifact_id).first()
if not artifact:
raise DependencyNotFoundError(project_name, package_name, ref)
root_artifact_id = artifact.id
root_version = artifact_id[:12]
root_size = artifact.size
else:
resolved = _resolve_dependency_to_artifact(
db, project_name, package_name, ref
)
if not resolved:
raise DependencyNotFoundError(project_name, package_name, ref)
root_artifact_id, root_version, root_size = resolved
# Track state
resolved_artifacts: Dict[str, ResolvedArtifact] = {}
missing_dependencies: List[MissingDependency] = []
fetched_artifacts: List[ResolvedArtifact] = [] # Newly fetched
version_requirements: Dict[str, List[Dict[str, Any]]] = {}
visiting: Set[str] = set()
visited: Set[str] = set()
current_path: Dict[str, str] = {}
resolution_order: List[str] = []
# Track fetch attempts to prevent loops
fetch_attempted: Set[str] = set() # "project/package@constraint"
async def _try_fetch_dependency(
dep_project: str,
dep_package: str,
constraint: str,
required_by: str,
fetch_depth: int,
) -> Optional[Tuple[str, str, int]]:
"""
Try to fetch a missing dependency from upstream registry.
Returns (artifact_id, version, size) if successful, None otherwise.
"""
# Only fetch from system projects
registry_type = SYSTEM_PROJECT_REGISTRY_MAP.get(dep_project)
if not registry_type:
logger.debug(
f"Not a system project, skipping fetch: {dep_project}/{dep_package}"
)
return None
# Check fetch depth
if fetch_depth > max_fetch_depth:
logger.info(
f"Max fetch depth ({max_fetch_depth}) exceeded for {dep_project}/{dep_package}"
)
return None
# Build fetch key for loop prevention
fetch_key = f"{dep_project}/{dep_package}@{constraint}"
if fetch_key in fetch_attempted:
logger.debug(f"Already attempted fetch for {fetch_key}")
return None
fetch_attempted.add(fetch_key)
# Get registry client
client = registry_clients.get(dep_project)
if not client:
logger.debug(f"No registry client for {dep_project}")
return None
try:
# Resolve version constraint
version_info = await client.resolve_constraint(dep_package, constraint)
if not version_info:
logger.info(
f"No version of {dep_package} matches constraint '{constraint}' on upstream"
)
return None
# Fetch and cache the package
fetch_result = await client.fetch_package(
dep_package, version_info, db, storage
)
if not fetch_result:
logger.warning(f"Failed to fetch {dep_package}=={version_info.version}")
return None
logger.info(
f"Successfully fetched {dep_package}=={version_info.version} "
f"(artifact {fetch_result.artifact_id[:12]})"
)
# Add to fetched list for response
fetched_artifacts.append(ResolvedArtifact(
artifact_id=fetch_result.artifact_id,
project=dep_project,
package=dep_package,
version=fetch_result.version,
size=fetch_result.size,
download_url=f"{base_url}/api/v1/project/{dep_project}/{dep_package}/+/{fetch_result.version}",
))
return (fetch_result.artifact_id, fetch_result.version, fetch_result.size)
except Exception as e:
logger.warning(f"Error fetching {dep_package}: {e}")
return None
async def _resolve_recursive_async(
artifact_id: str,
proj_name: str,
pkg_name: str,
version_or_tag: str,
size: int,
required_by: Optional[str],
depth: int = 0,
fetch_depth: int = 0,
):
"""Recursively resolve dependencies with fetch capability."""
if depth > MAX_DEPENDENCY_DEPTH:
raise DependencyDepthExceededError(MAX_DEPENDENCY_DEPTH)
pkg_key = f"{proj_name}/{pkg_name}"
# Cycle detection
if artifact_id in visiting:
cycle_start = current_path.get(artifact_id, pkg_key)
cycle = [cycle_start, pkg_key]
raise CircularDependencyError(cycle)
# Conflict detection
if pkg_key in version_requirements:
existing_versions = {r["version"] for r in version_requirements[pkg_key]}
if version_or_tag not in existing_versions:
requirements = version_requirements[pkg_key] + [
{"version": version_or_tag, "required_by": required_by}
]
raise DependencyConflictError([
DependencyConflict(
project=proj_name,
package=pkg_name,
requirements=[
{
"version": r["version"],
"required_by": [{"path": r["required_by"]}] if r["required_by"] else []
}
for r in requirements
],
)
])
if artifact_id in visited:
return
if artifact_id in visited:
return
visiting.add(artifact_id)
current_path[artifact_id] = pkg_key
if pkg_key not in version_requirements:
version_requirements[pkg_key] = []
version_requirements[pkg_key].append({
"version": version_or_tag,
"required_by": required_by,
})
# Get dependencies
deps = db.query(ArtifactDependency).filter(
ArtifactDependency.artifact_id == artifact_id
).all()
for dep in deps:
# Skip self-dependencies
dep_proj_normalized = dep.dependency_project.lower()
dep_pkg_normalized = _normalize_pypi_package_name(dep.dependency_package)
curr_proj_normalized = proj_name.lower()
curr_pkg_normalized = _normalize_pypi_package_name(pkg_name)
if dep_proj_normalized == curr_proj_normalized and dep_pkg_normalized == curr_pkg_normalized:
continue
resolved_dep = _resolve_dependency_to_artifact(
db,
dep.dependency_project,
dep.dependency_package,
dep.version_constraint,
)
if not resolved_dep:
# Try to fetch from upstream if it's a system project
fetched = await _try_fetch_dependency(
dep.dependency_project,
dep.dependency_package,
dep.version_constraint,
pkg_key,
fetch_depth + 1,
)
if fetched:
resolved_dep = fetched
else:
# Still missing - add to missing list with fetch status
fetch_key = f"{dep.dependency_project}/{dep.dependency_package}@{dep.version_constraint}"
was_attempted = fetch_key in fetch_attempted
missing_dependencies.append(MissingDependency(
project=dep.dependency_project,
package=dep.dependency_package,
constraint=dep.version_constraint,
required_by=pkg_key,
fetch_attempted=was_attempted,
fetch_error="Max fetch depth exceeded" if was_attempted and fetch_depth >= max_fetch_depth else None,
))
continue
dep_artifact_id, dep_version, dep_size = resolved_dep
if dep_artifact_id == artifact_id:
continue
await _resolve_recursive_async(
dep_artifact_id,
dep.dependency_project,
dep.dependency_package,
dep_version,
dep_size,
pkg_key,
depth + 1,
fetch_depth + 1 if dep_artifact_id in [f.artifact_id for f in fetched_artifacts] else fetch_depth,
)
visiting.remove(artifact_id)
del current_path[artifact_id]
visited.add(artifact_id)
resolution_order.append(artifact_id)
resolved_artifacts[artifact_id] = ResolvedArtifact(
artifact_id=artifact_id,
project=proj_name,
package=pkg_name,
version=version_or_tag,
size=size,
download_url=f"{base_url}/api/v1/project/{proj_name}/{pkg_name}/+/{version_or_tag}",
)
# Start resolution from root
await _resolve_recursive_async(
root_artifact_id,
project_name,
package_name,
root_version,
root_size,
None,
)
# Build response in topological order
resolved_list = [resolved_artifacts[aid] for aid in resolution_order]
total_size = sum(r.size for r in resolved_list)
return DependencyResolutionResponse(
requested={
"project": project_name,
"package": package_name,
"ref": ref,
},
resolved=resolved_list,
missing=missing_dependencies,
fetched=fetched_artifacts,
total_size=total_size,
artifact_count=len(resolved_list),
)

View File

@@ -572,6 +572,258 @@ async def pypi_package_versions(
)
async def fetch_and_cache_pypi_package(
db: Session,
storage: S3Storage,
http_client: httpx.AsyncClient,
package_name: str,
filename: str,
download_url: str,
expected_sha256: Optional[str] = None,
) -> Optional[dict]:
"""
Fetch a PyPI package from upstream and cache it in Orchard.
This is the core caching logic extracted from pypi_download_file() for reuse
by the registry client during auto-fetch dependency resolution.
Args:
db: Database session
storage: S3 storage instance
http_client: Async HTTP client for making requests
package_name: Normalized package name (e.g., 'requests')
filename: Package filename (e.g., 'requests-2.31.0-py3-none-any.whl')
download_url: Full URL to download from upstream
expected_sha256: Optional SHA256 to verify download integrity
Returns:
Dict with artifact_id, size, version, already_cached if successful.
None if the fetch failed.
"""
# Normalize package name
normalized_name = re.sub(r'[-_.]+', '-', package_name).lower()
# Check if we already have this URL cached
url_hash = hashlib.sha256(download_url.encode()).hexdigest()
cached_url = db.query(CachedUrl).filter(CachedUrl.url_hash == url_hash).first()
if cached_url:
# Already cached - return existing artifact info
artifact = db.query(Artifact).filter(Artifact.id == cached_url.artifact_id).first()
if artifact:
version = _extract_pypi_version(filename)
logger.info(f"PyPI fetch: {filename} already cached (artifact {artifact.id[:12]})")
return {
"artifact_id": artifact.id,
"size": artifact.size,
"version": version,
"already_cached": True,
}
# Get upstream sources for auth headers
sources = _get_pypi_upstream_sources(db)
matched_source = sources[0] if sources else None
headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
if matched_source:
headers.update(_build_auth_headers(matched_source))
auth = _get_basic_auth(matched_source) if matched_source else None
download_timeout = httpx.Timeout(connect=30.0, read=300.0, write=300.0, pool=30.0)
try:
logger.info(f"PyPI fetch: downloading {filename} from {download_url}")
response = await http_client.get(
download_url,
headers=headers,
auth=auth,
timeout=download_timeout,
)
# Handle redirects manually
redirect_count = 0
while response.status_code in (301, 302, 303, 307, 308) and redirect_count < 5:
redirect_url = response.headers.get('location')
if not redirect_url:
break
if not redirect_url.startswith('http'):
redirect_url = urljoin(download_url, redirect_url)
logger.debug(f"PyPI fetch: following redirect to {redirect_url}")
# Don't send auth to different hosts
redirect_headers = {"User-Agent": "Orchard-PyPI-Proxy/1.0"}
redirect_auth = None
if urlparse(redirect_url).netloc == urlparse(download_url).netloc:
redirect_headers.update(headers)
redirect_auth = auth
response = await http_client.get(
redirect_url,
headers=redirect_headers,
auth=redirect_auth,
follow_redirects=False,
timeout=download_timeout,
)
redirect_count += 1
if response.status_code != 200:
error_detail = _parse_upstream_error(response)
logger.warning(f"PyPI fetch: upstream returned {response.status_code} for {filename}: {error_detail}")
return None
content_type = response.headers.get('content-type', 'application/octet-stream')
# Stream to temp file to avoid loading large packages into memory
tmp_path = None
try:
with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{filename}") as tmp_file:
tmp_path = tmp_file.name
async for chunk in response.aiter_bytes(chunk_size=65536):
tmp_file.write(chunk)
# Store in S3 from temp file (computes hash and deduplicates automatically)
with open(tmp_path, 'rb') as f:
result = storage.store(f)
sha256 = result.sha256
size = result.size
# Verify hash if expected
if expected_sha256 and sha256 != expected_sha256.lower():
logger.error(
f"PyPI fetch: hash mismatch for {filename}: "
f"expected {expected_sha256[:12]}, got {sha256[:12]}"
)
return None
# Extract dependencies from the temp file
extracted_deps = _extract_dependencies_from_file(tmp_path, filename)
if extracted_deps:
logger.info(f"PyPI fetch: extracted {len(extracted_deps)} dependencies from {filename}")
logger.info(f"PyPI fetch: downloaded {filename}, {size} bytes, sha256={sha256[:12]}")
finally:
# Clean up temp file
if tmp_path and os.path.exists(tmp_path):
os.unlink(tmp_path)
# Check if artifact already exists
existing = db.query(Artifact).filter(Artifact.id == sha256).first()
if existing:
existing.ref_count += 1
db.flush()
else:
new_artifact = Artifact(
id=sha256,
original_name=filename,
content_type=content_type,
size=size,
ref_count=1,
created_by="pypi-proxy",
s3_key=result.s3_key,
checksum_md5=result.md5,
checksum_sha1=result.sha1,
s3_etag=result.s3_etag,
)
db.add(new_artifact)
db.flush()
# Create/get system project and package
system_project = db.query(Project).filter(Project.name == "_pypi").first()
if not system_project:
system_project = Project(
name="_pypi",
description="System project for cached PyPI packages",
is_public=True,
is_system=True,
created_by="pypi-proxy",
)
db.add(system_project)
db.flush()
elif not system_project.is_system:
system_project.is_system = True
db.flush()
package = db.query(Package).filter(
Package.project_id == system_project.id,
Package.name == normalized_name,
).first()
if not package:
package = Package(
project_id=system_project.id,
name=normalized_name,
description=f"PyPI package: {normalized_name}",
format="pypi",
)
db.add(package)
db.flush()
# Extract and create version
version = _extract_pypi_version(filename)
if version and not filename.endswith('.metadata'):
existing_version = db.query(PackageVersion).filter(
PackageVersion.package_id == package.id,
PackageVersion.version == version,
).first()
if not existing_version:
pkg_version = PackageVersion(
package_id=package.id,
artifact_id=sha256,
version=version,
version_source="filename",
created_by="pypi-proxy",
)
db.add(pkg_version)
# Cache the URL mapping
existing_cached = db.query(CachedUrl).filter(CachedUrl.url_hash == url_hash).first()
if not existing_cached:
cached_url_record = CachedUrl(
url_hash=url_hash,
url=download_url,
artifact_id=sha256,
)
db.add(cached_url_record)
# Store extracted dependencies using batch operation
if extracted_deps:
seen_deps: dict[str, str] = {}
for dep_name, dep_version in extracted_deps:
if dep_name not in seen_deps:
seen_deps[dep_name] = dep_version if dep_version else "*"
deps_to_store = [
("_pypi", dep_name, dep_version)
for dep_name, dep_version in seen_deps.items()
]
repo = ArtifactRepository(db)
inserted = repo.batch_upsert_dependencies(sha256, deps_to_store)
if inserted > 0:
logger.debug(f"Stored {inserted} dependencies for {sha256[:12]}...")
db.commit()
return {
"artifact_id": sha256,
"size": size,
"version": version,
"already_cached": False,
}
except httpx.ConnectError as e:
logger.warning(f"PyPI fetch: connection failed for {filename}: {e}")
return None
except httpx.TimeoutException as e:
logger.warning(f"PyPI fetch: timeout for {filename}: {e}")
return None
except Exception as e:
logger.exception(f"PyPI fetch: error downloading {filename}")
return None
@router.get("/simple/{package_name}/{filename}")
async def pypi_download_file(
request: Request,

View File

@@ -0,0 +1,416 @@
"""
Registry client abstraction for upstream package registries.
Provides a pluggable interface for fetching packages from upstream registries
(PyPI, npm, Maven, etc.) during dependency resolution with auto-fetch enabled.
"""
import hashlib
import logging
import os
import re
import tempfile
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Optional, TYPE_CHECKING
from urllib.parse import urljoin, urlparse
import httpx
from packaging.specifiers import SpecifierSet, InvalidSpecifier
from packaging.version import Version, InvalidVersion
from sqlalchemy.orm import Session
if TYPE_CHECKING:
from .storage import S3Storage
from .http_client import HttpClientManager
logger = logging.getLogger(__name__)
@dataclass
class VersionInfo:
"""Information about a package version from an upstream registry."""
version: str
download_url: str
filename: str
sha256: Optional[str] = None
size: Optional[int] = None
content_type: Optional[str] = None
@dataclass
class FetchResult:
"""Result of fetching a package from upstream."""
artifact_id: str # SHA256 hash
size: int
version: str
filename: str
already_cached: bool = False
class RegistryClient(ABC):
"""Abstract base class for upstream registry clients."""
@property
@abstractmethod
def source_type(self) -> str:
"""Return the source type this client handles (e.g., 'pypi', 'npm')."""
pass
@abstractmethod
async def get_available_versions(self, package_name: str) -> List[str]:
"""
Get all available versions of a package from upstream.
Args:
package_name: The normalized package name
Returns:
List of version strings, sorted from oldest to newest
"""
pass
@abstractmethod
async def resolve_constraint(
self, package_name: str, constraint: str
) -> Optional[VersionInfo]:
"""
Find the best version matching a constraint.
Args:
package_name: The normalized package name
constraint: Version constraint (e.g., '>=1.9', '<2.0,>=1.5', '*')
Returns:
VersionInfo with download URL, or None if no matching version found
"""
pass
@abstractmethod
async def fetch_package(
self,
package_name: str,
version_info: VersionInfo,
db: Session,
storage: "S3Storage",
) -> Optional[FetchResult]:
"""
Fetch and cache a package from upstream.
Args:
package_name: The normalized package name
version_info: Version details including download URL
db: Database session for creating records
storage: S3 storage for caching the artifact
Returns:
FetchResult with artifact_id, or None if fetch failed
"""
pass
class PyPIRegistryClient(RegistryClient):
"""PyPI registry client using the JSON API."""
# Timeout configuration for PyPI requests
CONNECT_TIMEOUT = 30.0
READ_TIMEOUT = 60.0
DOWNLOAD_TIMEOUT = 300.0 # Longer timeout for file downloads
def __init__(
self,
http_client: httpx.AsyncClient,
upstream_sources: List,
pypi_api_url: str = "https://pypi.org/pypi",
):
"""
Initialize PyPI registry client.
Args:
http_client: Shared async HTTP client
upstream_sources: List of configured upstream sources for auth
pypi_api_url: Base URL for PyPI JSON API
"""
self.client = http_client
self.sources = upstream_sources
self.api_url = pypi_api_url
@property
def source_type(self) -> str:
return "pypi"
def _normalize_package_name(self, name: str) -> str:
"""Normalize a PyPI package name per PEP 503."""
return re.sub(r"[-_.]+", "-", name).lower()
def _get_auth_headers(self) -> dict:
"""Get authentication headers from configured sources."""
headers = {"User-Agent": "Orchard-Registry-Client/1.0"}
if self.sources:
source = self.sources[0]
if hasattr(source, "auth_type"):
if source.auth_type == "bearer":
password = (
source.get_password()
if hasattr(source, "get_password")
else getattr(source, "password", None)
)
if password:
headers["Authorization"] = f"Bearer {password}"
elif source.auth_type == "api_key":
custom_headers = (
source.get_headers()
if hasattr(source, "get_headers")
else {}
)
if custom_headers:
headers.update(custom_headers)
return headers
def _get_basic_auth(self) -> Optional[tuple]:
"""Get basic auth credentials if configured."""
if self.sources:
source = self.sources[0]
if hasattr(source, "auth_type") and source.auth_type == "basic":
username = getattr(source, "username", None)
if username:
password = (
source.get_password()
if hasattr(source, "get_password")
else getattr(source, "password", "")
)
return (username, password or "")
return None
async def get_available_versions(self, package_name: str) -> List[str]:
"""Get all available versions from PyPI JSON API."""
normalized = self._normalize_package_name(package_name)
url = f"{self.api_url}/{normalized}/json"
headers = self._get_auth_headers()
auth = self._get_basic_auth()
timeout = httpx.Timeout(self.READ_TIMEOUT, connect=self.CONNECT_TIMEOUT)
try:
response = await self.client.get(
url, headers=headers, auth=auth, timeout=timeout
)
if response.status_code == 404:
logger.debug(f"Package {normalized} not found on PyPI")
return []
if response.status_code != 200:
logger.warning(
f"PyPI API returned {response.status_code} for {normalized}"
)
return []
data = response.json()
releases = data.get("releases", {})
# Filter to valid versions and sort
versions = []
for v in releases.keys():
try:
Version(v)
versions.append(v)
except InvalidVersion:
continue
versions.sort(key=lambda x: Version(x))
return versions
except httpx.RequestError as e:
logger.warning(f"Failed to query PyPI for {normalized}: {e}")
return []
except Exception as e:
logger.warning(f"Error parsing PyPI response for {normalized}: {e}")
return []
async def resolve_constraint(
self, package_name: str, constraint: str
) -> Optional[VersionInfo]:
"""Find best version matching constraint from PyPI."""
normalized = self._normalize_package_name(package_name)
url = f"{self.api_url}/{normalized}/json"
headers = self._get_auth_headers()
auth = self._get_basic_auth()
timeout = httpx.Timeout(self.READ_TIMEOUT, connect=self.CONNECT_TIMEOUT)
try:
response = await self.client.get(
url, headers=headers, auth=auth, timeout=timeout
)
if response.status_code == 404:
logger.debug(f"Package {normalized} not found on PyPI")
return None
if response.status_code != 200:
logger.warning(
f"PyPI API returned {response.status_code} for {normalized}"
)
return None
data = response.json()
releases = data.get("releases", {})
# Handle wildcard - return latest version
if constraint == "*":
latest_version = data.get("info", {}).get("version")
if latest_version and latest_version in releases:
return self._get_version_info(
normalized, latest_version, releases[latest_version]
)
return None
# Parse constraint
try:
specifier = SpecifierSet(constraint)
except InvalidSpecifier:
# Invalid constraint - treat as wildcard
logger.warning(
f"Invalid version constraint '{constraint}' for {normalized}, "
"treating as wildcard"
)
latest_version = data.get("info", {}).get("version")
if latest_version and latest_version in releases:
return self._get_version_info(
normalized, latest_version, releases[latest_version]
)
return None
# Find matching versions
matching = []
for v_str, files in releases.items():
if not files: # Skip versions with no files
continue
try:
v = Version(v_str)
if v in specifier:
matching.append((v_str, v, files))
except InvalidVersion:
continue
if not matching:
logger.debug(
f"No versions of {normalized} match constraint '{constraint}'"
)
return None
# Sort by version and return highest match
matching.sort(key=lambda x: x[1], reverse=True)
best_version, _, best_files = matching[0]
return self._get_version_info(normalized, best_version, best_files)
except httpx.RequestError as e:
logger.warning(f"Failed to query PyPI for {normalized}: {e}")
return None
except Exception as e:
logger.warning(f"Error resolving {normalized}@{constraint}: {e}")
return None
def _get_version_info(
self, package_name: str, version: str, files: List[dict]
) -> Optional[VersionInfo]:
"""Extract download info from PyPI release files."""
if not files:
return None
# Prefer wheel over sdist
wheel_file = None
sdist_file = None
for f in files:
filename = f.get("filename", "")
if filename.endswith(".whl"):
# Prefer platform-agnostic wheels
if "py3-none-any" in filename or wheel_file is None:
wheel_file = f
elif filename.endswith(".tar.gz") and sdist_file is None:
sdist_file = f
selected = wheel_file or sdist_file
if not selected:
# Fall back to first available file
selected = files[0]
return VersionInfo(
version=version,
download_url=selected.get("url", ""),
filename=selected.get("filename", ""),
sha256=selected.get("digests", {}).get("sha256"),
size=selected.get("size"),
content_type="application/zip"
if selected.get("filename", "").endswith(".whl")
else "application/gzip",
)
async def fetch_package(
self,
package_name: str,
version_info: VersionInfo,
db: Session,
storage: "S3Storage",
) -> Optional[FetchResult]:
"""Fetch and cache a PyPI package."""
# Import here to avoid circular imports
from .pypi_proxy import fetch_and_cache_pypi_package
normalized = self._normalize_package_name(package_name)
logger.info(
f"Fetching {normalized}=={version_info.version} from upstream PyPI"
)
result = await fetch_and_cache_pypi_package(
db=db,
storage=storage,
http_client=self.client,
package_name=normalized,
filename=version_info.filename,
download_url=version_info.download_url,
expected_sha256=version_info.sha256,
)
if result is None:
return None
return FetchResult(
artifact_id=result["artifact_id"],
size=result["size"],
version=version_info.version,
filename=version_info.filename,
already_cached=result.get("already_cached", False),
)
def get_registry_client(
source_type: str,
http_client: httpx.AsyncClient,
upstream_sources: List,
) -> Optional[RegistryClient]:
"""
Factory function to get a registry client for a source type.
Args:
source_type: The registry type ('pypi', 'npm', etc.)
http_client: Shared async HTTP client
upstream_sources: List of configured upstream sources
Returns:
RegistryClient for the source type, or None if not supported
"""
if source_type == "pypi":
# Filter to PyPI sources
pypi_sources = [s for s in upstream_sources if getattr(s, "source_type", "") == "pypi"]
return PyPIRegistryClient(http_client, pypi_sources)
# Future: Add npm, maven, etc.
logger.debug(f"No registry client available for source type: {source_type}")
return None

View File

@@ -141,6 +141,7 @@ from .dependencies import (
get_reverse_dependencies,
check_circular_dependencies,
resolve_dependencies,
resolve_dependencies_with_fetch,
InvalidEnsureFileError,
CircularDependencyError,
DependencyConflictError,
@@ -7025,12 +7026,17 @@ def get_package_reverse_dependencies(
response_model=DependencyResolutionResponse,
tags=["dependencies"],
)
def resolve_artifact_dependencies(
async def resolve_artifact_dependencies(
project_name: str,
package_name: str,
ref: str,
request: Request,
auto_fetch: bool = Query(
False,
description="Fetch missing dependencies from upstream registries (e.g., PyPI)"
),
db: Session = Depends(get_db),
storage: S3Storage = Depends(get_storage),
current_user: Optional[User] = Depends(get_current_user_optional),
):
"""
@@ -7039,6 +7045,16 @@ def resolve_artifact_dependencies(
Returns a flat list of all artifacts needed, in topological order
(dependencies before dependents). Includes download URLs for each artifact.
**Parameters:**
- **auto_fetch**: When true, attempts to fetch missing dependencies from
upstream registries (PyPI for _pypi project packages). Default is false
for fast, network-free resolution.
**Response Fields:**
- **resolved**: All artifacts in dependency order with download URLs
- **missing**: Dependencies that couldn't be resolved (with fetch status if auto_fetch=true)
- **fetched**: Artifacts that were fetched from upstream during this request
**Error Responses:**
- 404: Artifact or dependency not found
- 409: Circular dependency or version conflict detected
@@ -7050,7 +7066,39 @@ def resolve_artifact_dependencies(
base_url = str(request.base_url).rstrip("/")
try:
if auto_fetch:
# Use async resolution with auto-fetch
from .registry_client import get_registry_client
from .pypi_proxy import _get_pypi_upstream_sources
settings = get_settings()
# Get HTTP client from app state
http_client = request.app.state.http_client.get_client()
# Get upstream sources for registry clients
pypi_sources = _get_pypi_upstream_sources(db)
# Build registry clients
registry_clients = {}
pypi_client = get_registry_client("pypi", http_client, pypi_sources)
if pypi_client:
registry_clients["_pypi"] = pypi_client
return await resolve_dependencies_with_fetch(
db=db,
project_name=project_name,
package_name=package_name,
ref=ref,
base_url=base_url,
storage=storage,
registry_clients=registry_clients,
max_fetch_depth=settings.auto_fetch_max_depth,
)
else:
# Fast, synchronous resolution without network calls
return resolve_dependencies(db, project_name, package_name, ref, base_url)
except DependencyNotFoundError as e:
raise HTTPException(
status_code=404,

View File

@@ -892,6 +892,8 @@ class MissingDependency(BaseModel):
package: str
constraint: Optional[str] = None
required_by: Optional[str] = None
fetch_attempted: bool = False # True if auto-fetch was attempted
fetch_error: Optional[str] = None # Error message if fetch failed
class DependencyResolutionResponse(BaseModel):
@@ -899,6 +901,7 @@ class DependencyResolutionResponse(BaseModel):
requested: Dict[str, str] # project, package, ref
resolved: List[ResolvedArtifact]
missing: List[MissingDependency] = []
fetched: List[ResolvedArtifact] = [] # Artifacts fetched from upstream during resolution
total_size: int
artifact_count: int

View File

@@ -1067,3 +1067,277 @@ class TestConflictDetection:
finally:
for pkg in [pkg_app, pkg_lib_a, pkg_lib_b, pkg_common]:
integration_client.delete(f"/api/v1/project/{test_project}/packages/{pkg}")
class TestAutoFetchDependencies:
"""Tests for auto-fetch functionality in dependency resolution.
These tests verify:
- Resolution with auto_fetch=false (default) behavior is unchanged
- Resolution with auto_fetch=true attempts to fetch missing dependencies
- Proper handling of missing/non-existent packages
- Response schema includes fetched artifacts list
"""
@pytest.mark.integration
def test_resolve_auto_fetch_false_is_default(
self, integration_client, test_package, unique_test_id
):
"""Test that auto_fetch=false is the default and behaves as before."""
project_name, package_name = test_package
# Upload a simple artifact without dependencies
content = unique_content("autofetch-default", unique_test_id, "nodeps")
files = {"file": ("default.tar.gz", BytesIO(content), "application/gzip")}
response = integration_client.post(
f"/api/v1/project/{project_name}/{package_name}/upload",
files=files,
data={"version": f"v1.0.0-{unique_test_id}"},
)
assert response.status_code == 200
# Resolve without auto_fetch param (should default to false)
response = integration_client.get(
f"/api/v1/project/{project_name}/{package_name}/+/v1.0.0-{unique_test_id}/resolve"
)
assert response.status_code == 200
data = response.json()
# Should have empty fetched list
assert data.get("fetched", []) == []
assert data["artifact_count"] == 1
@pytest.mark.integration
def test_resolve_auto_fetch_explicit_false(
self, integration_client, test_package, unique_test_id
):
"""Test that auto_fetch=false works explicitly."""
project_name, package_name = test_package
content = unique_content("autofetch-explicit-false", unique_test_id, "nodeps")
files = {"file": ("explicit.tar.gz", BytesIO(content), "application/gzip")}
response = integration_client.post(
f"/api/v1/project/{project_name}/{package_name}/upload",
files=files,
data={"version": f"v2.0.0-{unique_test_id}"},
)
assert response.status_code == 200
# Resolve with explicit auto_fetch=false
response = integration_client.get(
f"/api/v1/project/{project_name}/{package_name}/+/v2.0.0-{unique_test_id}/resolve",
params={"auto_fetch": "false"},
)
assert response.status_code == 200
data = response.json()
assert data.get("fetched", []) == []
@pytest.mark.integration
def test_resolve_auto_fetch_true_no_missing_deps(
self, integration_client, test_project, unique_test_id
):
"""Test that auto_fetch=true works when all deps are already cached."""
pkg_a = f"fetch-a-{unique_test_id}"
pkg_b = f"fetch-b-{unique_test_id}"
for pkg in [pkg_a, pkg_b]:
response = integration_client.post(
f"/api/v1/project/{test_project}/packages",
json={"name": pkg}
)
assert response.status_code == 200
try:
# Upload B (no deps)
content_b = unique_content("B", unique_test_id, "fetch")
files = {"file": ("b.tar.gz", BytesIO(content_b), "application/gzip")}
response = integration_client.post(
f"/api/v1/project/{test_project}/{pkg_b}/upload",
files=files,
data={"version": "1.0.0"},
)
assert response.status_code == 200
# Upload A (depends on B)
ensure_a = yaml.dump({
"dependencies": [
{"project": test_project, "package": pkg_b, "version": "1.0.0"}
]
})
content_a = unique_content("A", unique_test_id, "fetch")
files = {
"file": ("a.tar.gz", BytesIO(content_a), "application/gzip"),
"ensure": ("orchard.ensure", BytesIO(ensure_a.encode()), "application/x-yaml"),
}
response = integration_client.post(
f"/api/v1/project/{test_project}/{pkg_a}/upload",
files=files,
data={"version": "1.0.0"},
)
assert response.status_code == 200
# Resolve with auto_fetch=true - should work since deps are cached
response = integration_client.get(
f"/api/v1/project/{test_project}/{pkg_a}/+/1.0.0/resolve",
params={"auto_fetch": "true"},
)
assert response.status_code == 200
data = response.json()
# Should resolve successfully
assert data["artifact_count"] == 2
# Nothing fetched since everything was cached
assert len(data.get("fetched", [])) == 0
# No missing deps
assert len(data.get("missing", [])) == 0
finally:
for pkg in [pkg_a, pkg_b]:
integration_client.delete(f"/api/v1/project/{test_project}/packages/{pkg}")
@pytest.mark.integration
def test_resolve_missing_dep_with_auto_fetch_false(
self, integration_client, test_package, unique_test_id
):
"""Test that missing deps are reported when auto_fetch=false."""
project_name, package_name = test_package
# Create _pypi system project if it doesn't exist
response = integration_client.get("/api/v1/projects/_pypi")
if response.status_code == 404:
response = integration_client.post(
"/api/v1/projects",
json={"name": "_pypi", "description": "System project for PyPI packages"}
)
# May fail if already exists or can't create - that's ok
# Upload artifact with dependency on _pypi package that doesn't exist locally
ensure_content = yaml.dump({
"dependencies": [
{"project": "_pypi", "package": "nonexistent-pkg-xyz123", "version": ">=1.0.0"}
]
})
content = unique_content("missing-pypi", unique_test_id, "dep")
files = {
"file": ("missing-pypi-dep.tar.gz", BytesIO(content), "application/gzip"),
"ensure": ("orchard.ensure", BytesIO(ensure_content.encode()), "application/x-yaml"),
}
response = integration_client.post(
f"/api/v1/project/{project_name}/{package_name}/upload",
files=files,
data={"version": f"v3.0.0-{unique_test_id}"},
)
# Upload should succeed - validation is loose for system projects
if response.status_code == 200:
# Resolve without auto_fetch - should report missing
response = integration_client.get(
f"/api/v1/project/{project_name}/{package_name}/+/v3.0.0-{unique_test_id}/resolve",
params={"auto_fetch": "false"},
)
assert response.status_code == 200
data = response.json()
# Should have missing dependencies
assert len(data.get("missing", [])) >= 1
# Verify missing dependency structure
missing = data["missing"][0]
assert missing["project"] == "_pypi"
assert missing["package"] == "nonexistent-pkg-xyz123"
# Without auto_fetch, these should be false/None
assert missing.get("fetch_attempted", False) is False
@pytest.mark.integration
def test_resolve_response_schema_has_fetched_field(
self, integration_client, test_package, unique_test_id
):
"""Test that the resolve response always includes the fetched field."""
project_name, package_name = test_package
content = unique_content("schema-check", unique_test_id, "nodeps")
files = {"file": ("schema.tar.gz", BytesIO(content), "application/gzip")}
response = integration_client.post(
f"/api/v1/project/{project_name}/{package_name}/upload",
files=files,
data={"version": f"v4.0.0-{unique_test_id}"},
)
assert response.status_code == 200
# Check both auto_fetch modes include fetched field
for auto_fetch in ["false", "true"]:
response = integration_client.get(
f"/api/v1/project/{project_name}/{package_name}/+/v4.0.0-{unique_test_id}/resolve",
params={"auto_fetch": auto_fetch},
)
assert response.status_code == 200
data = response.json()
# Required fields
assert "requested" in data
assert "resolved" in data
assert "missing" in data
assert "fetched" in data # New field
assert "total_size" in data
assert "artifact_count" in data
# Types
assert isinstance(data["fetched"], list)
assert isinstance(data["missing"], list)
@pytest.mark.integration
def test_missing_dep_schema_has_fetch_fields(
self, integration_client, test_package, unique_test_id
):
"""Test that missing dependency entries have fetch_attempted and fetch_error fields."""
project_name, package_name = test_package
# Create a dependency on a non-existent package in a real project
dep_project_name = f"dep-test-{unique_test_id}"
response = integration_client.post(
"/api/v1/projects", json={"name": dep_project_name}
)
assert response.status_code == 200
try:
ensure_content = yaml.dump({
"dependencies": [
{"project": dep_project_name, "package": "nonexistent-pkg", "version": "1.0.0"}
]
})
content = unique_content("missing-schema", unique_test_id, "check")
files = {
"file": ("missing-schema.tar.gz", BytesIO(content), "application/gzip"),
"ensure": ("orchard.ensure", BytesIO(ensure_content.encode()), "application/x-yaml"),
}
response = integration_client.post(
f"/api/v1/project/{project_name}/{package_name}/upload",
files=files,
data={"version": f"v5.0.0-{unique_test_id}"},
)
assert response.status_code == 200
# Resolve
response = integration_client.get(
f"/api/v1/project/{project_name}/{package_name}/+/v5.0.0-{unique_test_id}/resolve",
params={"auto_fetch": "true"},
)
assert response.status_code == 200
data = response.json()
# Should have missing dependencies
assert len(data.get("missing", [])) >= 1
# Check schema for missing dependency
missing = data["missing"][0]
assert "project" in missing
assert "package" in missing
assert "constraint" in missing
assert "required_by" in missing
# New fields
assert "fetch_attempted" in missing
assert "fetch_error" in missing # May be None
finally:
integration_client.delete(f"/api/v1/projects/{dep_project_name}")

View File

@@ -185,56 +185,6 @@ h2 {
color: var(--warning-color, #f59e0b);
}
/* Usage Section */
.usage-section {
margin-top: 32px;
background: var(--bg-secondary);
}
.usage-section h3 {
margin-bottom: 12px;
color: var(--text-primary);
font-size: 1rem;
font-weight: 600;
}
.usage-section p {
color: var(--text-secondary);
margin-bottom: 12px;
font-size: 0.875rem;
}
.usage-section pre {
background: #0d0d0f;
border: 1px solid var(--border-primary);
padding: 16px 20px;
border-radius: var(--radius-md);
overflow-x: auto;
margin-bottom: 16px;
}
.usage-section code {
font-family: 'JetBrains Mono', 'Fira Code', 'Consolas', monospace;
font-size: 0.8125rem;
color: #e2e8f0;
}
/* Syntax highlighting for code blocks */
.usage-section pre {
position: relative;
}
.usage-section pre::before {
content: 'bash';
position: absolute;
top: 8px;
right: 12px;
font-size: 0.6875rem;
color: var(--text-muted);
text-transform: uppercase;
letter-spacing: 0.05em;
}
/* Copy button for code blocks (optional enhancement) */
.code-block {
position: relative;

View File

@@ -696,18 +696,6 @@ function PackagePage() {
</div>
)}
<div className="usage-section card">
<h3>Usage</h3>
<p>Download artifacts using:</p>
<pre>
<code>curl -O {window.location.origin}/api/v1/project/{projectName}/{packageName}/+/latest</code>
</pre>
<p>Or with a specific version:</p>
<pre>
<code>curl -O {window.location.origin}/api/v1/project/{projectName}/{packageName}/+/1.0.0</code>
</pre>
</div>
{/* Dependency Graph Modal */}
{showGraph && selectedArtifact && (
<DependencyGraph