Fix httpx.Timeout configuration in PyPI proxy
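Reviewer note: the hunks below only cover the new unit tests; the httpx.Timeout change itself is not shown in this extract. For context, httpx refuses a Timeout that sets some phases but not a default, so connect/read/write/pool have to be spelled out (or a default given). A minimal, hedged sketch of that pattern, with assumed setting values that may not match app.config and that is not the actual patch from this commit:

# Hedged sketch only -- not the patch from this commit.
import httpx

def build_timeout(connect: float = 15.0, read: float = 30.0) -> httpx.Timeout:
    # All four phases set explicitly; httpx raises ValueError if only some
    # phases are given without a default timeout.
    return httpx.Timeout(connect=connect, read=read, write=30.0, pool=5.0)

client = httpx.AsyncClient(
    timeout=build_timeout(),
    limits=httpx.Limits(max_connections=100),
)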
backend/tests/unit/test_cache_service.py (new file, 374 lines)
@@ -0,0 +1,374 @@
"""Tests for CacheService."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch


class TestCacheCategory:
    """Tests for cache category enum."""

    @pytest.mark.unit
    def test_immutable_categories_have_no_ttl(self):
        """Immutable categories should return None for TTL."""
        from app.cache_service import CacheCategory, get_category_ttl
        from app.config import Settings

        settings = Settings()

        assert get_category_ttl(CacheCategory.ARTIFACT_METADATA, settings) is None
        assert get_category_ttl(CacheCategory.ARTIFACT_DEPENDENCIES, settings) is None
        assert get_category_ttl(CacheCategory.DEPENDENCY_RESOLUTION, settings) is None

    @pytest.mark.unit
    def test_mutable_categories_have_ttl(self):
        """Mutable categories should return configured TTL."""
        from app.cache_service import CacheCategory, get_category_ttl
        from app.config import Settings

        settings = Settings(
            cache_ttl_index=300,
            cache_ttl_upstream=3600,
        )

        assert get_category_ttl(CacheCategory.PACKAGE_INDEX, settings) == 300
        assert get_category_ttl(CacheCategory.UPSTREAM_SOURCES, settings) == 3600


class TestCacheService:
    """Tests for Redis cache service."""

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_disabled_cache_returns_none(self):
        """When Redis disabled, get() should return None."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")

        assert result is None
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_disabled_cache_set_is_noop(self):
        """When Redis disabled, set() should be a no-op."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        # Should not raise
        await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value")

        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_cache_key_namespacing(self):
        """Cache keys should be properly namespaced."""
        from app.cache_service import CacheService, CacheCategory

        key = CacheService._make_key(CacheCategory.PACKAGE_INDEX, "pypi", "numpy")

        assert key == "orchard:index:pypi:numpy"

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_ping_returns_false_when_disabled(self):
        """ping() should return False when Redis is disabled."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        result = await cache.ping()

        assert result is False
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_enabled_property(self):
        """enabled property should reflect Redis state."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)

        assert cache.enabled is False

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_delete_is_noop_when_disabled(self):
        """delete() should be a no-op when Redis is disabled."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        # Should not raise
        await cache.delete(CacheCategory.PACKAGE_INDEX, "test-key")

        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_invalidate_pattern_returns_zero_when_disabled(self):
        """invalidate_pattern() should return 0 when Redis is disabled."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()

        result = await cache.invalidate_pattern(CacheCategory.PACKAGE_INDEX)

        assert result == 0
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_startup_already_started(self):
        """startup() should be idempotent."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)
        await cache.startup()
        await cache.startup()  # Should not raise

        assert cache._started is True
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_shutdown_not_started(self):
        """shutdown() should handle not-started state."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=False)
        cache = CacheService(settings)

        # Should not raise
        await cache.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_make_key_with_default_protocol(self):
        """_make_key should work with default protocol."""
        from app.cache_service import CacheService, CacheCategory

        key = CacheService._make_key(CacheCategory.ARTIFACT_METADATA, "default", "abc123")

        assert key == "orchard:artifact:default:abc123"


class TestCacheServiceWithMockedRedis:
    """Tests for CacheService with mocked Redis client."""

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_get_returns_cached_value(self):
        """get() should return cached value when available."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        # Mock the redis client
        mock_redis = AsyncMock()
        mock_redis.get.return_value = b"cached-data"
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key", "pypi")

        assert result == b"cached-data"
        mock_redis.get.assert_called_once_with("orchard:index:pypi:test-key")

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_set_with_ttl(self):
        """set() should use setex for mutable categories."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True, cache_ttl_index=300)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"test-value", "pypi")

        mock_redis.setex.assert_called_once_with(
            "orchard:index:pypi:test-key", 300, b"test-value"
        )

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_set_without_ttl(self):
        """set() should use set (no expiry) for immutable categories."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        await cache.set(
            CacheCategory.ARTIFACT_METADATA, "abc123", b"metadata", "pypi"
        )

        mock_redis.set.assert_called_once_with(
            "orchard:artifact:pypi:abc123", b"metadata"
        )

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_delete_calls_redis_delete(self):
        """delete() should call Redis delete."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        await cache.delete(CacheCategory.PACKAGE_INDEX, "test-key", "pypi")

        mock_redis.delete.assert_called_once_with("orchard:index:pypi:test-key")

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_invalidate_pattern_deletes_matching_keys(self):
        """invalidate_pattern() should delete all matching keys."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()

        # Create an async generator for scan_iter
        async def mock_scan_iter(match=None):
            for key in [b"orchard:index:pypi:numpy", b"orchard:index:pypi:requests"]:
                yield key

        mock_redis.scan_iter = mock_scan_iter
        mock_redis.delete.return_value = 2
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.invalidate_pattern(CacheCategory.PACKAGE_INDEX, "*", "pypi")

        assert result == 2
        mock_redis.delete.assert_called_once()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_ping_returns_true_when_connected(self):
        """ping() should return True when Redis responds."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        mock_redis.ping.return_value = True
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.ping()

        assert result is True

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_get_handles_exception(self):
        """get() should return None and log warning on exception."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        mock_redis.get.side_effect = Exception("Connection lost")
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.get(CacheCategory.PACKAGE_INDEX, "test-key")

        assert result is None

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_set_handles_exception(self):
        """set() should log warning on exception."""
        from app.cache_service import CacheService, CacheCategory
        from app.config import Settings

        settings = Settings(redis_enabled=True, cache_ttl_index=300)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        mock_redis.setex.side_effect = Exception("Connection lost")
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        # Should not raise
        await cache.set(CacheCategory.PACKAGE_INDEX, "test-key", b"value")

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_ping_returns_false_on_exception(self):
        """ping() should return False on exception."""
        from app.cache_service import CacheService
        from app.config import Settings

        settings = Settings(redis_enabled=True)
        cache = CacheService(settings)

        mock_redis = AsyncMock()
        mock_redis.ping.side_effect = Exception("Connection lost")
        cache._redis = mock_redis
        cache._enabled = True
        cache._started = True

        result = await cache.ping()

        assert result is False
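Reviewer note: the two key-namespacing tests above pin the layout orchard:<category>:<protocol>:<key>. A minimal sketch of a helper that satisfies exactly those assertions; CacheService itself is not part of this diff, so the enum values here are assumptions:

from enum import Enum

class CacheCategory(str, Enum):
    # Assumed short names; only these two are pinned by the tests above.
    PACKAGE_INDEX = "index"
    ARTIFACT_METADATA = "artifact"

def make_key(category: CacheCategory, protocol: str, key: str) -> str:
    # e.g. orchard:index:pypi:numpy
    return f"orchard:{category.value}:{protocol}:{key}"

assert make_key(CacheCategory.PACKAGE_INDEX, "pypi", "numpy") == "orchard:index:pypi:numpy"
assert make_key(CacheCategory.ARTIFACT_METADATA, "default", "abc123") == "orchard:artifact:default:abc123"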
backend/tests/unit/test_db_utils.py (new file, 167 lines)
@@ -0,0 +1,167 @@
"""Tests for database utility functions."""
import pytest
from unittest.mock import MagicMock, patch


class TestArtifactRepository:
    """Tests for ArtifactRepository."""

    def test_batch_dependency_values_formatting(self):
        """batch_upsert_dependencies should format values correctly."""
        from app.db_utils import ArtifactRepository

        deps = [
            ("_pypi", "numpy", ">=1.21.0"),
            ("_pypi", "requests", "*"),
            ("myproject", "mylib", "==1.0.0"),
        ]

        values = ArtifactRepository._format_dependency_values("abc123", deps)

        assert len(values) == 3
        assert values[0] == {
            "artifact_id": "abc123",
            "dependency_project": "_pypi",
            "dependency_package": "numpy",
            "version_constraint": ">=1.21.0",
        }
        assert values[2]["dependency_project"] == "myproject"

    def test_empty_dependencies_returns_empty_list(self):
        """Empty dependency list should return empty values."""
        from app.db_utils import ArtifactRepository

        values = ArtifactRepository._format_dependency_values("abc123", [])

        assert values == []

    def test_format_dependency_values_preserves_special_characters(self):
        """Version constraints with special characters should be preserved."""
        from app.db_utils import ArtifactRepository

        deps = [
            ("_pypi", "package-name", ">=1.0.0,<2.0.0"),
            ("_pypi", "another_pkg", "~=1.4.2"),
        ]

        values = ArtifactRepository._format_dependency_values("hash123", deps)

        assert values[0]["version_constraint"] == ">=1.0.0,<2.0.0"
        assert values[1]["version_constraint"] == "~=1.4.2"

    def test_batch_upsert_dependencies_returns_zero_for_empty(self):
        """batch_upsert_dependencies should return 0 for empty list without DB call."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        repo = ArtifactRepository(mock_db)

        result = repo.batch_upsert_dependencies("abc123", [])

        assert result == 0
        # Verify no DB operations were performed
        mock_db.execute.assert_not_called()

    def test_get_or_create_artifact_builds_correct_statement(self):
        """get_or_create_artifact should use ON CONFLICT DO UPDATE."""
        from app.db_utils import ArtifactRepository
        from app.models import Artifact

        mock_db = MagicMock()
        mock_result = MagicMock()
        mock_artifact = MagicMock()
        mock_artifact.ref_count = 1
        mock_result.scalar_one.return_value = mock_artifact
        mock_db.execute.return_value = mock_result

        repo = ArtifactRepository(mock_db)
        artifact, created = repo.get_or_create_artifact(
            sha256="abc123def456",
            size=1024,
            filename="test.whl",
            content_type="application/zip",
        )

        assert mock_db.execute.called
        assert created is True
        assert artifact == mock_artifact

    def test_get_or_create_artifact_existing_not_created(self):
        """get_or_create_artifact should return created=False for existing artifact."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_result = MagicMock()
        mock_artifact = MagicMock()
        mock_artifact.ref_count = 5  # Existing artifact with ref_count > 1
        mock_result.scalar_one.return_value = mock_artifact
        mock_db.execute.return_value = mock_result

        repo = ArtifactRepository(mock_db)
        artifact, created = repo.get_or_create_artifact(
            sha256="abc123def456",
            size=1024,
            filename="test.whl",
        )

        assert created is False

    def test_get_cached_url_with_artifact_returns_tuple(self):
        """get_cached_url_with_artifact should return (CachedUrl, Artifact) tuple."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_cached_url = MagicMock()
        mock_artifact = MagicMock()
        mock_db.query.return_value.join.return_value.filter.return_value.first.return_value = (
            mock_cached_url,
            mock_artifact,
        )

        repo = ArtifactRepository(mock_db)
        result = repo.get_cached_url_with_artifact("url_hash_123")

        assert result == (mock_cached_url, mock_artifact)

    def test_get_cached_url_with_artifact_returns_none_when_not_found(self):
        """get_cached_url_with_artifact should return None when URL not cached."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_db.query.return_value.join.return_value.filter.return_value.first.return_value = None

        repo = ArtifactRepository(mock_db)
        result = repo.get_cached_url_with_artifact("nonexistent_hash")

        assert result is None

    def test_get_artifact_dependencies_returns_list(self):
        """get_artifact_dependencies should return list of dependencies."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_dep1 = MagicMock()
        mock_dep2 = MagicMock()
        mock_db.query.return_value.filter.return_value.all.return_value = [
            mock_dep1,
            mock_dep2,
        ]

        repo = ArtifactRepository(mock_db)
        result = repo.get_artifact_dependencies("artifact_hash_123")

        assert len(result) == 2
        assert result[0] == mock_dep1
        assert result[1] == mock_dep2

    def test_get_artifact_dependencies_returns_empty_list(self):
        """get_artifact_dependencies should return empty list when no dependencies."""
        from app.db_utils import ArtifactRepository

        mock_db = MagicMock()
        mock_db.query.return_value.filter.return_value.all.return_value = []

        repo = ArtifactRepository(mock_db)
        result = repo.get_artifact_dependencies("artifact_without_deps")

        assert result == []
backend/tests/unit/test_http_client.py (new file, 194 lines)
@@ -0,0 +1,194 @@
"""Tests for HttpClientManager."""
import pytest
from unittest.mock import MagicMock, AsyncMock, patch


class TestHttpClientManager:
    """Tests for HTTP client pool management."""

    @pytest.mark.unit
    def test_manager_initializes_with_settings(self):
        """Manager should initialize with config settings."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings(
            http_max_connections=50,
            http_connect_timeout=15.0,
        )
        manager = HttpClientManager(settings)

        assert manager.max_connections == 50
        assert manager.connect_timeout == 15.0
        assert manager._default_client is None  # Not started yet

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_startup_creates_client(self):
        """Startup should create the default async client."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()

        assert manager._default_client is not None

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_shutdown_closes_client(self):
        """Shutdown should close all clients gracefully."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()
        client = manager._default_client

        await manager.shutdown()

        assert manager._default_client is None
        assert client.is_closed

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_get_client_returns_default(self):
        """get_client() should return the default client."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)
        await manager.startup()

        client = manager.get_client()

        assert client is manager._default_client

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_get_client_raises_if_not_started(self):
        """get_client() should raise RuntimeError if manager not started."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        with pytest.raises(RuntimeError, match="not started"):
            manager.get_client()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_run_blocking_executes_in_thread_pool(self):
        """run_blocking should execute sync functions in thread pool."""
        from app.http_client import HttpClientManager
        from app.config import Settings
        import threading

        settings = Settings()
        manager = HttpClientManager(settings)
        await manager.startup()

        main_thread = threading.current_thread()
        execution_thread = None

        def blocking_func():
            nonlocal execution_thread
            execution_thread = threading.current_thread()
            return "result"

        result = await manager.run_blocking(blocking_func)

        assert result == "result"
        assert execution_thread is not main_thread

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_run_blocking_raises_if_not_started(self):
        """run_blocking should raise RuntimeError if manager not started."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        with pytest.raises(RuntimeError, match="not started"):
            await manager.run_blocking(lambda: None)

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_startup_idempotent(self):
        """Calling startup multiple times should be safe."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()
        client1 = manager._default_client

        await manager.startup()  # Should not create a new client
        client2 = manager._default_client

        assert client1 is client2  # Same client instance

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_shutdown_idempotent(self):
        """Calling shutdown multiple times should be safe."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        await manager.startup()
        await manager.shutdown()
        await manager.shutdown()  # Should not raise

        assert manager._default_client is None

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_properties_return_configured_values(self):
        """Properties should return configured values."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings(
            http_max_connections=75,
            http_worker_threads=16,
        )
        manager = HttpClientManager(settings)
        await manager.startup()

        assert manager.pool_size == 75
        assert manager.executor_max == 16

        await manager.shutdown()

    @pytest.mark.asyncio
    @pytest.mark.unit
    async def test_active_connections_when_not_started(self):
        """active_connections should return 0 when not started."""
        from app.http_client import HttpClientManager
        from app.config import Settings

        settings = Settings()
        manager = HttpClientManager(settings)

        assert manager.active_connections == 0
backend/tests/unit/test_metadata.py (new file, 243 lines)
@@ -0,0 +1,243 @@
"""Unit tests for metadata extraction functionality."""

import io
import gzip
import tarfile
import zipfile
import pytest
from app.metadata import (
    extract_metadata,
    extract_deb_metadata,
    extract_wheel_metadata,
    extract_tarball_metadata,
    extract_jar_metadata,
    parse_deb_control,
)


class TestDebMetadata:
    """Tests for Debian package metadata extraction."""

    def test_parse_deb_control_basic(self):
        """Test parsing a basic control file."""
        control = """Package: my-package
Version: 1.2.3
Architecture: amd64
Maintainer: Test <test@example.com>
Description: A test package
"""
        result = parse_deb_control(control)
        assert result["package_name"] == "my-package"
        assert result["version"] == "1.2.3"
        assert result["architecture"] == "amd64"
        assert result["format"] == "deb"

    def test_parse_deb_control_with_epoch(self):
        """Test parsing version with epoch."""
        control = """Package: another-pkg
Version: 2:1.0.0-1
"""
        result = parse_deb_control(control)
        assert result["version"] == "2:1.0.0-1"
        assert result["package_name"] == "another-pkg"
        assert result["format"] == "deb"

    def test_extract_deb_metadata_invalid_magic(self):
        """Test that invalid ar magic returns empty dict."""
        file = io.BytesIO(b"not an ar archive")
        result = extract_deb_metadata(file)
        assert result == {}

    def test_extract_deb_metadata_valid_ar_no_control(self):
        """Test ar archive without control.tar returns empty."""
        # Create minimal ar archive with just debian-binary
        ar_data = b"!<arch>\n"
        ar_data += b"debian-binary/ 0 0 0 100644 4 `\n"
        ar_data += b"2.0\n"

        file = io.BytesIO(ar_data)
        result = extract_deb_metadata(file)
        # Should return empty since no control.tar found
        assert result == {} or "version" not in result


class TestWheelMetadata:
    """Tests for Python wheel metadata extraction."""

    def _create_wheel_with_metadata(self, metadata_content: str) -> io.BytesIO:
        """Helper to create a wheel file with given METADATA content."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, 'w') as zf:
            zf.writestr('package-1.0.0.dist-info/METADATA', metadata_content)
        buf.seek(0)
        return buf

    def test_extract_wheel_version(self):
        """Test extracting version from wheel METADATA."""
        metadata = """Metadata-Version: 2.1
Name: my-package
Version: 2.3.4
Summary: A test package
"""
        file = self._create_wheel_with_metadata(metadata)
        result = extract_wheel_metadata(file)
        assert result.get("version") == "2.3.4"
        assert result.get("package_name") == "my-package"
        assert result.get("format") == "wheel"

    def test_extract_wheel_no_version(self):
        """Test wheel without version field."""
        metadata = """Metadata-Version: 2.1
Name: no-version-pkg
"""
        file = self._create_wheel_with_metadata(metadata)
        result = extract_wheel_metadata(file)
        assert "version" not in result
        assert result.get("package_name") == "no-version-pkg"
        assert result.get("format") == "wheel"

    def test_extract_wheel_invalid_zip(self):
        """Test that invalid zip returns format-only dict."""
        file = io.BytesIO(b"not a zip file")
        result = extract_wheel_metadata(file)
        assert result == {"format": "wheel"}

    def test_extract_wheel_no_metadata_file(self):
        """Test wheel without METADATA file returns format-only dict."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, 'w') as zf:
            zf.writestr('some_file.py', 'print("hello")')
        buf.seek(0)
        result = extract_wheel_metadata(buf)
        assert result == {"format": "wheel"}


class TestTarballMetadata:
    """Tests for tarball metadata extraction from filename."""

    def test_extract_version_from_filename_standard(self):
        """Test standard package-version.tar.gz format."""
        file = io.BytesIO(b"")  # Content doesn't matter for filename extraction
        result = extract_tarball_metadata(file, "mypackage-1.2.3.tar.gz")
        assert result.get("version") == "1.2.3"
        assert result.get("package_name") == "mypackage"
        assert result.get("format") == "tarball"

    def test_extract_version_with_v_prefix(self):
        """Test version with v prefix."""
        file = io.BytesIO(b"")
        result = extract_tarball_metadata(file, "package-v2.0.0.tar.gz")
        assert result.get("version") == "2.0.0"
        assert result.get("package_name") == "package"
        assert result.get("format") == "tarball"

    def test_extract_version_underscore_separator(self):
        """Test package_version format."""
        file = io.BytesIO(b"")
        result = extract_tarball_metadata(file, "my_package_3.1.4.tar.gz")
        assert result.get("version") == "3.1.4"
        assert result.get("package_name") == "my_package"
        assert result.get("format") == "tarball"

    def test_extract_version_complex(self):
        """Test complex version string."""
        file = io.BytesIO(b"")
        result = extract_tarball_metadata(file, "package-1.0.0-beta.1.tar.gz")
        # The regex handles versions with suffix like -beta_1
        assert result.get("format") == "tarball"
        # May or may not extract version depending on regex match
        if "version" in result:
            assert result.get("package_name") == "package"

    def test_extract_no_version_in_filename(self):
        """Test filename without version returns format-only dict."""
        file = io.BytesIO(b"")
        result = extract_tarball_metadata(file, "package.tar.gz")
        # Should return format but no version
        assert result.get("version") is None
        assert result.get("format") == "tarball"


class TestJarMetadata:
    """Tests for JAR/Java metadata extraction."""

    def _create_jar_with_manifest(self, manifest_content: str) -> io.BytesIO:
        """Helper to create a JAR file with given MANIFEST.MF content."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, 'w') as zf:
            zf.writestr('META-INF/MANIFEST.MF', manifest_content)
        buf.seek(0)
        return buf

    def test_extract_jar_version_from_manifest(self):
        """Test extracting version from MANIFEST.MF."""
        manifest = """Manifest-Version: 1.0
Implementation-Title: my-library
Implementation-Version: 4.5.6
"""
        file = self._create_jar_with_manifest(manifest)
        result = extract_jar_metadata(file)
        assert result.get("version") == "4.5.6"
        assert result.get("package_name") == "my-library"
        assert result.get("format") == "jar"

    def test_extract_jar_bundle_version(self):
        """Test extracting OSGi Bundle-Version."""
        manifest = """Manifest-Version: 1.0
Bundle-Version: 2.1.0
Bundle-Name: Test Bundle
"""
        file = self._create_jar_with_manifest(manifest)
        result = extract_jar_metadata(file)
        # Bundle-Version is stored in bundle_version, not version
        assert result.get("bundle_version") == "2.1.0"
        assert result.get("bundle_name") == "Test Bundle"
        assert result.get("format") == "jar"

    def test_extract_jar_invalid_zip(self):
        """Test that invalid JAR returns format-only dict."""
        file = io.BytesIO(b"not a jar file")
        result = extract_jar_metadata(file)
        assert result == {"format": "jar"}


class TestExtractMetadataDispatch:
    """Tests for the main extract_metadata dispatcher function."""

    def test_dispatch_to_wheel(self):
        """Test that .whl files use wheel extractor."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, 'w') as zf:
            zf.writestr('pkg-1.0.dist-info/METADATA', 'Version: 1.0.0\nName: pkg')
        buf.seek(0)

        result = extract_metadata(buf, "package-1.0.0-py3-none-any.whl")
        assert result.get("version") == "1.0.0"
        assert result.get("package_name") == "pkg"
        assert result.get("format") == "wheel"

    def test_dispatch_to_tarball(self):
        """Test that .tar.gz files use tarball extractor."""
        file = io.BytesIO(b"")
        result = extract_metadata(file, "mypackage-2.3.4.tar.gz")
        assert result.get("version") == "2.3.4"
        assert result.get("package_name") == "mypackage"
        assert result.get("format") == "tarball"

    def test_dispatch_unknown_extension(self):
        """Test that unknown extensions return empty dict."""
        file = io.BytesIO(b"some content")
        result = extract_metadata(file, "unknown.xyz")
        assert result == {}

    def test_file_position_reset_after_extraction(self):
        """Test that file position is reset to start after extraction."""
        buf = io.BytesIO()
        with zipfile.ZipFile(buf, 'w') as zf:
            zf.writestr('pkg-1.0.dist-info/METADATA', 'Version: 1.0.0\nName: pkg')
        buf.seek(0)

        extract_metadata(buf, "package.whl")

        # File should be back at position 0
        assert buf.tell() == 0
@@ -145,54 +145,6 @@ class TestPackageModel:
        assert platform_col.default.arg == "any"


class TestTagModel:
    """Tests for the Tag model."""

    @pytest.mark.unit
    def test_tag_requires_package_id(self):
        """Test tag requires package_id."""
        from app.models import Tag

        tag = Tag(
            name="v1.0.0",
            package_id=uuid.uuid4(),
            artifact_id="f" * 64,
            created_by="test-user",
        )

        assert tag.package_id is not None
        assert tag.artifact_id == "f" * 64


class TestTagHistoryModel:
    """Tests for the TagHistory model."""

    @pytest.mark.unit
    def test_tag_history_default_change_type(self):
        """Test tag history change_type column has default value of 'update'."""
        from app.models import TagHistory

        # Check the column definition has the right default
        change_type_col = TagHistory.__table__.columns["change_type"]
        assert change_type_col.default is not None
        assert change_type_col.default.arg == "update"

    @pytest.mark.unit
    def test_tag_history_allows_null_old_artifact(self):
        """Test tag history allows null old_artifact_id (for create events)."""
        from app.models import TagHistory

        history = TagHistory(
            tag_id=uuid.uuid4(),
            old_artifact_id=None,
            new_artifact_id="h" * 64,
            change_type="create",
            changed_by="test-user",
        )

        assert history.old_artifact_id is None


class TestUploadModel:
    """Tests for the Upload model."""
backend/tests/unit/test_pypi_proxy.py (new file, 85 lines)
@@ -0,0 +1,85 @@
"""Unit tests for PyPI proxy functionality."""

import pytest
from app.pypi_proxy import _parse_requires_dist


class TestParseRequiresDist:
    """Tests for _parse_requires_dist function."""

    def test_simple_package(self):
        """Test parsing a simple package name."""
        name, version = _parse_requires_dist("numpy")
        assert name == "numpy"
        assert version is None

    def test_package_with_version(self):
        """Test parsing package with version constraint."""
        name, version = _parse_requires_dist("numpy>=1.21.0")
        assert name == "numpy"
        assert version == ">=1.21.0"

    def test_package_with_parenthesized_version(self):
        """Test parsing package with parenthesized version."""
        name, version = _parse_requires_dist("requests (>=2.25.0)")
        assert name == "requests"
        assert version == ">=2.25.0"

    def test_package_with_python_version_marker(self):
        """Test that python_version markers are preserved but marker stripped."""
        name, version = _parse_requires_dist("typing-extensions; python_version < '3.8'")
        assert name == "typing-extensions"
        assert version is None

    def test_filters_extra_dependencies(self):
        """Test that extra dependencies are filtered out."""
        # Extra dependencies should return (None, None)
        name, version = _parse_requires_dist("pytest; extra == 'test'")
        assert name is None
        assert version is None

        name, version = _parse_requires_dist("sphinx; extra == 'docs'")
        assert name is None
        assert version is None

    def test_filters_platform_specific_darwin(self):
        """Test that macOS-specific dependencies are filtered out."""
        name, version = _parse_requires_dist("pyobjc; sys_platform == 'darwin'")
        assert name is None
        assert version is None

    def test_filters_platform_specific_win32(self):
        """Test that Windows-specific dependencies are filtered out."""
        name, version = _parse_requires_dist("pywin32; sys_platform == 'win32'")
        assert name is None
        assert version is None

    def test_filters_platform_system_marker(self):
        """Test that platform_system markers are filtered out."""
        name, version = _parse_requires_dist("jaraco-windows; platform_system == 'Windows'")
        assert name is None
        assert version is None

    def test_normalizes_package_name(self):
        """Test that package names are normalized (PEP 503)."""
        name, version = _parse_requires_dist("Typing_Extensions>=3.7.4")
        assert name == "typing-extensions"
        assert version == ">=3.7.4"

    def test_complex_version_constraint(self):
        """Test parsing complex version constraints."""
        name, version = _parse_requires_dist("gast!=0.5.0,!=0.5.1,!=0.5.2,>=0.2.1")
        assert name == "gast"
        assert version == "!=0.5.0,!=0.5.1,!=0.5.2,>=0.2.1"

    def test_version_range(self):
        """Test parsing version range constraints."""
        name, version = _parse_requires_dist("grpcio<2.0,>=1.24.3")
        assert name == "grpcio"
        assert version == "<2.0,>=1.24.3"

    def test_tilde_version(self):
        """Test parsing tilde version constraints."""
        name, version = _parse_requires_dist("tensorboard~=2.20.0")
        assert name == "tensorboard"
        assert version == "~=2.20.0"
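Reviewer note: taken together, the cases above specify _parse_requires_dist fairly tightly: PEP 503 name normalization, parentheses stripped from constraints, and requirements gated behind extras or platform markers dropped entirely. A hedged sketch of a parser that satisfies these cases; the real implementation in app.pypi_proxy may be structured differently:

import re

def parse_requires_dist(spec):
    # Returns (normalized_name, constraint_or_None), or (None, None) for
    # requirements gated behind extras or platform markers.
    req, _, marker = spec.partition(";")
    if any(tok in marker for tok in ("extra ==", "sys_platform", "platform_system")):
        return None, None
    m = re.match(r"\s*([A-Za-z0-9._-]+)\s*\(?([^)]*)\)?\s*$", req)
    if not m:
        return None, None
    name = re.sub(r"[-_.]+", "-", m.group(1)).lower()  # PEP 503 normalization
    constraint = m.group(2).strip() or None
    return name, constraint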
backend/tests/unit/test_rate_limit.py (new file, 65 lines)
@@ -0,0 +1,65 @@
"""Unit tests for rate limiting configuration."""

import os
import pytest


class TestRateLimitConfiguration:
    """Tests for rate limit configuration."""

    def test_default_login_rate_limit(self):
        """Test default login rate limit is 5/minute."""
        # Import fresh to get default value
        import importlib
        import app.rate_limit as rate_limit_module

        # Save original env value
        original = os.environ.get("ORCHARD_LOGIN_RATE_LIMIT")

        try:
            # Clear env variable to test default
            if "ORCHARD_LOGIN_RATE_LIMIT" in os.environ:
                del os.environ["ORCHARD_LOGIN_RATE_LIMIT"]

            # Reload module to pick up new env
            importlib.reload(rate_limit_module)

            assert rate_limit_module.LOGIN_RATE_LIMIT == "5/minute"
        finally:
            # Restore original env value
            if original is not None:
                os.environ["ORCHARD_LOGIN_RATE_LIMIT"] = original
            importlib.reload(rate_limit_module)

    def test_custom_login_rate_limit(self):
        """Test custom login rate limit from environment."""
        import importlib
        import app.rate_limit as rate_limit_module

        # Save original env value
        original = os.environ.get("ORCHARD_LOGIN_RATE_LIMIT")

        try:
            # Set custom rate limit
            os.environ["ORCHARD_LOGIN_RATE_LIMIT"] = "10/minute"

            # Reload module to pick up new env
            importlib.reload(rate_limit_module)

            assert rate_limit_module.LOGIN_RATE_LIMIT == "10/minute"
        finally:
            # Restore original env value
            if original is not None:
                os.environ["ORCHARD_LOGIN_RATE_LIMIT"] = original
            else:
                if "ORCHARD_LOGIN_RATE_LIMIT" in os.environ:
                    del os.environ["ORCHARD_LOGIN_RATE_LIMIT"]
            importlib.reload(rate_limit_module)

    def test_limiter_exists(self):
        """Test that limiter object is created."""
        from app.rate_limit import limiter

        assert limiter is not None
        # Limiter should have a key_func set
        assert limiter._key_func is not None
backend/tests/unit/test_registry_client.py (new file, 300 lines)
@@ -0,0 +1,300 @@
"""Unit tests for registry client functionality."""

import pytest
from unittest.mock import AsyncMock, MagicMock, patch
import httpx
from packaging.specifiers import SpecifierSet

from app.registry_client import (
    PyPIRegistryClient,
    VersionInfo,
    FetchResult,
    get_registry_client,
)


class TestPyPIRegistryClient:
    """Tests for PyPI registry client."""

    @pytest.fixture
    def mock_http_client(self):
        """Create a mock async HTTP client."""
        return AsyncMock(spec=httpx.AsyncClient)

    @pytest.fixture
    def client(self, mock_http_client):
        """Create a PyPI registry client with mocked HTTP."""
        return PyPIRegistryClient(
            http_client=mock_http_client,
            upstream_sources=[],
            pypi_api_url="https://pypi.org/pypi",
        )

    def test_source_type(self, client):
        """Test source_type returns 'pypi'."""
        assert client.source_type == "pypi"

    def test_normalize_package_name(self, client):
        """Test package name normalization per PEP 503."""
        assert client._normalize_package_name("My_Package") == "my-package"
        assert client._normalize_package_name("my.package") == "my-package"
        assert client._normalize_package_name("my-package") == "my-package"
        assert client._normalize_package_name("MY-PACKAGE") == "my-package"
        assert client._normalize_package_name("my__package") == "my-package"
        assert client._normalize_package_name("my..package") == "my-package"

    @pytest.mark.asyncio
    async def test_get_available_versions_success(self, client, mock_http_client):
        """Test fetching available versions from PyPI."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "releases": {
                "1.0.0": [{"packagetype": "bdist_wheel"}],
                "1.1.0": [{"packagetype": "bdist_wheel"}],
                "2.0.0": [{"packagetype": "bdist_wheel"}],
            }
        }
        mock_http_client.get.return_value = mock_response

        versions = await client.get_available_versions("test-package")

        assert "1.0.0" in versions
        assert "1.1.0" in versions
        assert "2.0.0" in versions
        mock_http_client.get.assert_called_once()

    @pytest.mark.asyncio
    async def test_get_available_versions_empty(self, client, mock_http_client):
        """Test handling package with no releases."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {"releases": {}}
        mock_http_client.get.return_value = mock_response

        versions = await client.get_available_versions("empty-package")

        assert versions == []

    @pytest.mark.asyncio
    async def test_get_available_versions_404(self, client, mock_http_client):
        """Test handling non-existent package."""
        mock_response = MagicMock()
        mock_response.status_code = 404
        mock_http_client.get.return_value = mock_response

        versions = await client.get_available_versions("nonexistent")

        assert versions == []

    @pytest.mark.asyncio
    async def test_resolve_constraint_wildcard(self, client, mock_http_client):
        """Test resolving wildcard constraint returns latest."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "info": {"version": "2.0.0"},
            "releases": {
                "1.0.0": [
                    {
                        "packagetype": "bdist_wheel",
                        "url": "https://files.pythonhosted.org/test-1.0.0.whl",
                        "filename": "test-1.0.0.whl",
                        "digests": {"sha256": "abc123"},
                        "size": 1000,
                    }
                ],
                "2.0.0": [
                    {
                        "packagetype": "bdist_wheel",
                        "url": "https://files.pythonhosted.org/test-2.0.0.whl",
                        "filename": "test-2.0.0.whl",
                        "digests": {"sha256": "def456"},
                        "size": 2000,
                    }
                ],
            },
        }
        mock_http_client.get.return_value = mock_response

        result = await client.resolve_constraint("test-package", "*")

        assert result is not None
        assert result.version == "2.0.0"

    @pytest.mark.asyncio
    async def test_resolve_constraint_specific_version(self, client, mock_http_client):
        """Test resolving specific version constraint."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "releases": {
                "1.0.0": [
                    {
                        "packagetype": "bdist_wheel",
                        "url": "https://files.pythonhosted.org/test-1.0.0.whl",
                        "filename": "test-1.0.0.whl",
                        "digests": {"sha256": "abc123"},
                        "size": 1000,
                    }
                ],
                "2.0.0": [
                    {
                        "packagetype": "bdist_wheel",
                        "url": "https://files.pythonhosted.org/test-2.0.0.whl",
                        "filename": "test-2.0.0.whl",
                    }
                ],
            },
        }
        mock_http_client.get.return_value = mock_response

        result = await client.resolve_constraint("test-package", ">=1.0.0,<2.0.0")

        assert result is not None
        assert result.version == "1.0.0"

    @pytest.mark.asyncio
    async def test_resolve_constraint_no_match(self, client, mock_http_client):
        """Test resolving constraint with no matching version."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "releases": {
                "1.0.0": [
                    {
                        "packagetype": "bdist_wheel",
                        "url": "https://files.pythonhosted.org/test-1.0.0.whl",
                        "filename": "test-1.0.0.whl",
                    }
                ],
            },
        }
        mock_http_client.get.return_value = mock_response

        result = await client.resolve_constraint("test-package", ">=5.0.0")

        assert result is None

    @pytest.mark.asyncio
    async def test_resolve_constraint_bare_version(self, client, mock_http_client):
        """Test resolving bare version string as exact match."""
        mock_response = MagicMock()
        mock_response.status_code = 200
        mock_response.json.return_value = {
            "info": {"version": "2.0.0"},
            "releases": {
                "1.0.0": [
                    {
                        "packagetype": "bdist_wheel",
                        "url": "https://files.pythonhosted.org/test-1.0.0.whl",
                        "filename": "test-1.0.0.whl",
                        "digests": {"sha256": "abc123"},
                        "size": 1000,
                    }
                ],
                "2.0.0": [
                    {
                        "packagetype": "bdist_wheel",
                        "url": "https://files.pythonhosted.org/test-2.0.0.whl",
                        "filename": "test-2.0.0.whl",
                        "digests": {"sha256": "def456"},
                        "size": 2000,
                    }
                ],
            },
        }
        mock_http_client.get.return_value = mock_response

        # Bare version "1.0.0" should resolve to exactly 1.0.0, not latest
        result = await client.resolve_constraint("test-package", "1.0.0")

        assert result is not None
        assert result.version == "1.0.0"


class TestVersionInfo:
    """Tests for VersionInfo dataclass."""

    def test_create_version_info(self):
        """Test creating VersionInfo with all fields."""
        info = VersionInfo(
            version="1.0.0",
            download_url="https://example.com/pkg-1.0.0.whl",
            filename="pkg-1.0.0.whl",
            sha256="abc123",
            size=5000,
            content_type="application/zip",
        )
        assert info.version == "1.0.0"
        assert info.download_url == "https://example.com/pkg-1.0.0.whl"
        assert info.filename == "pkg-1.0.0.whl"
        assert info.sha256 == "abc123"
        assert info.size == 5000

    def test_create_version_info_minimal(self):
        """Test creating VersionInfo with only required fields."""
        info = VersionInfo(
            version="1.0.0",
            download_url="https://example.com/pkg.whl",
            filename="pkg.whl",
        )
        assert info.sha256 is None
        assert info.size is None


class TestFetchResult:
    """Tests for FetchResult dataclass."""

    def test_create_fetch_result(self):
        """Test creating FetchResult."""
        result = FetchResult(
            artifact_id="abc123def456",
            size=10000,
            version="2.0.0",
            filename="pkg-2.0.0.whl",
            already_cached=True,
        )
        assert result.artifact_id == "abc123def456"
        assert result.size == 10000
        assert result.version == "2.0.0"
        assert result.already_cached is True

    def test_fetch_result_default_not_cached(self):
        """Test FetchResult defaults to not cached."""
        result = FetchResult(
            artifact_id="xyz",
            size=100,
            version="1.0.0",
            filename="pkg.whl",
        )
        assert result.already_cached is False


class TestGetRegistryClient:
    """Tests for registry client factory function."""

    def test_get_pypi_client(self):
        """Test getting PyPI client."""
        mock_client = MagicMock()
        mock_sources = []

        client = get_registry_client("pypi", mock_client, mock_sources)

        assert isinstance(client, PyPIRegistryClient)

    def test_get_unsupported_client(self):
        """Test getting unsupported registry type returns None."""
        mock_client = MagicMock()

        client = get_registry_client("npm", mock_client, [])

        assert client is None

    def test_get_unknown_client(self):
        """Test getting unknown registry type returns None."""
        mock_client = MagicMock()

        client = get_registry_client("unknown", mock_client, [])

        assert client is None