- Fix upload response to return the actual version (not the requested version) when the artifact already has a version in the package
- Update ref_count tests to use multiple packages (one version per artifact per package design constraint)
- Remove allow_public_internet references from upstream caching tests
- Update consistency check test to not assert global system health
- Add versions field to artifact schemas
- Fix dependencies resolution to handle removed tag constraint
1906 lines
67 KiB
Python
1906 lines
67 KiB
Python
"""
|
|
Tests for upstream artifact caching schema.
|
|
|
|
Tests models, schemas, and encryption for the upstream caching feature.
|
|
"""
|
|
|
|
import os
|
|
import pytest
|
|
from unittest.mock import patch
|
|
from pydantic import ValidationError
|
|
|
|
|
|
class TestEncryptionModule:
    """Tests for the helpers exposed by ``app.encryption``."""

    def test_encrypt_decrypt_roundtrip(self):
        """A value survives an encrypt/decrypt cycle unchanged."""
        from app.encryption import encrypt_value, decrypt_value

        plaintext = "my-secret-password"
        encrypted = encrypt_value(plaintext)

        # The ciphertext is bytes and must not simply be the encoded input.
        assert isinstance(encrypted, bytes)
        assert encrypted != plaintext.encode()

        decrypted = decrypt_value(encrypted)
        assert decrypted == plaintext

    def test_encrypt_different_each_time(self):
        """Encrypting the same value twice yields two different ciphertexts."""
        from app.encryption import encrypt_value

        plaintext = "test-password"
        encrypted1 = encrypt_value(plaintext)
        encrypted2 = encrypt_value(plaintext)

        # Assuming encrypt_value is Fernet-backed (as the key-format test
        # below implies): every Fernet token embeds a freshly generated
        # random IV. The embedded timestamp only has one-second resolution,
        # so it is the IV that makes two back-to-back tokens differ.
        assert encrypted1 != encrypted2

    def test_encrypt_empty_value_raises(self):
        """Encrypting an empty string raises ``ValueError``."""
        from app.encryption import encrypt_value

        with pytest.raises(ValueError, match="Cannot encrypt empty"):
            encrypt_value("")

    def test_decrypt_empty_value_raises(self):
        """Decrypting empty bytes raises ``ValueError``."""
        from app.encryption import decrypt_value

        with pytest.raises(ValueError, match="Cannot decrypt empty"):
            decrypt_value(b"")

    def test_can_decrypt_valid(self):
        """``can_decrypt`` returns True for data produced by ``encrypt_value``."""
        from app.encryption import encrypt_value, can_decrypt

        encrypted = encrypt_value("test-password")
        assert can_decrypt(encrypted) is True

    def test_can_decrypt_invalid(self):
        """``can_decrypt`` returns False for garbage and for empty input."""
        from app.encryption import can_decrypt

        assert can_decrypt(b"invalid-data") is False
        assert can_decrypt(b"") is False

    def test_generate_key_format(self):
        """A generated key is a ``str`` that is usable as a Fernet key."""
        from app.encryption import generate_key
        from cryptography.fernet import Fernet

        key = generate_key()
        assert isinstance(key, str)

        # Fernet() raises ValueError for a malformed key, so construction is
        # the actual format check. The round-trip below replaces the former
        # ``assert fernet is not None``, which could never fail.
        fernet = Fernet(key.encode())
        assert fernet.decrypt(fernet.encrypt(b"probe")) == b"probe"
|
|
|
|
|
|
class TestUpstreamSourceModel:
    """Checks for the ``UpstreamSource`` SQLAlchemy model."""

    def test_model_fields_exist(self):
        """A bare instance exposes every expected attribute."""
        from app.models import UpstreamSource

        instance = UpstreamSource()
        expected_attributes = (
            'id',
            'name',
            'source_type',
            'url',
            'enabled',
            'auth_type',
            'username',
            'password_encrypted',
            'headers_encrypted',
            'priority',
        )
        for attribute in expected_attributes:
            assert hasattr(instance, attribute)

    def test_model_with_values(self):
        """Constructor keyword arguments are stored on the instance."""
        from app.models import UpstreamSource

        instance = UpstreamSource(
            name="npm-private",
            source_type="npm",
            url="https://npm.example.com",
            enabled=True,
            auth_type="basic",
            username="admin",
            priority=50,
        )

        assert instance.name == "npm-private"
        assert instance.source_type == "npm"
        assert instance.url == "https://npm.example.com"
        assert instance.enabled is True
        assert instance.auth_type == "basic"
        assert instance.username == "admin"
        assert instance.priority == 50

    def test_set_password_encrypts(self):
        """``set_password`` stores bytes that do not contain the plaintext."""
        from app.models import UpstreamSource

        instance = UpstreamSource()
        instance.set_password("my-api-key")

        stored = instance.password_encrypted
        assert stored is not None
        assert isinstance(stored, bytes)
        assert b"my-api-key" not in stored

    def test_get_password_decrypts(self):
        """``get_password`` returns what was given to ``set_password``."""
        from app.models import UpstreamSource

        instance = UpstreamSource()
        instance.set_password("my-api-key")

        assert instance.get_password() == "my-api-key"

    def test_set_password_none_clears(self):
        """Passing an empty string to ``set_password`` clears the password.

        Despite the "none" in the test name, the value exercised here is
        the empty string.
        """
        from app.models import UpstreamSource

        instance = UpstreamSource()
        instance.set_password("my-api-key")
        assert instance.password_encrypted is not None

        instance.set_password("")
        assert instance.password_encrypted is None

    def test_has_password(self):
        """``has_password`` flips from False to True once a password is set."""
        from app.models import UpstreamSource

        instance = UpstreamSource()
        assert instance.has_password() is False

        instance.set_password("secret")
        assert instance.has_password() is True

    def test_set_headers_encrypts(self):
        """``set_headers`` stores the custom headers as bytes."""
        from app.models import UpstreamSource

        instance = UpstreamSource()
        custom_headers = {"X-API-Key": "secret123", "X-Custom": "value"}
        instance.set_headers(custom_headers)

        stored = instance.headers_encrypted
        assert stored is not None
        assert isinstance(stored, bytes)

    def test_get_headers_decrypts(self):
        """``get_headers`` returns the mapping given to ``set_headers``."""
        from app.models import UpstreamSource

        instance = UpstreamSource()
        custom_headers = {"X-API-Key": "secret123", "X-Custom": "value"}
        instance.set_headers(custom_headers)

        assert instance.get_headers() == custom_headers
|
|
|
|
|
|
class TestCacheSettingsModel:
    """Checks for the ``CacheSettings`` SQLAlchemy model."""

    def test_model_fields_exist(self):
        """A bare instance exposes the expected attributes."""
        from app.models import CacheSettings

        instance = CacheSettings()
        for attribute in ('id', 'auto_create_system_projects'):
            assert hasattr(instance, attribute)

    def test_model_with_values(self):
        """Constructor keyword arguments are stored on the instance."""
        from app.models import CacheSettings

        instance = CacheSettings(id=1, auto_create_system_projects=True)

        assert instance.id == 1
        assert instance.auto_create_system_projects is True
