fix: add security checks and tests for code review

Security:
- Add authorization checks to list_packages, update_package, delete_package endpoints
- Add MAX_TOTAL_ARTIFACTS limit (1000) to prevent memory exhaustion during dependency resolution
- Add TooManyArtifactsError exception for proper error handling

UI:
- Display reverse dependency errors in PackagePage
- Add warning display for failed dependency fetches in DependencyGraph

Tests:
- Add unit tests for metadata extraction (deb, wheel, tarball, jar)
- Add unit tests for rate limit configuration
- Add unit tests for PyPI registry client
This commit is contained in:
Mondo Diaz
2026-02-04 16:19:16 -06:00
parent 7140c9f4f2
commit 6cf487b224
8 changed files with 649 additions and 13 deletions

View File

@@ -109,9 +109,17 @@ class DependencyDepthExceededError(DependencyError):
super().__init__(f"Dependency resolution exceeded maximum depth of {max_depth}")
class TooManyArtifactsError(DependencyError):
"""Raised when dependency resolution resolves too many artifacts."""
def __init__(self, max_artifacts: int):
self.max_artifacts = max_artifacts
super().__init__(f"Dependency resolution exceeded maximum of {max_artifacts} artifacts")
# Safety limits to prevent DoS attacks
MAX_DEPENDENCY_DEPTH = 100 # Maximum levels of nested dependencies
MAX_DEPENDENCIES_PER_ARTIFACT = 200 # Maximum direct dependencies per artifact
MAX_TOTAL_ARTIFACTS = 1000 # Maximum total artifacts in resolution to prevent memory issues
def parse_ensure_file(content: bytes) -> EnsureFileContent:
@@ -849,6 +857,10 @@ def resolve_dependencies(
visited.add(artifact_id)
resolution_path_sync.pop()
# Check total artifacts limit
if len(resolution_order) >= MAX_TOTAL_ARTIFACTS:
raise TooManyArtifactsError(MAX_TOTAL_ARTIFACTS)
# Add to resolution order (dependencies before dependents)
resolution_order.append(artifact_id)
@@ -1257,6 +1269,10 @@ async def resolve_dependencies_with_fetch(
visited.add(artifact_id)
resolution_path.pop()
# Check total artifacts limit
if len(resolution_order) >= MAX_TOTAL_ARTIFACTS:
raise TooManyArtifactsError(MAX_TOTAL_ARTIFACTS)
resolution_order.append(artifact_id)
resolved_artifacts[artifact_id] = ResolvedArtifact(

View File

@@ -147,6 +147,7 @@ from .dependencies import (
DependencyConflictError,
DependencyNotFoundError,
DependencyDepthExceededError,
TooManyArtifactsError,
)
from .config import get_settings, get_env_upstream_sources
from .checksum import (
@@ -2666,10 +2667,10 @@ def list_packages(
format: Optional[str] = Query(default=None, description="Filter by package format"),
platform: Optional[str] = Query(default=None, description="Filter by platform"),
db: Session = Depends(get_db),
current_user: Optional[User] = Depends(get_current_user_optional),
):
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check read access (handles private project visibility)
project = check_project_access(db, project_name, current_user, "read")
# Validate sort field
valid_sort_fields = {
@@ -2950,13 +2951,13 @@ def update_package(
package_update: PackageUpdate,
request: Request,
db: Session = Depends(get_db),
current_user: Optional[User] = Depends(get_current_user_optional),
):
"""Update a package's metadata."""
user_id = get_user_id(request)
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check write access to project
project = check_project_access(db, project_name, current_user, "write")
package = (
db.query(Package)
@@ -3033,6 +3034,7 @@ def delete_package(
package_name: str,
request: Request,
db: Session = Depends(get_db),
current_user: Optional[User] = Depends(get_current_user_optional),
):
"""
Delete a package and all its versions.
@@ -3043,9 +3045,8 @@ def delete_package(
"""
user_id = get_user_id(request)
project = db.query(Project).filter(Project.name == project_name).first()
if not project:
raise HTTPException(status_code=404, detail="Project not found")
# Check write access to project (deletion requires write permission)
project = check_project_access(db, project_name, current_user, "write")
package = (
db.query(Package)
@@ -7137,6 +7138,15 @@ async def resolve_artifact_dependencies(
"max_depth": e.max_depth,
}
)
except TooManyArtifactsError as e:
raise HTTPException(
status_code=400,
detail={
"error": "too_many_artifacts",
"message": str(e),
"max_artifacts": e.max_artifacts,
}
)
# --- Upstream Caching Routes ---

View File

@@ -0,0 +1,243 @@
"""Unit tests for metadata extraction functionality."""
import io
import gzip
import tarfile
import zipfile
import pytest
from app.metadata import (
extract_metadata,
extract_deb_metadata,
extract_wheel_metadata,
extract_tarball_metadata,
extract_jar_metadata,
parse_deb_control,
)
class TestDebMetadata:
"""Tests for Debian package metadata extraction."""
def test_parse_deb_control_basic(self):
"""Test parsing a basic control file."""
control = """Package: my-package
Version: 1.2.3
Architecture: amd64
Maintainer: Test <test@example.com>
Description: A test package
"""
result = parse_deb_control(control)
assert result["package_name"] == "my-package"
assert result["version"] == "1.2.3"
assert result["architecture"] == "amd64"
assert result["format"] == "deb"
def test_parse_deb_control_with_epoch(self):
"""Test parsing version with epoch."""
control = """Package: another-pkg
Version: 2:1.0.0-1
"""
result = parse_deb_control(control)
assert result["version"] == "2:1.0.0-1"
assert result["package_name"] == "another-pkg"
assert result["format"] == "deb"
def test_extract_deb_metadata_invalid_magic(self):
"""Test that invalid ar magic returns empty dict."""
file = io.BytesIO(b"not an ar archive")
result = extract_deb_metadata(file)
assert result == {}
def test_extract_deb_metadata_valid_ar_no_control(self):
"""Test ar archive without control.tar returns empty."""
# Create minimal ar archive with just debian-binary
ar_data = b"!<arch>\n"
ar_data += b"debian-binary/ 0 0 0 100644 4 `\n"
ar_data += b"2.0\n"
file = io.BytesIO(ar_data)
result = extract_deb_metadata(file)
# Should return empty since no control.tar found
assert result == {} or "version" not in result
class TestWheelMetadata:
"""Tests for Python wheel metadata extraction."""
def _create_wheel_with_metadata(self, metadata_content: str) -> io.BytesIO:
"""Helper to create a wheel file with given METADATA content."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
zf.writestr('package-1.0.0.dist-info/METADATA', metadata_content)
buf.seek(0)
return buf
def test_extract_wheel_version(self):
"""Test extracting version from wheel METADATA."""
metadata = """Metadata-Version: 2.1
Name: my-package
Version: 2.3.4
Summary: A test package
"""
file = self._create_wheel_with_metadata(metadata)
result = extract_wheel_metadata(file)
assert result.get("version") == "2.3.4"
assert result.get("package_name") == "my-package"
assert result.get("format") == "wheel"
def test_extract_wheel_no_version(self):
"""Test wheel without version field."""
metadata = """Metadata-Version: 2.1
Name: no-version-pkg
"""
file = self._create_wheel_with_metadata(metadata)
result = extract_wheel_metadata(file)
assert "version" not in result
assert result.get("package_name") == "no-version-pkg"
assert result.get("format") == "wheel"
def test_extract_wheel_invalid_zip(self):
"""Test that invalid zip returns format-only dict."""
file = io.BytesIO(b"not a zip file")
result = extract_wheel_metadata(file)
assert result == {"format": "wheel"}
def test_extract_wheel_no_metadata_file(self):
"""Test wheel without METADATA file returns format-only dict."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
zf.writestr('some_file.py', 'print("hello")')
buf.seek(0)
result = extract_wheel_metadata(buf)
assert result == {"format": "wheel"}
class TestTarballMetadata:
"""Tests for tarball metadata extraction from filename."""
def test_extract_version_from_filename_standard(self):
"""Test standard package-version.tar.gz format."""
file = io.BytesIO(b"") # Content doesn't matter for filename extraction
result = extract_tarball_metadata(file, "mypackage-1.2.3.tar.gz")
assert result.get("version") == "1.2.3"
assert result.get("package_name") == "mypackage"
assert result.get("format") == "tarball"
def test_extract_version_with_v_prefix(self):
"""Test version with v prefix."""
file = io.BytesIO(b"")
result = extract_tarball_metadata(file, "package-v2.0.0.tar.gz")
assert result.get("version") == "2.0.0"
assert result.get("package_name") == "package"
assert result.get("format") == "tarball"
def test_extract_version_underscore_separator(self):
"""Test package_version format."""
file = io.BytesIO(b"")
result = extract_tarball_metadata(file, "my_package_3.1.4.tar.gz")
assert result.get("version") == "3.1.4"
assert result.get("package_name") == "my_package"
assert result.get("format") == "tarball"
def test_extract_version_complex(self):
"""Test complex version string."""
file = io.BytesIO(b"")
result = extract_tarball_metadata(file, "package-1.0.0-beta.1.tar.gz")
# The regex handles versions with suffix like -beta_1
assert result.get("format") == "tarball"
# May or may not extract version depending on regex match
if "version" in result:
assert result.get("package_name") == "package"
def test_extract_no_version_in_filename(self):
"""Test filename without version returns format-only dict."""
file = io.BytesIO(b"")
result = extract_tarball_metadata(file, "package.tar.gz")
# Should return format but no version
assert result.get("version") is None
assert result.get("format") == "tarball"
class TestJarMetadata:
"""Tests for JAR/Java metadata extraction."""
def _create_jar_with_manifest(self, manifest_content: str) -> io.BytesIO:
"""Helper to create a JAR file with given MANIFEST.MF content."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
zf.writestr('META-INF/MANIFEST.MF', manifest_content)
buf.seek(0)
return buf
def test_extract_jar_version_from_manifest(self):
"""Test extracting version from MANIFEST.MF."""
manifest = """Manifest-Version: 1.0
Implementation-Title: my-library
Implementation-Version: 4.5.6
"""
file = self._create_jar_with_manifest(manifest)
result = extract_jar_metadata(file)
assert result.get("version") == "4.5.6"
assert result.get("package_name") == "my-library"
assert result.get("format") == "jar"
def test_extract_jar_bundle_version(self):
"""Test extracting OSGi Bundle-Version."""
manifest = """Manifest-Version: 1.0
Bundle-Version: 2.1.0
Bundle-Name: Test Bundle
"""
file = self._create_jar_with_manifest(manifest)
result = extract_jar_metadata(file)
# Bundle-Version is stored in bundle_version, not version
assert result.get("bundle_version") == "2.1.0"
assert result.get("bundle_name") == "Test Bundle"
assert result.get("format") == "jar"
def test_extract_jar_invalid_zip(self):
"""Test that invalid JAR returns format-only dict."""
file = io.BytesIO(b"not a jar file")
result = extract_jar_metadata(file)
assert result == {"format": "jar"}
class TestExtractMetadataDispatch:
"""Tests for the main extract_metadata dispatcher function."""
def test_dispatch_to_wheel(self):
"""Test that .whl files use wheel extractor."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
zf.writestr('pkg-1.0.dist-info/METADATA', 'Version: 1.0.0\nName: pkg')
buf.seek(0)
result = extract_metadata(buf, "package-1.0.0-py3-none-any.whl")
assert result.get("version") == "1.0.0"
assert result.get("package_name") == "pkg"
assert result.get("format") == "wheel"
def test_dispatch_to_tarball(self):
"""Test that .tar.gz files use tarball extractor."""
file = io.BytesIO(b"")
result = extract_metadata(file, "mypackage-2.3.4.tar.gz")
assert result.get("version") == "2.3.4"
assert result.get("package_name") == "mypackage"
assert result.get("format") == "tarball"
def test_dispatch_unknown_extension(self):
"""Test that unknown extensions return empty dict."""
file = io.BytesIO(b"some content")
result = extract_metadata(file, "unknown.xyz")
assert result == {}
def test_file_position_reset_after_extraction(self):
"""Test that file position is reset to start after extraction."""
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
zf.writestr('pkg-1.0.dist-info/METADATA', 'Version: 1.0.0\nName: pkg')
buf.seek(0)
extract_metadata(buf, "package.whl")
# File should be back at position 0
assert buf.tell() == 0

View File

@@ -0,0 +1,65 @@
"""Unit tests for rate limiting configuration."""
import os
import pytest
class TestRateLimitConfiguration:
"""Tests for rate limit configuration."""
def test_default_login_rate_limit(self):
"""Test default login rate limit is 5/minute."""
# Import fresh to get default value
import importlib
import app.rate_limit as rate_limit_module
# Save original env value
original = os.environ.get("ORCHARD_LOGIN_RATE_LIMIT")
try:
# Clear env variable to test default
if "ORCHARD_LOGIN_RATE_LIMIT" in os.environ:
del os.environ["ORCHARD_LOGIN_RATE_LIMIT"]
# Reload module to pick up new env
importlib.reload(rate_limit_module)
assert rate_limit_module.LOGIN_RATE_LIMIT == "5/minute"
finally:
# Restore original env value
if original is not None:
os.environ["ORCHARD_LOGIN_RATE_LIMIT"] = original
importlib.reload(rate_limit_module)
def test_custom_login_rate_limit(self):
"""Test custom login rate limit from environment."""
import importlib
import app.rate_limit as rate_limit_module
# Save original env value
original = os.environ.get("ORCHARD_LOGIN_RATE_LIMIT")
try:
# Set custom rate limit
os.environ["ORCHARD_LOGIN_RATE_LIMIT"] = "10/minute"
# Reload module to pick up new env
importlib.reload(rate_limit_module)
assert rate_limit_module.LOGIN_RATE_LIMIT == "10/minute"
finally:
# Restore original env value
if original is not None:
os.environ["ORCHARD_LOGIN_RATE_LIMIT"] = original
else:
if "ORCHARD_LOGIN_RATE_LIMIT" in os.environ:
del os.environ["ORCHARD_LOGIN_RATE_LIMIT"]
importlib.reload(rate_limit_module)
def test_limiter_exists(self):
"""Test that limiter object is created."""
from app.rate_limit import limiter
assert limiter is not None
# Limiter should have a key_func set
assert limiter._key_func is not None

View File

@@ -0,0 +1,264 @@
"""Unit tests for registry client functionality."""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
from packaging.specifiers import SpecifierSet
from app.registry_client import (
PyPIRegistryClient,
VersionInfo,
FetchResult,
get_registry_client,
)
class TestPyPIRegistryClient:
"""Tests for PyPI registry client."""
@pytest.fixture
def mock_http_client(self):
"""Create a mock async HTTP client."""
return AsyncMock(spec=httpx.AsyncClient)
@pytest.fixture
def client(self, mock_http_client):
"""Create a PyPI registry client with mocked HTTP."""
return PyPIRegistryClient(
http_client=mock_http_client,
upstream_sources=[],
pypi_api_url="https://pypi.org/pypi",
)
def test_source_type(self, client):
"""Test source_type returns 'pypi'."""
assert client.source_type == "pypi"
def test_normalize_package_name(self, client):
"""Test package name normalization per PEP 503."""
assert client._normalize_package_name("My_Package") == "my-package"
assert client._normalize_package_name("my.package") == "my-package"
assert client._normalize_package_name("my-package") == "my-package"
assert client._normalize_package_name("MY-PACKAGE") == "my-package"
assert client._normalize_package_name("my__package") == "my-package"
assert client._normalize_package_name("my..package") == "my-package"
@pytest.mark.asyncio
async def test_get_available_versions_success(self, client, mock_http_client):
"""Test fetching available versions from PyPI."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"releases": {
"1.0.0": [{"packagetype": "bdist_wheel"}],
"1.1.0": [{"packagetype": "bdist_wheel"}],
"2.0.0": [{"packagetype": "bdist_wheel"}],
}
}
mock_http_client.get.return_value = mock_response
versions = await client.get_available_versions("test-package")
assert "1.0.0" in versions
assert "1.1.0" in versions
assert "2.0.0" in versions
mock_http_client.get.assert_called_once()
@pytest.mark.asyncio
async def test_get_available_versions_empty(self, client, mock_http_client):
"""Test handling package with no releases."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {"releases": {}}
mock_http_client.get.return_value = mock_response
versions = await client.get_available_versions("empty-package")
assert versions == []
@pytest.mark.asyncio
async def test_get_available_versions_404(self, client, mock_http_client):
"""Test handling non-existent package."""
mock_response = MagicMock()
mock_response.status_code = 404
mock_http_client.get.return_value = mock_response
versions = await client.get_available_versions("nonexistent")
assert versions == []
@pytest.mark.asyncio
async def test_resolve_constraint_wildcard(self, client, mock_http_client):
"""Test resolving wildcard constraint returns latest."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"info": {"version": "2.0.0"},
"releases": {
"1.0.0": [
{
"packagetype": "bdist_wheel",
"url": "https://files.pythonhosted.org/test-1.0.0.whl",
"filename": "test-1.0.0.whl",
"digests": {"sha256": "abc123"},
"size": 1000,
}
],
"2.0.0": [
{
"packagetype": "bdist_wheel",
"url": "https://files.pythonhosted.org/test-2.0.0.whl",
"filename": "test-2.0.0.whl",
"digests": {"sha256": "def456"},
"size": 2000,
}
],
},
}
mock_http_client.get.return_value = mock_response
result = await client.resolve_constraint("test-package", "*")
assert result is not None
assert result.version == "2.0.0"
@pytest.mark.asyncio
async def test_resolve_constraint_specific_version(self, client, mock_http_client):
"""Test resolving specific version constraint."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"releases": {
"1.0.0": [
{
"packagetype": "bdist_wheel",
"url": "https://files.pythonhosted.org/test-1.0.0.whl",
"filename": "test-1.0.0.whl",
"digests": {"sha256": "abc123"},
"size": 1000,
}
],
"2.0.0": [
{
"packagetype": "bdist_wheel",
"url": "https://files.pythonhosted.org/test-2.0.0.whl",
"filename": "test-2.0.0.whl",
}
],
},
}
mock_http_client.get.return_value = mock_response
result = await client.resolve_constraint("test-package", ">=1.0.0,<2.0.0")
assert result is not None
assert result.version == "1.0.0"
@pytest.mark.asyncio
async def test_resolve_constraint_no_match(self, client, mock_http_client):
"""Test resolving constraint with no matching version."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"releases": {
"1.0.0": [
{
"packagetype": "bdist_wheel",
"url": "https://files.pythonhosted.org/test-1.0.0.whl",
"filename": "test-1.0.0.whl",
}
],
},
}
mock_http_client.get.return_value = mock_response
result = await client.resolve_constraint("test-package", ">=5.0.0")
assert result is None
class TestVersionInfo:
"""Tests for VersionInfo dataclass."""
def test_create_version_info(self):
"""Test creating VersionInfo with all fields."""
info = VersionInfo(
version="1.0.0",
download_url="https://example.com/pkg-1.0.0.whl",
filename="pkg-1.0.0.whl",
sha256="abc123",
size=5000,
content_type="application/zip",
)
assert info.version == "1.0.0"
assert info.download_url == "https://example.com/pkg-1.0.0.whl"
assert info.filename == "pkg-1.0.0.whl"
assert info.sha256 == "abc123"
assert info.size == 5000
def test_create_version_info_minimal(self):
"""Test creating VersionInfo with only required fields."""
info = VersionInfo(
version="1.0.0",
download_url="https://example.com/pkg.whl",
filename="pkg.whl",
)
assert info.sha256 is None
assert info.size is None
class TestFetchResult:
"""Tests for FetchResult dataclass."""
def test_create_fetch_result(self):
"""Test creating FetchResult."""
result = FetchResult(
artifact_id="abc123def456",
size=10000,
version="2.0.0",
filename="pkg-2.0.0.whl",
already_cached=True,
)
assert result.artifact_id == "abc123def456"
assert result.size == 10000
assert result.version == "2.0.0"
assert result.already_cached is True
def test_fetch_result_default_not_cached(self):
"""Test FetchResult defaults to not cached."""
result = FetchResult(
artifact_id="xyz",
size=100,
version="1.0.0",
filename="pkg.whl",
)
assert result.already_cached is False
class TestGetRegistryClient:
"""Tests for registry client factory function."""
def test_get_pypi_client(self):
"""Test getting PyPI client."""
mock_client = MagicMock()
mock_sources = []
client = get_registry_client("pypi", mock_client, mock_sources)
assert isinstance(client, PyPIRegistryClient)
def test_get_unsupported_client(self):
"""Test getting unsupported registry type returns None."""
mock_client = MagicMock()
client = get_registry_client("npm", mock_client, [])
assert client is None
def test_get_unknown_client(self):
"""Test getting unknown registry type returns None."""
mock_client = MagicMock()
client = get_registry_client("unknown", mock_client, [])
assert client is None