|
|
|
|
|
|
class TestCachedUrlModel:
    """Checks for the ``CachedUrl`` SQLAlchemy model."""

    def test_model_fields_exist(self):
        """A bare instance exposes every expected attribute."""
        from app.models import CachedUrl

        instance = CachedUrl()
        expected_attributes = (
            'id',
            'url',
            'url_hash',
            'artifact_id',
            'source_id',
            'fetched_at',
            'response_headers',
        )
        for attribute in expected_attributes:
            assert hasattr(instance, attribute)

    def test_compute_url_hash(self):
        """``compute_url_hash`` is deterministic and URL-sensitive."""
        from app.models import CachedUrl

        tarball_url = "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz"
        digest = CachedUrl.compute_url_hash(tarball_url)

        # Expect 64 lowercase hex characters (the shape of a SHA256 digest).
        assert len(digest) == 64
        assert all(ch in '0123456789abcdef' for ch in digest)

        # Hashing the same URL again gives the same value.
        repeated = CachedUrl.compute_url_hash(tarball_url)
        assert digest == repeated

        # A different URL gives a different value.
        other = CachedUrl.compute_url_hash("https://example.com/other")
        assert digest != other
|
|
|
|
|
|
class TestUpstreamSourceSchemas:
    """Checks for the Pydantic upstream source schemas."""

    def test_create_schema_valid(self):
        """``UpstreamSourceCreate`` accepts a fully specified, valid payload."""
        from app.schemas import UpstreamSourceCreate

        payload = UpstreamSourceCreate(
            name="npm-private",
            source_type="npm",
            url="https://npm.example.com",
            enabled=True,
            auth_type="basic",
            username="admin",
            password="secret",
            priority=50,
        )

        assert payload.name == "npm-private"
        assert payload.source_type == "npm"
        assert payload.url == "https://npm.example.com"
        assert payload.priority == 50

    def test_create_schema_defaults(self):
        """Omitted optional fields take the schema defaults."""
        from app.schemas import UpstreamSourceCreate

        payload = UpstreamSourceCreate(name="test", url="https://example.com")

        assert payload.source_type == "generic"
        assert payload.enabled is False
        assert payload.auth_type == "none"
        assert payload.priority == 100

    def test_create_schema_invalid_source_type(self):
        """An unknown ``source_type`` is rejected with a descriptive error."""
        from app.schemas import UpstreamSourceCreate

        with pytest.raises(ValidationError, match="source_type must be one of"):
            UpstreamSourceCreate(
                name="test",
                url="https://example.com",
                source_type="invalid",
            )

    def test_create_schema_invalid_auth_type(self):
        """An unknown ``auth_type`` is rejected with a descriptive error."""
        from app.schemas import UpstreamSourceCreate

        with pytest.raises(ValidationError, match="auth_type must be one of"):
            UpstreamSourceCreate(
                name="test",
                url="https://example.com",
                auth_type="invalid",
            )

    def test_create_schema_invalid_url(self):
        """A URL without an http(s) scheme is rejected."""
        from app.schemas import UpstreamSourceCreate

        with pytest.raises(ValidationError, match="url must start with http"):
            UpstreamSourceCreate(
                name="test",
                url="not-a-url",
            )

    def test_create_schema_invalid_priority(self):
        """A priority of zero is rejected."""
        from app.schemas import UpstreamSourceCreate

        with pytest.raises(ValidationError, match="priority must be greater than 0"):
            UpstreamSourceCreate(
                name="test",
                url="https://example.com",
                priority=0,
            )

    def test_update_schema_all_optional(self):
        """``UpstreamSourceUpdate`` can be built with no arguments at all."""
        from app.schemas import UpstreamSourceUpdate

        patch_body = UpstreamSourceUpdate()

        assert patch_body.name is None
        assert patch_body.url is None

    def test_update_schema_partial(self):
        """Only the supplied fields are set; the rest stay ``None``."""
        from app.schemas import UpstreamSourceUpdate

        patch_body = UpstreamSourceUpdate(enabled=True, priority=50)

        assert patch_body.enabled is True
        assert patch_body.priority == 50
        assert patch_body.name is None

    def test_response_schema_no_secrets(self):
        """``UpstreamSourceResponse`` exposes presence flags, never secrets."""
        from app.schemas import UpstreamSourceResponse

        declared = set(UpstreamSourceResponse.model_fields.keys())

        # Secret-bearing fields must not be part of the response model.
        for secret_field in (
            "password",
            "password_encrypted",
            "headers",
            "headers_encrypted",
        ):
            assert secret_field not in declared

        # Boolean indicators are exposed instead.
        for flag_field in ("has_password", "has_headers"):
            assert flag_field in declared
|
|
|
|
|
|
class TestCacheSettingsSchemas:
    """Checks for the Pydantic cache settings schemas."""

    def test_update_schema_all_optional(self):
        """``CacheSettingsUpdate`` can be built with no arguments."""
        from app.schemas import CacheSettingsUpdate

        patch_body = CacheSettingsUpdate()

        assert patch_body.auto_create_system_projects is None

    def test_update_schema_partial(self):
        """A supplied field is stored on ``CacheSettingsUpdate``."""
        from app.schemas import CacheSettingsUpdate

        patch_body = CacheSettingsUpdate(auto_create_system_projects=True)

        assert patch_body.auto_create_system_projects is True
|
|
|
|
|
|
class TestCacheRequestSchemas:
    """Checks for the Pydantic ``CacheRequest`` schema."""

    def test_request_valid(self):
        """``CacheRequest`` accepts a well-formed npm tarball request."""
        from app.schemas import CacheRequest

        payload = CacheRequest(
            url="https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
            source_type="npm",
            package_name="lodash",
            version="4.17.21",
        )

        assert payload.url == "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz"
        assert payload.source_type == "npm"

    def test_request_invalid_url(self):
        """A URL without an http(s) scheme is rejected."""
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError, match="url must start with http"):
            CacheRequest(
                url="not-a-url",
                source_type="npm",
            )

    def test_request_invalid_source_type(self):
        """An unknown ``source_type`` is rejected."""
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError, match="source_type must be one of"):
            CacheRequest(
                url="https://example.com/file.tgz",
                source_type="invalid",
            )

    def test_request_expected_hash_normalized(self):
        """``expected_hash`` loses its ``sha256:`` prefix and is lower-cased."""
        from app.schemas import CacheRequest

        # Input carrying the "sha256:" prefix.
        prefixed = CacheRequest(
            url="https://example.com/file.tgz",
            source_type="generic",
            expected_hash="sha256:abc123def456abc123def456abc123def456abc123def456abc123def456abc1",
        )
        assert prefixed.expected_hash == "abc123def456abc123def456abc123def456abc123def456abc123def456abc1"

        # Upper-case input with no prefix.
        uppercase = CacheRequest(
            url="https://example.com/file.tgz",
            source_type="generic",
            expected_hash="ABC123DEF456ABC123DEF456ABC123DEF456ABC123DEF456ABC123DEF456ABC1",
        )
        assert uppercase.expected_hash == "abc123def456abc123def456abc123def456abc123def456abc123def456abc1"

    def test_request_invalid_expected_hash(self):
        """A value that is not a 64-character hex string is rejected."""
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError, match="64-character hex string"):
            CacheRequest(
                url="https://example.com/file.tgz",
                source_type="generic",
                expected_hash="not-a-valid-hash",
            )
|
|
|
|
|
|
class TestSourceTypesConstant:
    """Checks for the ``SOURCE_TYPES`` and ``AUTH_TYPES`` constants."""

    def test_source_types_contains_expected(self):
        """``SOURCE_TYPES`` lists every supported source type."""
        from app.schemas import SOURCE_TYPES

        expected_types = (
            "npm",
            "pypi",
            "maven",
            "docker",
            "helm",
            "nuget",
            "deb",
            "rpm",
            "generic",
        )
        for source_type in expected_types:
            assert source_type in SOURCE_TYPES

    def test_auth_types_contains_expected(self):
        """``AUTH_TYPES`` lists every supported authentication type."""
        from app.schemas import AUTH_TYPES

        for auth_type in ("none", "basic", "bearer", "api_key"):
            assert auth_type in AUTH_TYPES
|
|
|
|
|
|
# =============================================================================
|
|
# UpstreamClient Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestUpstreamClientConfig:
    """Checks for the ``UpstreamClientConfig`` dataclass."""

    def test_default_config(self):
        """A config built with no arguments carries the documented defaults."""
        from app.upstream import UpstreamClientConfig

        defaults = UpstreamClientConfig()

        assert defaults.connect_timeout == 30.0
        assert defaults.read_timeout == 300.0
        assert defaults.max_retries == 3
        assert defaults.follow_redirects is True
        assert defaults.max_redirects == 5
        assert defaults.max_file_size is None
        assert defaults.verify_ssl is True

    def test_custom_config(self):
        """Explicit keyword arguments override the defaults."""
        from app.upstream import UpstreamClientConfig

        one_mebibyte = 1024 * 1024
        custom = UpstreamClientConfig(
            connect_timeout=10.0,
            read_timeout=60.0,
            max_retries=5,
            max_file_size=one_mebibyte,
        )

        assert custom.connect_timeout == 10.0
        assert custom.read_timeout == 60.0
        assert custom.max_retries == 5
        assert custom.max_file_size == one_mebibyte
|
|
|
|
|
|
class TestFetchResult:
    """Tests for the ``FetchResult`` dataclass."""

    def test_fetch_result_creation(self):
        """Values passed to the constructor are readable as attributes."""
        from io import BytesIO
        from app.upstream import FetchResult

        content = BytesIO(b"test content")
        result = FetchResult(
            content=content,
            sha256="abc123",
            size=12,
            content_type="text/plain",
            response_headers={"x-custom": "value"},
            source_name="test-source",
        )

        assert result.sha256 == "abc123"
        assert result.size == 12
        assert result.content_type == "text/plain"
        assert result.source_name == "test-source"

    def test_fetch_result_close(self):
        """``close()`` removes the temp file referenced by ``temp_path``."""
        import tempfile
        from pathlib import Path
        from app.upstream import FetchResult

        # delete=False keeps the file on disk after the ``with`` block so
        # that FetchResult.close() is the thing responsible for removing it.
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(b"test")
            temp_path = Path(f.name)

        content = open(temp_path, "rb")
        try:
            result = FetchResult(
                content=content,
                sha256="abc",
                size=4,
                content_type=None,
                response_headers={},
                temp_path=temp_path,
            )

            assert temp_path.exists()
            result.close()
            assert not temp_path.exists()
        finally:
            # Safety net: if construction or an assertion above failed, do
            # not leak the open handle or the temp file. Closing an already
            # closed file object is a no-op, and the unlink is skipped when
            # close() already removed the file.
            content.close()
            if temp_path.exists():
                temp_path.unlink()
|
|
|
|
|
|
class TestUpstreamClientSourceMatching:
    """Checks for how ``UpstreamClient`` maps a URL to a configured source."""

    def test_match_source_by_url_prefix(self):
        """A URL under a source's base URL matches; an unrelated one does not."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        npm_public = UpstreamSource(
            name="npm-public",
            url="https://registry.npmjs.org",
            enabled=True,
            auth_type="none",
            priority=100,
        )
        client = UpstreamClient(sources=[npm_public])

        # URL beneath the source's base URL: expect a hit.
        hit = client._match_source("https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz")
        assert hit is not None
        assert hit.name == "npm-public"

        # URL on a different host: expect no match.
        miss = client._match_source("https://pypi.org/simple/requests/")
        assert miss is None

    def test_match_source_priority_order(self):
        """When several sources match, the lowest priority number wins."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        private_source = UpstreamSource(
            name="npm-private",
            url="https://registry.npmjs.org",
            enabled=True,
            auth_type="basic",
            priority=50,
        )
        public_source = UpstreamSource(
            name="npm-public",
            url="https://registry.npmjs.org",
            enabled=True,
            auth_type="none",
            priority=100,
        )

        # Deliberately hand the sources over in the "wrong" order; the test
        # expects the match to follow priority rather than list position.
        client = UpstreamClient(sources=[public_source, private_source])

        winner = client._match_source("https://registry.npmjs.org/lodash")
        assert winner is not None
        assert winner.name == "npm-private"  # Lower priority wins

    def test_no_match_returns_none(self):
        """With no sources configured, matching yields ``None``."""
        from app.upstream import UpstreamClient

        client = UpstreamClient(sources=[])

        assert client._match_source("https://example.com/file.tgz") is None
|
|
|
|
|
|
class TestUpstreamClientAuthHeaders:
    """Checks for how ``UpstreamClient`` builds authentication data."""

    def test_auth_none(self):
        """``auth_type="none"`` produces no headers."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        upstream = UpstreamSource(auth_type="none")

        built = UpstreamClient()._build_auth_headers(upstream)
        assert built == {}

    def test_auth_bearer(self):
        """``auth_type="bearer"`` puts the password in an Authorization header."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        upstream = UpstreamSource(auth_type="bearer")
        upstream.set_password("my-bearer-token")

        built = UpstreamClient()._build_auth_headers(upstream)
        assert built == {"Authorization": "Bearer my-bearer-token"}

    def test_auth_api_key(self):
        """``auth_type="api_key"`` returns the source's custom headers."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        upstream = UpstreamSource(auth_type="api_key")
        upstream.set_headers({"X-API-Key": "secret-key-123", "X-Custom": "value"})

        built = UpstreamClient()._build_auth_headers(upstream)
        assert built == {"X-API-Key": "secret-key-123", "X-Custom": "value"}

    def test_auth_basic_returns_empty_headers(self):
        """``auth_type="basic"`` contributes no headers."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        upstream = UpstreamSource(auth_type="basic", username="user")
        upstream.set_password("pass")

        built = UpstreamClient()._build_auth_headers(upstream)

        # Basic credentials are expected to travel via the httpx ``auth``
        # parameter rather than through explicit headers.
        assert built == {}

    def test_get_basic_auth(self):
        """``_get_basic_auth`` returns a ``(username, password)`` tuple."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        upstream = UpstreamSource(auth_type="basic", username="user")
        upstream.set_password("pass")

        credentials = UpstreamClient()._get_basic_auth(upstream)
        assert credentials == ("user", "pass")

    def test_get_basic_auth_no_username(self):
        """Without a username, ``_get_basic_auth`` returns ``None``."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        upstream = UpstreamSource(auth_type="basic")

        credentials = UpstreamClient()._get_basic_auth(upstream)
        assert credentials is None
|
|
|
|
|
|
class TestUpstreamClientSourceDisabled:
    """Checks for how ``UpstreamClient`` treats a disabled source."""

    def test_disabled_source_raises_error(self):
        """Fetching a URL whose matching source is disabled raises an error."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient, SourceDisabledError

        disabled_source = UpstreamSource(
            name="npm-public",
            url="https://registry.npmjs.org",
            enabled=False,
            auth_type="none",
            priority=100,
        )
        client = UpstreamClient(sources=[disabled_source])

        with pytest.raises(SourceDisabledError) as raised:
            client.fetch("https://registry.npmjs.org/lodash")

        # The message should name the source and say that it is disabled.
        message = str(raised.value)
        assert "npm-public" in message
        assert "disabled" in message
|
|
|
|
|
|
class TestUpstreamClientRetryLogic:
    """Checks for the retry decision and backoff helpers of ``UpstreamClient``."""

    def test_should_retry_connection_error(self):
        """Connection errors are retried until the retry budget is used up."""
        import httpx
        from app.upstream import UpstreamClient

        client = UpstreamClient()
        failure = httpx.ConnectError("Connection refused")

        # Attempts 0 and 1 are retried; attempt 2 hits the max-retries limit.
        for attempt, expected in ((0, True), (1, True), (2, False)):
            assert client._should_retry(failure, attempt) is expected

    def test_should_retry_timeout(self):
        """A read timeout is retried."""
        import httpx
        from app.upstream import UpstreamClient

        client = UpstreamClient()
        failure = httpx.ReadTimeout("Read timed out")

        assert client._should_retry(failure, 0) is True

    def test_should_not_retry_4xx(self):
        """A 404 response is not retried."""
        import httpx
        from app.upstream import UpstreamClient

        client = UpstreamClient()

        not_found = httpx.Response(404, request=httpx.Request("GET", "http://test"))
        failure = httpx.HTTPStatusError(
            "Not found",
            request=not_found.request,
            response=not_found,
        )

        assert client._should_retry(failure, 0) is False

    def test_should_retry_502_503_504(self):
        """502, 503 and 504 responses are retried."""
        import httpx
        from app.upstream import UpstreamClient

        client = UpstreamClient()

        for status_code in [502, 503, 504]:
            bad_gateway = httpx.Response(
                status_code,
                request=httpx.Request("GET", "http://test"),
            )
            failure = httpx.HTTPStatusError(
                "Server error",
                request=bad_gateway.request,
                response=bad_gateway,
            )
            assert client._should_retry(failure, 0) is True

    def test_calculate_backoff(self):
        """Backoff doubles per attempt, within the jitter band asserted here."""
        from app.upstream import UpstreamClient, UpstreamClientConfig

        client = UpstreamClient(
            config=UpstreamClientConfig(
                retry_backoff_base=1.0,
                retry_backoff_max=30.0,
            )
        )

        # (attempt, lower bound, upper bound): nominal delays of 1s, 2s and
        # 4s, each allowed to deviate by a quarter because of jitter.
        expectations = (
            (0, 0.75, 1.25),
            (1, 1.5, 2.5),
            (2, 3.0, 5.0),
        )
        for attempt, lower, upper in expectations:
            delay = client._calculate_backoff(attempt)
            assert lower <= delay <= upper

    def test_backoff_respects_max(self):
        """The computed delay never exceeds ``retry_backoff_max``."""
        from app.upstream import UpstreamClient, UpstreamClientConfig

        client = UpstreamClient(
            config=UpstreamClientConfig(
                retry_backoff_base=10.0,
                retry_backoff_max=5.0,  # Max is less than base * 2^attempt
            )
        )

        # Uncapped, attempt 5 would be 10 * 32 = 320 seconds.
        capped = client._calculate_backoff(5)
        assert capped <= 5.0
|
|
|
|
|
|
class TestUpstreamClientExceptionConversion:
    """Checks that httpx errors are re-raised as upstream error types."""

    def test_convert_connect_error(self):
        """``httpx.ConnectError`` becomes ``UpstreamConnectionError``."""
        import httpx
        from app.upstream import UpstreamClient, UpstreamConnectionError

        client = UpstreamClient()
        failure = httpx.ConnectError("Connection refused")

        with pytest.raises(UpstreamConnectionError):
            client._raise_upstream_error(failure, "http://test")

    def test_convert_timeout_error(self):
        """``httpx.ReadTimeout`` becomes ``UpstreamTimeoutError``."""
        import httpx
        from app.upstream import UpstreamClient, UpstreamTimeoutError

        client = UpstreamClient()
        failure = httpx.ReadTimeout("Read timed out")

        with pytest.raises(UpstreamTimeoutError):
            client._raise_upstream_error(failure, "http://test")

    def test_convert_http_error(self):
        """``httpx.HTTPStatusError`` becomes ``UpstreamHTTPError`` with its status."""
        import httpx
        from app.upstream import UpstreamClient, UpstreamHTTPError

        client = UpstreamClient()
        not_found = httpx.Response(
            404,
            request=httpx.Request("GET", "http://test"),
            headers={"x-custom": "value"},
        )
        failure = httpx.HTTPStatusError(
            "Not found",
            request=not_found.request,
            response=not_found,
        )

        with pytest.raises(UpstreamHTTPError) as raised:
            client._raise_upstream_error(failure, "http://test")

        assert raised.value.status_code == 404
|
|
|
|
|
|
class TestUpstreamClientFileSizeLimit:
    """Checks for the file size limit error type."""

    def test_file_size_limit_dataclass(self):
        """``FileSizeExceededError`` keeps its message and both sizes."""
        from app.upstream import FileSizeExceededError

        too_big = FileSizeExceededError("Too large", 1000, 500)

        assert too_big.content_length == 1000
        assert too_big.max_size == 500
        assert "Too large" in str(too_big)
|
|
|
|
|
|
class TestUpstreamExceptions:
    """Checks for the exception classes defined in ``app.upstream``."""

    def test_upstream_error_base(self):
        """The base ``UpstreamError`` stringifies to its message."""
        from app.upstream import UpstreamError

        raised = UpstreamError("Test error")

        assert str(raised) == "Test error"

    def test_upstream_http_error(self):
        """``UpstreamHTTPError`` keeps the status code and response headers."""
        from app.upstream import UpstreamHTTPError

        raised = UpstreamHTTPError("Not found", 404, {"x-custom": "value"})

        assert raised.status_code == 404
        assert raised.response_headers == {"x-custom": "value"}

    def test_source_not_found_error(self):
        """``SourceNotFoundError`` carries its message."""
        from app.upstream import SourceNotFoundError

        raised = SourceNotFoundError("No source for URL")

        assert "No source for URL" in str(raised)

    def test_source_disabled_error(self):
        """``SourceDisabledError`` carries its message."""
        from app.upstream import SourceDisabledError

        raised = SourceDisabledError("Source is disabled")

        assert "Source is disabled" in str(raised)
|
|
|
|
|
|
# =============================================================================
|
|
# URL Parsing Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestNpmUrlParsing:
    """Checks for ``parse_npm_url``."""

    def test_parse_unscoped_package(self):
        """An unscoped tarball URL yields package name, version and filename."""
        from app.cache import parse_npm_url

        parsed = parse_npm_url("https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz")

        assert parsed is not None
        assert parsed.package_name == "lodash"
        assert parsed.version == "4.17.21"
        assert parsed.filename == "lodash-4.17.21.tgz"

    def test_parse_scoped_package(self):
        """A scoped tarball URL keeps the ``@scope/`` part of the package name."""
        from app.cache import parse_npm_url

        parsed = parse_npm_url("https://registry.npmjs.org/@types/node/-/node-18.0.0.tgz")

        assert parsed is not None
        assert parsed.package_name == "@types/node"
        assert parsed.version == "18.0.0"
        assert parsed.filename == "node-18.0.0.tgz"

    def test_parse_invalid_url(self):
        """A URL that is not in npm tarball form yields ``None``."""
        from app.cache import parse_npm_url

        parsed = parse_npm_url("https://example.com/random-file.tgz")

        assert parsed is None
|
|
|
|
|
|
class TestPypiUrlParsing:
    """Checks for ``parse_pypi_url``."""

    def test_parse_sdist_tar_gz(self):
        """A ``.tar.gz`` source distribution URL is parsed."""
        from app.cache import parse_pypi_url

        parsed = parse_pypi_url(
            "https://files.pythonhosted.org/packages/ab/cd/requests-2.28.0.tar.gz"
        )

        assert parsed is not None
        assert parsed.package_name == "requests"
        assert parsed.version == "2.28.0"
        assert parsed.filename == "requests-2.28.0.tar.gz"

    def test_parse_wheel(self):
        """A wheel URL is parsed into package name and version."""
        from app.cache import parse_pypi_url

        parsed = parse_pypi_url(
            "https://files.pythonhosted.org/packages/ab/cd/requests-2.28.0-py3-none-any.whl"
        )

        assert parsed is not None
        assert parsed.package_name == "requests"
        assert parsed.version == "2.28.0"

    def test_parse_underscore_package(self):
        """An underscore in the filename's package name comes back as a hyphen."""
        from app.cache import parse_pypi_url

        parsed = parse_pypi_url(
            "https://files.pythonhosted.org/packages/ab/cd/some_package-1.0.0.tar.gz"
        )

        assert parsed is not None
        assert parsed.package_name == "some-package"  # Normalized
        assert parsed.version == "1.0.0"
|
|
|
|
|
|
class TestMavenUrlParsing:
    """Checks for ``parse_maven_url``."""

    def test_parse_maven_jar(self):
        """A JAR URL yields ``group:artifact`` and the version."""
        from app.cache import parse_maven_url

        jar_url = (
            "https://repo1.maven.org/maven2/org/apache/commons/commons-lang3/3.12.0/commons-lang3-3.12.0.jar"
        )
        parsed = parse_maven_url(jar_url)

        assert parsed is not None
        assert parsed.package_name == "org.apache.commons:commons-lang3"
        assert parsed.version == "3.12.0"

    def test_parse_maven_with_classifier(self):
        """A version with a hyphenated suffix (``31.1-jre``) is kept whole."""
        from app.cache import parse_maven_url

        jar_url = (
            "https://repo1.maven.org/maven2/com/google/guava/guava/31.1-jre/guava-31.1-jre.jar"
        )
        parsed = parse_maven_url(jar_url)

        assert parsed is not None
        assert parsed.package_name == "com.google.guava:guava"
        assert parsed.version == "31.1-jre"
|
|
|
|
|
|
class TestGenericUrlParsing:
    """Checks parse_generic_url on URLs with no registry-specific layout."""

    def test_parse_with_version(self):
        """A 'name-version.ext' filename is split into name and version."""
        from app.cache import parse_generic_url

        parsed = parse_generic_url("https://example.com/downloads/myapp-1.2.3.tar.gz")

        assert parsed.package_name == "myapp"
        assert parsed.version == "1.2.3"

    def test_parse_without_version(self):
        """A filename with no version part leaves version as None."""
        from app.cache import parse_generic_url

        parsed = parse_generic_url("https://example.com/downloads/artifact.tar.gz")

        assert parsed.package_name == "artifact"
        assert parsed.version is None

    def test_parse_various_extensions(self):
        """Each listed archive extension is stripped before splitting."""
        from app.cache import parse_generic_url

        extensions = ["tar.gz", "tar.bz2", "zip", "jar", "deb", "rpm"]
        for ext in extensions:
            parsed = parse_generic_url(f"https://example.com/pkg-1.0.{ext}")
            assert parsed.package_name == "pkg"
            assert parsed.version == "1.0"
|
|
|
|
|
|
class TestParseUrl:
    """Checks the parse_url entry point that takes a URL and a source type."""

    def test_npm_source_type(self):
        """An npm tarball URL with source type 'npm' is parsed."""
        from app.cache import parse_url

        tarball_url = "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz"
        parsed = parse_url(tarball_url, "npm")

        assert parsed.package_name == "lodash"
        assert parsed.version == "4.17.21"

    def test_fallback_to_generic(self):
        """A non-npm URL passed as 'npm' still yields a name and version."""
        from app.cache import parse_url

        # The URL does not follow the npm layout; the expected values are
        # the ones generic filename parsing produces.
        parsed = parse_url("https://example.com/myfile-1.0.tar.gz", "npm")

        assert parsed.package_name == "myfile"
        assert parsed.version == "1.0"

    def test_pypi_source_type(self):
        """A PyPI sdist URL with source type 'pypi' is parsed."""
        from app.cache import parse_url

        sdist_url = "https://files.pythonhosted.org/packages/ab/cd/requests-2.28.0.tar.gz"
        parsed = parse_url(sdist_url, "pypi")

        assert parsed.package_name == "requests"
        assert parsed.version == "2.28.0"
|
|
|
|
|
|
class TestSystemProjectHelpers:
    """Checks the helpers that name and describe system projects."""

    def test_get_system_project_name(self):
        """Known source types map to '_<type>'; an unknown one to '_generic'."""
        from app.cache import get_system_project_name

        expected_names = {
            "npm": "_npm",
            "pypi": "_pypi",
            "maven": "_maven",
            "docker": "_docker",
            "unknown": "_generic",
        }
        for source_type, project_name in expected_names.items():
            assert get_system_project_name(source_type) == project_name

    def test_get_system_project_description(self):
        """The description mentions its source type, ignoring case."""
        from app.cache import get_system_project_description

        for source_type in ("npm", "pypi"):
            assert source_type in get_system_project_description(source_type).lower()
|
|
|
|
|
|
# =============================================================================
|
|
# Cache Endpoint Integration Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestCacheEndpointRequiresAuth:
    """Authentication checks for the cache endpoint."""

    @pytest.mark.integration
    def test_cache_requires_authentication(self):
        """A POST to /api/v1/cache from a fresh client gets 401 or 403."""
        import httpx

        server_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")
        payload = {
            "url": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
            "source_type": "npm",
        }

        # A freshly built client has no credentials configured on it.
        with httpx.Client(base_url=server_url, timeout=30.0) as anonymous:
            reply = anonymous.post("/api/v1/cache", json=payload)
            assert reply.status_code in (401, 403)
|
|
|
|
|
|
class TestCacheRequestValidation:
    """Checks the validation rules of the CacheRequest schema."""

    def test_cache_request_validates_url(self):
        """A url that is not http(s) is rejected with the expected message."""
        from pydantic import ValidationError
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError, match="url must start with http"):
            CacheRequest(url="not-a-url", source_type="npm")

    def test_cache_request_validates_source_type(self):
        """An unsupported source_type is rejected with the expected message."""
        from pydantic import ValidationError
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError, match="source_type must be one of"):
            CacheRequest(url="https://example.com/file.tgz", source_type="invalid")

    def test_cache_request_valid(self):
        """A fully populated request keeps its url and source_type."""
        from app.schemas import CacheRequest

        tarball_url = "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz"
        cache_request = CacheRequest(
            url=tarball_url,
            source_type="npm",
            package_name="lodash",
            version="4.17.21",
        )

        assert cache_request.url == "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz"
        assert cache_request.source_type == "npm"
|
|
|
|
|
|
class TestCacheResponseSchema:
    """Checks which fields the CacheResponse schema declares."""

    def test_cache_response_fields(self):
        """Every expected field name is present in CacheResponse.model_fields."""
        from app.schemas import CacheResponse

        declared = set(CacheResponse.model_fields.keys())
        required = (
            "artifact_id",
            "sha256",
            "size",
            "already_cached",
            "source_url",
            "system_project",
            "system_package",
        )
        for field_name in required:
            assert field_name in declared
|
|
|
|
|
|
# =============================================================================
|
|
# System Projects Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestSystemProjectRestrictions:
    """Tests for system project restrictions.

    Each test operates on the first project returned by
    ``GET /api/v1/system-projects`` and is skipped when that list is empty.
    """

    @staticmethod
    def _first_system_project(client, skip_reason):
        """Return the first listed system project, or skip the test.

        Asserts that the listing call returns 200. Calls ``pytest.skip``
        with *skip_reason* when no system projects exist.
        """
        response = client.get("/api/v1/system-projects")
        assert response.status_code == 200
        system_projects = response.json()
        if not system_projects:
            pytest.skip(skip_reason)
        return system_projects[0]

    @pytest.mark.integration
    def test_cannot_delete_system_project(self, integration_client):
        """Test that system projects cannot be deleted."""
        project = self._first_system_project(
            integration_client, "No system projects exist to test deletion"
        )
        project_name = project["name"]

        response = integration_client.delete(f"/api/v1/projects/{project_name}")
        assert response.status_code == 403
        assert "cannot be deleted" in response.json()["detail"].lower()

    @pytest.mark.integration
    def test_cannot_make_system_project_private(self, integration_client):
        """Test that system projects cannot be made private."""
        project = self._first_system_project(
            integration_client, "No system projects exist to test update"
        )
        project_name = project["name"]

        response = integration_client.put(
            f"/api/v1/projects/{project_name}",
            json={"is_public": False},
        )
        assert response.status_code == 403
        assert "cannot be made private" in response.json()["detail"].lower()

    @pytest.mark.integration
    def test_can_update_system_project_description(self, integration_client):
        """Test that system project descriptions can be updated."""
        project = self._first_system_project(
            integration_client, "No system projects exist to test update"
        )
        project_name = project["name"]
        # NOTE(review): assumes the listing exposes "description"; when it
        # does not (or it is null) nothing is restored afterwards.
        previous_description = project.get("description")
        new_description = "Updated description for testing"

        try:
            response = integration_client.put(
                f"/api/v1/projects/{project_name}",
                json={"description": new_description},
            )
            assert response.status_code == 200
            assert response.json()["description"] == new_description
        finally:
            # Put the original text back so the test leaves no trace on a
            # real system project, even when an assertion above fails.
            if previous_description is not None:
                integration_client.put(
                    f"/api/v1/projects/{project_name}",
                    json={"description": previous_description},
                )
|
|
|
|
|
|
class TestSystemProjectsEndpoint:
    """Checks for the endpoint that lists system projects."""

    @pytest.mark.integration
    def test_list_system_projects_requires_auth(self):
        """A request from a fresh client is answered with 401."""
        import httpx

        server_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

        with httpx.Client(base_url=server_url, timeout=30.0) as anonymous:
            reply = anonymous.get("/api/v1/system-projects")
            assert reply.status_code == 401

    @pytest.mark.integration
    def test_list_system_projects_success(self, integration_client):
        """The listing returns 200 and a JSON list of system projects."""
        reply = integration_client.get("/api/v1/system-projects")
        assert reply.status_code == 200

        projects = reply.json()
        assert isinstance(projects, list)

        # Each entry must either be flagged is_system or have a name that
        # starts with an underscore.
        for entry in projects:
            flagged = entry.get("is_system") is True
            assert flagged or entry.get("name", "").startswith("_")
|
|
|
|
|
|
# =============================================================================
|
|
# Upstream Sources Admin API Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestUpstreamSourcesAdminAPI:
    """Tests for the upstream sources admin API.

    Tests that create a source delete it in a ``finally`` clause so that a
    failing assertion does not leave test data behind on the server.
    """

    _SOURCES_URL = "/api/v1/admin/upstream-sources"

    @classmethod
    def _create_source(cls, client, payload):
        """POST *payload*, assert a 201 response and return the created JSON."""
        response = client.post(cls._SOURCES_URL, json=payload)
        assert response.status_code == 201
        return response.json()

    @classmethod
    def _delete_source(cls, client, source_id):
        """Send DELETE for *source_id* and return the response."""
        return client.delete(f"{cls._SOURCES_URL}/{source_id}")

    @pytest.mark.integration
    def test_list_upstream_sources_requires_admin(self):
        """Test that listing upstream sources requires admin access."""
        import httpx

        base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

        with httpx.Client(base_url=base_url, timeout=30.0) as unauthenticated_client:
            response = unauthenticated_client.get(self._SOURCES_URL)
            assert response.status_code in (401, 403)

    @pytest.mark.integration
    def test_list_upstream_sources_success(self, integration_client):
        """Test listing upstream sources as admin."""
        response = integration_client.get(self._SOURCES_URL)
        assert response.status_code == 200

        data = response.json()
        assert isinstance(data, list)

        # The previous "npm present or len(data) >= 0" check was always true.
        # Which sources are seeded is deployment state, so only the shape of
        # each entry is checked.
        for source in data:
            assert isinstance(source["name"], str)

    @pytest.mark.integration
    def test_list_upstream_sources_filter_by_enabled(self, integration_client):
        """Test filtering upstream sources by enabled status."""
        response = integration_client.get(f"{self._SOURCES_URL}?enabled=true")
        assert response.status_code == 200

        for source in response.json():
            assert source["enabled"] is True

    @pytest.mark.integration
    def test_list_upstream_sources_filter_by_type(self, integration_client):
        """Test filtering upstream sources by source type."""
        response = integration_client.get(f"{self._SOURCES_URL}?source_type=npm")
        assert response.status_code == 200

        for source in response.json():
            assert source["source_type"] == "npm"

    @pytest.mark.integration
    def test_create_upstream_source(self, integration_client, unique_test_id):
        """Test creating a new upstream source."""
        source_name = f"test-source-{unique_test_id}"

        data = self._create_source(
            integration_client,
            {
                "name": source_name,
                "source_type": "generic",
                "url": "https://example.com/packages",
                "enabled": False,
                "auth_type": "none",
                "priority": 200,
            },
        )
        try:
            assert data["name"] == source_name
            assert data["source_type"] == "generic"
            assert data["url"] == "https://example.com/packages"
            assert data["enabled"] is False
            assert data["priority"] == 200
            assert "id" in data
        finally:
            # Without an id there is nothing that can be addressed for cleanup.
            if "id" in data:
                self._delete_source(integration_client, data["id"])

    @pytest.mark.integration
    def test_create_upstream_source_with_auth(self, integration_client, unique_test_id):
        """Test creating an upstream source with authentication."""
        source_name = f"test-auth-source-{unique_test_id}"

        data = self._create_source(
            integration_client,
            {
                "name": source_name,
                "source_type": "npm",
                "url": "https://npm.internal.corp",
                "enabled": False,
                "auth_type": "basic",
                "username": "reader",
                "password": "secret123",
                "priority": 50,
            },
        )
        try:
            assert data["name"] == source_name
            assert data["auth_type"] == "basic"
            assert data["username"] == "reader"
            assert data["has_password"] is True
            # Password should NOT be in response
            assert "password" not in data
        finally:
            if "id" in data:
                self._delete_source(integration_client, data["id"])

    @pytest.mark.integration
    def test_create_upstream_source_duplicate_name(self, integration_client, unique_test_id):
        """Test that duplicate source names are rejected."""
        source_name = f"test-dup-{unique_test_id}"

        first = self._create_source(
            integration_client,
            {
                "name": source_name,
                "source_type": "generic",
                "url": "https://example1.com",
            },
        )
        source_id = first["id"]

        try:
            response2 = integration_client.post(
                self._SOURCES_URL,
                json={
                    "name": source_name,
                    "source_type": "generic",
                    "url": "https://example2.com",
                },
            )
            if response2.status_code == 201:
                # The server wrongly accepted the duplicate: remove it before
                # the assertion below reports the failure.
                duplicate_id = response2.json().get("id")
                if duplicate_id is not None:
                    self._delete_source(integration_client, duplicate_id)
            assert response2.status_code == 409
            assert "already exists" in response2.json()["detail"]
        finally:
            self._delete_source(integration_client, source_id)

    @pytest.mark.integration
    def test_get_upstream_source(self, integration_client, unique_test_id):
        """Test getting a specific upstream source."""
        source_name = f"test-get-{unique_test_id}"

        created = self._create_source(
            integration_client,
            {
                "name": source_name,
                "source_type": "pypi",
                "url": "https://pypi.internal.corp",
            },
        )
        source_id = created["id"]

        try:
            response = integration_client.get(f"{self._SOURCES_URL}/{source_id}")
            assert response.status_code == 200

            data = response.json()
            assert data["id"] == source_id
            assert data["name"] == source_name
            assert data["source_type"] == "pypi"
        finally:
            self._delete_source(integration_client, source_id)

    @pytest.mark.integration
    def test_get_upstream_source_not_found(self, integration_client):
        """Test getting a non-existent upstream source."""
        fake_id = "00000000-0000-0000-0000-000000000000"
        response = integration_client.get(f"{self._SOURCES_URL}/{fake_id}")
        assert response.status_code == 404

    @pytest.mark.integration
    def test_update_upstream_source(self, integration_client, unique_test_id):
        """Test updating an upstream source."""
        source_name = f"test-update-{unique_test_id}"

        created = self._create_source(
            integration_client,
            {
                "name": source_name,
                "source_type": "generic",
                "url": "https://example.com",
                "enabled": False,
                "priority": 100,
            },
        )
        source_id = created["id"]

        try:
            response = integration_client.put(
                f"{self._SOURCES_URL}/{source_id}",
                json={
                    "enabled": True,
                    "priority": 50,
                    "url": "https://example-updated.com",
                },
            )
            assert response.status_code == 200

            data = response.json()
            assert data["enabled"] is True
            assert data["priority"] == 50
            assert data["url"] == "https://example-updated.com"
            # Name should be unchanged
            assert data["name"] == source_name
        finally:
            self._delete_source(integration_client, source_id)

    @pytest.mark.integration
    def test_update_upstream_source_password(self, integration_client, unique_test_id):
        """Test updating an upstream source's password."""
        source_name = f"test-pwd-{unique_test_id}"

        # Create source without password
        created = self._create_source(
            integration_client,
            {
                "name": source_name,
                "source_type": "generic",
                "url": "https://example.com",
                "auth_type": "basic",
                "username": "user",
            },
        )
        source_id = created["id"]
        source_url = f"{self._SOURCES_URL}/{source_id}"

        try:
            assert created["has_password"] is False

            # Add password
            response = integration_client.put(source_url, json={"password": "newpassword"})
            assert response.status_code == 200
            assert response.json()["has_password"] is True

            # Clear password with empty string
            response = integration_client.put(source_url, json={"password": ""})
            assert response.status_code == 200
            assert response.json()["has_password"] is False
        finally:
            self._delete_source(integration_client, source_id)

    @pytest.mark.integration
    def test_delete_upstream_source(self, integration_client, unique_test_id):
        """Test deleting an upstream source."""
        source_name = f"test-delete-{unique_test_id}"

        created = self._create_source(
            integration_client,
            {
                "name": source_name,
                "source_type": "generic",
                "url": "https://example.com",
            },
        )
        source_id = created["id"]

        # Delete source
        response = self._delete_source(integration_client, source_id)
        assert response.status_code == 204

        # Verify it's gone
        get_response = integration_client.get(f"{self._SOURCES_URL}/{source_id}")
        assert get_response.status_code == 404

    @pytest.mark.integration
    def test_test_upstream_source_connectivity(self, integration_client, unique_test_id):
        """Test the connectivity test endpoint."""
        source_name = f"test-conn-{unique_test_id}"

        # Create source with a URL that should respond
        created = self._create_source(
            integration_client,
            {
                "name": source_name,
                "source_type": "generic",
                "url": "https://httpbin.org/get",  # Public test endpoint
                "enabled": False,
            },
        )
        source_id = created["id"]

        try:
            response = integration_client.post(f"{self._SOURCES_URL}/{source_id}/test")
            assert response.status_code == 200

            # Only the shape of the result is checked; "success" may be
            # either value.
            data = response.json()
            assert "success" in data
            assert "elapsed_ms" in data
            assert data["source_id"] == source_id
            assert data["source_name"] == source_name
        finally:
            self._delete_source(integration_client, source_id)
|
|
|
|
|
|
# =============================================================================
|
|
# Cache Settings Admin API Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestCacheSettingsAdminAPI:
    """Tests for the cache settings admin API.

    Tests that flip ``auto_create_system_projects`` restore the original
    value in a ``finally`` clause, so a failing assertion cannot leave the
    server with a changed setting.
    """

    _SETTINGS_URL = "/api/v1/admin/cache-settings"
    _FLAG = "auto_create_system_projects"

    @classmethod
    def _put_flag(cls, client, value):
        """PUT an update containing only the auto-create flag; return the response."""
        return client.put(cls._SETTINGS_URL, json={cls._FLAG: value})

    @pytest.mark.integration
    def test_get_cache_settings_requires_admin(self):
        """Test that getting cache settings requires admin access."""
        import httpx

        base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

        with httpx.Client(base_url=base_url, timeout=30.0) as unauthenticated_client:
            response = unauthenticated_client.get(self._SETTINGS_URL)
            assert response.status_code in (401, 403)

    @pytest.mark.integration
    def test_get_cache_settings_success(self, integration_client):
        """Test getting cache settings as admin."""
        response = integration_client.get(self._SETTINGS_URL)
        assert response.status_code == 200

        data = response.json()
        # Check expected fields exist
        assert "auto_create_system_projects" in data

        # Check types
        assert isinstance(data["auto_create_system_projects"], bool)

    @pytest.mark.integration
    def test_update_cache_settings_requires_admin(self):
        """Test that updating cache settings requires admin access."""
        import httpx

        base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

        with httpx.Client(base_url=base_url, timeout=30.0) as unauthenticated_client:
            response = self._put_flag(unauthenticated_client, False)
            assert response.status_code in (401, 403)

    @pytest.mark.integration
    def test_update_cache_settings_success(self, integration_client):
        """Test updating cache settings as admin."""
        # First get current settings to restore later
        original = integration_client.get(self._SETTINGS_URL).json()
        flipped = not original["auto_create_system_projects"]

        try:
            response = self._put_flag(integration_client, flipped)
            assert response.status_code == 200

            data = response.json()
            assert data["auto_create_system_projects"] == flipped
        finally:
            # Restore original settings
            self._put_flag(integration_client, original["auto_create_system_projects"])

    @pytest.mark.integration
    def test_update_cache_settings_partial(self, integration_client):
        """Test that partial updates only change specified fields."""
        original = integration_client.get(self._SETTINGS_URL).json()

        # Update only auto_create_system_projects
        new_value = not original["auto_create_system_projects"]
        try:
            response = self._put_flag(integration_client, new_value)
            assert response.status_code == 200

            data = response.json()
            assert data["auto_create_system_projects"] == new_value
        finally:
            # Restore
            self._put_flag(integration_client, original["auto_create_system_projects"])

    @pytest.mark.integration
    def test_update_cache_settings_auto_create_system_projects(self, integration_client):
        """Test updating auto_create_system_projects setting."""
        original = integration_client.get(self._SETTINGS_URL).json()

        # Toggle auto_create_system_projects
        new_value = not original["auto_create_system_projects"]
        try:
            response = self._put_flag(integration_client, new_value)
            assert response.status_code == 200
            assert response.json()["auto_create_system_projects"] == new_value
        finally:
            # Restore
            self._put_flag(integration_client, original["auto_create_system_projects"])
|
|
|
|
|
|
# =============================================================================
|
|
# Environment Variable Configuration Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestEnvVarUpstreamSourcesParsing:
    """Tests for parsing upstream sources from environment variables.

    Each test installs its ``ORCHARD_UPSTREAM__<NAME>__<FIELD>`` variables
    with ``patch.dict(os.environ, ...)``, which restores the environment
    when the ``with`` block exits, including on assertion failure.
    """

    def test_parse_upstream_sources_basic(self):
        """Test parsing a basic upstream source from env vars."""
        from app.config import parse_upstream_sources_from_env

        test_env = {
            "ORCHARD_UPSTREAM__TEST_SOURCE__URL": "https://example.com/packages",
            "ORCHARD_UPSTREAM__TEST_SOURCE__TYPE": "generic",
            "ORCHARD_UPSTREAM__TEST_SOURCE__ENABLED": "true",
        }

        with patch.dict(os.environ, test_env):
            sources = parse_upstream_sources_from_env()
            assert len(sources) >= 1

            # Find our test source
            test_source = next((s for s in sources if "test" in s.name), None)
            assert test_source is not None
            assert test_source.url == "https://example.com/packages"
            assert test_source.source_type == "generic"
            assert test_source.enabled is True
            assert test_source.source == "env"

    def test_parse_upstream_sources_with_auth(self):
        """Test parsing an upstream source with authentication from env vars."""
        from app.config import parse_upstream_sources_from_env

        test_env = {
            "ORCHARD_UPSTREAM__AUTH_TEST__URL": "https://secure.example.com",
            "ORCHARD_UPSTREAM__AUTH_TEST__TYPE": "npm",
            "ORCHARD_UPSTREAM__AUTH_TEST__AUTH_TYPE": "basic",
            "ORCHARD_UPSTREAM__AUTH_TEST__USERNAME": "myuser",
            "ORCHARD_UPSTREAM__AUTH_TEST__PASSWORD": "secret123",
            "ORCHARD_UPSTREAM__AUTH_TEST__PRIORITY": "50",
        }

        with patch.dict(os.environ, test_env):
            sources = parse_upstream_sources_from_env()
            test_source = next((s for s in sources if "auth" in s.name), None)
            assert test_source is not None
            assert test_source.auth_type == "basic"
            assert test_source.username == "myuser"
            assert test_source.password == "secret123"
            # The env value is the string "50"; the parsed priority is an int.
            assert test_source.priority == 50

    def test_parse_upstream_sources_missing_url_skipped(self):
        """Test that sources without URL are skipped."""
        from app.config import parse_upstream_sources_from_env

        # Source without URL should be skipped
        test_env = {
            "ORCHARD_UPSTREAM__NO_URL__TYPE": "npm",
            "ORCHARD_UPSTREAM__NO_URL__ENABLED": "true",
        }

        with patch.dict(os.environ, test_env):
            sources = parse_upstream_sources_from_env()
            # Should not include the source without URL
            no_url_source = next((s for s in sources if "no-url" in s.name), None)
            assert no_url_source is None

    def test_parse_upstream_sources_defaults(self):
        """Test that defaults are applied for optional fields."""
        from app.config import parse_upstream_sources_from_env

        test_env = {
            "ORCHARD_UPSTREAM__DEFAULTS_TEST__URL": "https://example.com",
        }

        with patch.dict(os.environ, test_env):
            sources = parse_upstream_sources_from_env()
            test_source = next((s for s in sources if "defaults" in s.name), None)
            assert test_source is not None
            # Check defaults
            assert test_source.source_type == "generic"
            assert test_source.enabled is True
            assert test_source.auth_type == "none"
            assert test_source.priority == 100
|
|
|
|
|
|
class TestEnvSourceToResponse:
    """Checks the attributes exposed by an EnvUpstreamSource instance."""

    def test_env_source_to_response_format(self):
        """Constructor arguments are readable back, and source is 'env'."""
        from app.config import EnvUpstreamSource

        env_source = EnvUpstreamSource(
            name="test-source",
            url="https://example.com",
            source_type="npm",
            enabled=True,
            auth_type="basic",
            username="user",
            password="pass",
            priority=50,
        )

        assert env_source.name == "test-source"
        assert env_source.url == "https://example.com"
        assert env_source.source_type == "npm"
        assert env_source.enabled is True
        assert env_source.auth_type == "basic"
        assert env_source.username == "user"
        assert env_source.password == "pass"
        assert env_source.priority == 50
        # "source" is not passed to the constructor above.
        assert env_source.source == "env"
|
|
|
|
|
|
class TestUpstreamSourceResponseSource:
    """Tests for the source field in upstream source responses."""

    @pytest.mark.integration
    def test_db_sources_have_database_source_field(self, integration_client, unique_test_id):
        """Test that database-defined sources have source='database'.

        The created source is deleted in a ``finally`` clause so a failing
        assertion does not leave it behind on the server.
        """
        source_name = f"test-db-source-{unique_test_id}"

        # Create source via API (stored in DB)
        response = integration_client.post(
            "/api/v1/admin/upstream-sources",
            json={
                "name": source_name,
                "source_type": "generic",
                "url": "https://example.com",
            },
        )
        assert response.status_code == 201
        source_id = response.json()["id"]

        try:
            # Get source - should have source="database"
            response = integration_client.get(f"/api/v1/admin/upstream-sources/{source_id}")
            assert response.status_code == 200
            assert response.json()["source"] == "database"

            # List sources - should have source field
            response = integration_client.get("/api/v1/admin/upstream-sources")
            assert response.status_code == 200
            db_source = next((s for s in response.json() if s["id"] == source_id), None)
            assert db_source is not None
            assert db_source["source"] == "database"
        finally:
            # Clean up
            integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}")
|
|
|
|
|
|
class TestCacheSettingsEnvOverride:
|
|
"""Tests for cache settings environment variable override fields."""
|
|
|
|
@pytest.mark.integration
|
|
def test_cache_settings_has_env_override_fields(self, integration_client):
|
|
"""Test that cache settings response includes env override fields."""
|
|
response = integration_client.get("/api/v1/admin/cache-settings")
|
|
assert response.status_code == 200
|
|
|
|
data = response.json()
|
|
# These fields should exist (may be null if no env override)
|
|
assert "auto_create_system_projects_env_override" in data
|