"""
Tests for upstream artifact caching schema.

Tests models, schemas, and encryption for the upstream caching feature.
"""

import os
from unittest.mock import patch

import pytest
from pydantic import ValidationError


class TestEncryptionModule:
    """Tests for the encryption module."""

    def test_encrypt_decrypt_roundtrip(self):
        """Test that encryption and decryption work correctly."""
        from app.encryption import encrypt_value, decrypt_value

        plaintext = "my-secret-password"
        encrypted = encrypt_value(plaintext)

        assert isinstance(encrypted, bytes)
        assert encrypted != plaintext.encode()

        decrypted = decrypt_value(encrypted)
        assert decrypted == plaintext

    def test_encrypt_different_each_time(self):
        """Test that encrypting the same value produces different ciphertext."""
        from app.encryption import encrypt_value

        plaintext = "test-password"
        encrypted1 = encrypt_value(plaintext)
        encrypted2 = encrypt_value(plaintext)

        # Presumably Fernet-based (see test_generate_key_format): each token
        # carries a random IV, which is what makes the ciphertexts differ.
        # The embedded timestamp only has one-second resolution, so it would
        # not distinguish two back-to-back encryptions on its own.
        assert encrypted1 != encrypted2

    def test_encrypt_empty_value_raises(self):
        """Test that encrypting empty value raises ValueError."""
        from app.encryption import encrypt_value

        with pytest.raises(ValueError, match="Cannot encrypt empty"):
            encrypt_value("")

    def test_decrypt_empty_value_raises(self):
        """Test that decrypting empty value raises ValueError."""
        from app.encryption import decrypt_value

        with pytest.raises(ValueError, match="Cannot decrypt empty"):
            decrypt_value(b"")

    def test_can_decrypt_valid(self):
        """Test can_decrypt returns True for valid encrypted data."""
        from app.encryption import encrypt_value, can_decrypt

        encrypted = encrypt_value("test-password")
        assert can_decrypt(encrypted) is True

    def test_can_decrypt_invalid(self):
        """Test can_decrypt returns False for invalid data."""
        from app.encryption import can_decrypt

        assert can_decrypt(b"invalid-data") is False
        assert can_decrypt(b"") is False

    def test_generate_key_format(self):
        """Test that generated keys are valid Fernet keys."""
        from app.encryption import generate_key
        from cryptography.fernet import Fernet

        key = generate_key()
        assert isinstance(key, str)

        # Should be valid for creating a Fernet instance
        fernet = Fernet(key.encode())
        assert fernet is not None


class TestUpstreamSourceModel:
    """Tests for UpstreamSource SQLAlchemy model."""

    def test_model_fields_exist(self):
        """Test that model has all expected fields."""
        from app.models import UpstreamSource

        source = UpstreamSource()
        expected_fields = (
            "id",
            "name",
            "source_type",
            "url",
            "enabled",
            "auth_type",
            "username",
            "password_encrypted",
            "headers_encrypted",
            "priority",
        )
        for field in expected_fields:
            assert hasattr(source, field), f"UpstreamSource is missing {field!r}"

    def test_model_with_values(self):
        """Test that model can be created with explicit values."""
        from app.models import UpstreamSource

        source = UpstreamSource(
            name="npm-private",
            source_type="npm",
            url="https://npm.example.com",
            enabled=True,
            auth_type="basic",
            username="admin",
            priority=50,
        )
        assert source.name == "npm-private"
        assert source.source_type == "npm"
        assert source.url == "https://npm.example.com"
        assert source.enabled is True
        assert source.auth_type == "basic"
        assert source.username == "admin"
        assert source.priority == 50

    def test_set_password_encrypts(self):
        """Test that set_password encrypts the value."""
        from app.models import UpstreamSource

        source = UpstreamSource()
        source.set_password("my-api-key")

        assert source.password_encrypted is not None
        assert isinstance(source.password_encrypted, bytes)
        # The plaintext must not be recoverable by simply reading the column
        assert b"my-api-key" not in source.password_encrypted

    def test_get_password_decrypts(self):
        """Test that get_password decrypts the value."""
        from app.models import UpstreamSource

        source = UpstreamSource()
        source.set_password("my-api-key")

        decrypted = source.get_password()
        assert decrypted == "my-api-key"

    def test_set_password_none_clears(self):
        """Test that set_password with empty string clears the password."""
        from app.models import UpstreamSource

        source = UpstreamSource()
        source.set_password("my-api-key")
        assert source.password_encrypted is not None

        source.set_password("")
        assert source.password_encrypted is None

    def test_has_password(self):
        """Test has_password helper method."""
        from app.models import UpstreamSource

        source = UpstreamSource()
        assert source.has_password() is False

        source.set_password("secret")
        assert source.has_password() is True

    def test_set_headers_encrypts(self):
        """Test that set_headers encrypts custom headers."""
        from app.models import UpstreamSource

        source = UpstreamSource()
        headers = {"X-API-Key": "secret123", "X-Custom": "value"}
        source.set_headers(headers)

        assert source.headers_encrypted is not None
        assert isinstance(source.headers_encrypted, bytes)

    def test_get_headers_decrypts(self):
        """Test that get_headers decrypts custom headers."""
        from app.models import UpstreamSource

        source = UpstreamSource()
        headers = {"X-API-Key": "secret123", "X-Custom": "value"}
        source.set_headers(headers)

        decrypted = source.get_headers()
        assert decrypted == headers


class TestCacheSettingsModel:
    """Tests for CacheSettings SQLAlchemy model."""

    def test_model_fields_exist(self):
        """Test that model has all expected fields."""
        from app.models import CacheSettings

        settings = CacheSettings()
        for field in ("id", "auto_create_system_projects"):
            assert hasattr(settings, field), f"CacheSettings is missing {field!r}"

    def test_model_with_values(self):
        """Test that model can be created with explicit values."""
        from app.models import CacheSettings

        settings = CacheSettings(
            id=1,
            auto_create_system_projects=True,
        )
        assert settings.id == 1
        assert settings.auto_create_system_projects is True


class TestCachedUrlModel:
    """Tests for CachedUrl SQLAlchemy model."""

    def test_model_fields_exist(self):
        """Test that model has all expected fields."""
        from app.models import CachedUrl

        cached = CachedUrl()
        expected_fields = (
            "id",
            "url",
            "url_hash",
            "artifact_id",
            "source_id",
            "fetched_at",
            "response_headers",
        )
        for field in expected_fields:
            assert hasattr(cached, field), f"CachedUrl is missing {field!r}"

    def test_compute_url_hash(self):
        """Test URL hash computation."""
        from app.models import CachedUrl

        url = "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz"
        hash1 = CachedUrl.compute_url_hash(url)

        # Should be 64-character hex string (SHA256)
        assert len(hash1) == 64
        assert all(c in '0123456789abcdef' for c in hash1)

        # Same URL should produce same hash
        hash2 = CachedUrl.compute_url_hash(url)
        assert hash1 == hash2

        # Different URL should produce different hash
        hash3 = CachedUrl.compute_url_hash("https://example.com/other")
        assert hash1 != hash3


class TestUpstreamSourceSchemas:
    """Tests for Pydantic upstream source schemas."""

    def test_create_schema_valid(self):
        """Test UpstreamSourceCreate with valid values."""
        from app.schemas import UpstreamSourceCreate

        source = UpstreamSourceCreate(
            name="npm-private",
            source_type="npm",
            url="https://npm.example.com",
            enabled=True,
            auth_type="basic",
            username="admin",
            password="secret",
            priority=50,
        )
        assert source.name == "npm-private"
        assert source.source_type == "npm"
        assert source.url == "https://npm.example.com"
        assert source.priority == 50

    def test_create_schema_defaults(self):
        """Test UpstreamSourceCreate default values."""
        from app.schemas import UpstreamSourceCreate

        source = UpstreamSourceCreate(
            name="test",
            url="https://example.com",
        )
        assert source.source_type == "generic"
        assert source.enabled is False
        assert source.auth_type == "none"
        assert source.priority == 100

    def test_create_schema_invalid_source_type(self):
        """Test UpstreamSourceCreate rejects invalid source_type."""
        from app.schemas import UpstreamSourceCreate

        with pytest.raises(ValidationError) as exc_info:
            UpstreamSourceCreate(
                name="test",
                url="https://example.com",
                source_type="invalid",
            )
        assert "source_type must be one of" in str(exc_info.value)

    def test_create_schema_invalid_auth_type(self):
        """Test UpstreamSourceCreate rejects invalid auth_type."""
        from app.schemas import UpstreamSourceCreate

        with pytest.raises(ValidationError) as exc_info:
            UpstreamSourceCreate(
                name="test",
                url="https://example.com",
                auth_type="invalid",
            )
        assert "auth_type must be one of" in str(exc_info.value)

    def test_create_schema_invalid_url(self):
        """Test UpstreamSourceCreate rejects invalid URL."""
        from app.schemas import UpstreamSourceCreate

        with pytest.raises(ValidationError) as exc_info:
            UpstreamSourceCreate(
                name="test",
                url="not-a-url",
            )
        assert "url must start with http" in str(exc_info.value)

    def test_create_schema_invalid_priority(self):
        """Test UpstreamSourceCreate rejects invalid priority."""
        from app.schemas import UpstreamSourceCreate

        with pytest.raises(ValidationError) as exc_info:
            UpstreamSourceCreate(
                name="test",
                url="https://example.com",
                priority=0,
            )
        assert "priority must be greater than 0" in str(exc_info.value)

    def test_update_schema_all_optional(self):
        """Test UpstreamSourceUpdate allows all fields to be optional."""
        from app.schemas import UpstreamSourceUpdate

        update = UpstreamSourceUpdate()
        assert update.name is None
        assert update.url is None

    def test_update_schema_partial(self):
        """Test UpstreamSourceUpdate with partial fields."""
        from app.schemas import UpstreamSourceUpdate

        update = UpstreamSourceUpdate(enabled=True, priority=50)
        assert update.enabled is True
        assert update.priority == 50
        assert update.name is None

    def test_response_schema_no_secrets(self):
        """Test UpstreamSourceResponse doesn't have secret fields."""
        from app.schemas import UpstreamSourceResponse

        field_names = set(UpstreamSourceResponse.model_fields.keys())

        # Secrets (raw or encrypted) must never be part of the response model
        for secret in ("password", "password_encrypted", "headers", "headers_encrypted"):
            assert secret not in field_names, f"{secret!r} must not be exposed"

        # Only presence flags are exposed
        assert "has_password" in field_names
        assert "has_headers" in field_names


class TestCacheSettingsSchemas:
    """Tests for Pydantic cache settings schemas."""

    def test_update_schema_all_optional(self):
        """Test CacheSettingsUpdate allows all fields to be optional."""
        from app.schemas import CacheSettingsUpdate

        update = CacheSettingsUpdate()
        assert update.auto_create_system_projects is None

    def test_update_schema_partial(self):
        """Test CacheSettingsUpdate with partial fields."""
        from app.schemas import CacheSettingsUpdate

        update = CacheSettingsUpdate(auto_create_system_projects=True)
        assert update.auto_create_system_projects is True


class TestCacheRequestSchemas:
    """Tests for Pydantic cache request schemas."""

    def test_request_valid(self):
        """Test CacheRequest with valid values."""
        from app.schemas import CacheRequest

        request = CacheRequest(
            url="https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
            source_type="npm",
            package_name="lodash",
            version="4.17.21",
        )
        assert request.url == "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz"
        assert request.source_type == "npm"

    def test_request_invalid_url(self):
        """Test CacheRequest rejects invalid URL."""
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError) as exc_info:
            CacheRequest(
                url="not-a-url",
                source_type="npm",
            )
        assert "url must start with http" in str(exc_info.value)

    def test_request_invalid_source_type(self):
        """Test CacheRequest rejects invalid source_type."""
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError) as exc_info:
            CacheRequest(
                url="https://example.com/file.tgz",
                source_type="invalid",
            )
        assert "source_type must be one of" in str(exc_info.value)

    def test_request_expected_hash_normalized(self):
        """Test CacheRequest normalizes expected_hash."""
        from app.schemas import CacheRequest

        # With sha256: prefix
        request = CacheRequest(
            url="https://example.com/file.tgz",
            source_type="generic",
            expected_hash="sha256:abc123def456abc123def456abc123def456abc123def456abc123def456abc1",
        )
        assert request.expected_hash == "abc123def456abc123def456abc123def456abc123def456abc123def456abc1"

        # Without prefix (upper-case input is expected to be lower-cased)
        request2 = CacheRequest(
            url="https://example.com/file.tgz",
            source_type="generic",
            expected_hash="ABC123DEF456ABC123DEF456ABC123DEF456ABC123DEF456ABC123DEF456ABC1",
        )
        assert request2.expected_hash == "abc123def456abc123def456abc123def456abc123def456abc123def456abc1"

    def test_request_invalid_expected_hash(self):
        """Test CacheRequest rejects invalid expected_hash."""
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError) as exc_info:
            CacheRequest(
                url="https://example.com/file.tgz",
                source_type="generic",
                expected_hash="not-a-valid-hash",
            )
        assert "64-character hex string" in str(exc_info.value)


class TestSourceTypesConstant:
    """Tests for source type constants."""

    def test_source_types_contains_expected(self):
        """Test SOURCE_TYPES contains all expected values."""
        from app.schemas import SOURCE_TYPES

        expected = (
            "npm",
            "pypi",
            "maven",
            "docker",
            "helm",
            "nuget",
            "deb",
            "rpm",
            "generic",
        )
        for source_type in expected:
            assert source_type in SOURCE_TYPES, f"{source_type!r} missing from SOURCE_TYPES"

    def test_auth_types_contains_expected(self):
        """Test AUTH_TYPES contains all expected values."""
        from app.schemas import AUTH_TYPES

        for auth_type in ("none", "basic", "bearer", "api_key"):
            assert auth_type in AUTH_TYPES, f"{auth_type!r} missing from AUTH_TYPES"


# =============================================================================
# UpstreamClient Tests
# =============================================================================


class TestUpstreamClientConfig:
    """Tests for UpstreamClientConfig dataclass."""

    def test_default_config(self):
        """Test default configuration values."""
        from app.upstream import UpstreamClientConfig

        config = UpstreamClientConfig()
        assert config.connect_timeout == 30.0
        assert config.read_timeout == 300.0
        assert config.max_retries == 3
        assert config.follow_redirects is True
        assert config.max_redirects == 5
        assert config.max_file_size is None
        assert config.verify_ssl is True

    def test_custom_config(self):
        """Test custom configuration values."""
        from app.upstream import UpstreamClientConfig

        config = UpstreamClientConfig(
            connect_timeout=10.0,
            read_timeout=60.0,
            max_retries=5,
            max_file_size=1024 * 1024,
        )
        assert config.connect_timeout == 10.0
        assert config.read_timeout == 60.0
        assert config.max_retries == 5
        assert config.max_file_size == 1024 * 1024


class TestFetchResult:
    """Tests for FetchResult dataclass."""

    def test_fetch_result_creation(self):
        """Test creating a FetchResult."""
        from io import BytesIO
        from app.upstream import FetchResult

        content = BytesIO(b"test content")
        result = FetchResult(
            content=content,
            sha256="abc123",
            size=12,
            content_type="text/plain",
            response_headers={"x-custom": "value"},
            source_name="test-source",
        )

        assert result.sha256 == "abc123"
        assert result.size == 12
        assert result.content_type == "text/plain"
        assert result.source_name == "test-source"

    def test_fetch_result_close(self):
        """Test that close() cleans up resources."""
        import tempfile
        from pathlib import Path
        from app.upstream import FetchResult

        # Create a temp file that close() is expected to remove
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(b"test")
            temp_path = Path(f.name)

        content = open(temp_path, "rb")
        try:
            result = FetchResult(
                content=content,
                sha256="abc",
                size=4,
                content_type=None,
                response_headers={},
                temp_path=temp_path,
            )

            assert temp_path.exists()
            result.close()
            assert not temp_path.exists()
        finally:
            # Don't leak the handle or the file if construction or an
            # assertion above fails; both calls are no-ops on success.
            content.close()
            if temp_path.exists():
                temp_path.unlink()


class TestUpstreamClientSourceMatching:
    """Tests for URL-to-source matching."""

    def test_match_source_by_url_prefix(self):
        """Test that sources are matched by URL prefix."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        source = UpstreamSource(
            name="npm-public",
            url="https://registry.npmjs.org",
            enabled=True,
            auth_type="none",
            priority=100,
        )

        client = UpstreamClient(sources=[source])

        # Should match
        matched = client._match_source("https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz")
        assert matched is not None
        assert matched.name == "npm-public"

        # Should not match
        matched = client._match_source("https://pypi.org/simple/requests/")
        assert matched is None

    def test_match_source_priority_order(self):
        """Test that sources are matched by priority (lowest first)."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        source1 = UpstreamSource(
            name="npm-private",
            url="https://registry.npmjs.org",
            enabled=True,
            auth_type="basic",
            priority=50,
        )
        source2 = UpstreamSource(
            name="npm-public",
            url="https://registry.npmjs.org",
            enabled=True,
            auth_type="none",
            priority=100,
        )

        # Provide in wrong order - should be sorted by priority
        client = UpstreamClient(sources=[source2, source1])

        matched = client._match_source("https://registry.npmjs.org/lodash")
        assert matched is not None
        assert matched.name == "npm-private"  # Lower priority wins

    def test_no_match_returns_none(self):
        """Test that no match returns None."""
        from app.upstream import UpstreamClient

        client = UpstreamClient(sources=[])
        matched = client._match_source("https://example.com/file.tgz")
        assert matched is None


class TestUpstreamClientAuthHeaders:
    """Tests for authentication header building."""

    def test_auth_none(self):
        """Test no authentication."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        source = UpstreamSource(auth_type="none")
        client = UpstreamClient()

        headers = client._build_auth_headers(source)
        assert headers == {}

    def test_auth_bearer(self):
        """Test bearer token authentication."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        source = UpstreamSource(auth_type="bearer")
        source.set_password("my-bearer-token")

        client = UpstreamClient()
        headers = client._build_auth_headers(source)
        assert headers == {"Authorization": "Bearer my-bearer-token"}

    def test_auth_api_key(self):
        """Test API key authentication with custom headers."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        source = UpstreamSource(auth_type="api_key")
        source.set_headers({"X-API-Key": "secret-key-123", "X-Custom": "value"})

        client = UpstreamClient()
        headers = client._build_auth_headers(source)
        assert headers == {"X-API-Key": "secret-key-123", "X-Custom": "value"}

    def test_auth_basic_returns_empty_headers(self):
        """Test that basic auth doesn't add headers (uses httpx auth param)."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        source = UpstreamSource(auth_type="basic", username="user")
        source.set_password("pass")

        client = UpstreamClient()
        headers = client._build_auth_headers(source)
        # Basic auth is handled via httpx auth parameter, not headers
        assert headers == {}

    def test_get_basic_auth(self):
        """Test getting basic auth credentials."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        source = UpstreamSource(auth_type="basic", username="user")
        source.set_password("pass")

        client = UpstreamClient()
        auth = client._get_basic_auth(source)
        assert auth == ("user", "pass")

    def test_get_basic_auth_no_username(self):
        """Test basic auth without username returns None."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient

        source = UpstreamSource(auth_type="basic")
        client = UpstreamClient()
        auth = client._get_basic_auth(source)
        assert auth is None


class TestUpstreamClientSourceDisabled:
    """Tests for disabled source handling."""

    def test_disabled_source_raises_error(self):
        """Test that fetching from disabled source raises error."""
        from app.models import UpstreamSource
        from app.upstream import UpstreamClient, SourceDisabledError

        source = UpstreamSource(
            name="npm-public",
            url="https://registry.npmjs.org",
            enabled=False,
            auth_type="none",
            priority=100,
        )

        client = UpstreamClient(sources=[source])

        with pytest.raises(SourceDisabledError) as exc_info:
            client.fetch("https://registry.npmjs.org/lodash")

        assert "npm-public" in str(exc_info.value)
        assert "disabled" in str(exc_info.value)


class TestUpstreamClientRetryLogic:
    """Tests for retry and backoff logic."""

    def test_should_retry_connection_error(self):
        """Test that connection errors trigger retry."""
        import httpx
        from app.upstream import UpstreamClient

        client = UpstreamClient()

        error = httpx.ConnectError("Connection refused")
        assert client._should_retry(error, 0) is True
        assert client._should_retry(error, 1) is True
        assert client._should_retry(error, 2) is False  # Max retries

    def test_should_retry_timeout(self):
        """Test that timeouts trigger retry."""
        import httpx
        from app.upstream import UpstreamClient

        client = UpstreamClient()

        error = httpx.ReadTimeout("Read timed out")
        assert client._should_retry(error, 0) is True

    def test_should_not_retry_4xx(self):
        """Test that 4xx errors don't trigger retry."""
        import httpx
        from app.upstream import UpstreamClient

        client = UpstreamClient()

        response = httpx.Response(404, request=httpx.Request("GET", "http://test"))
        error = httpx.HTTPStatusError("Not found", request=response.request, response=response)
        assert client._should_retry(error, 0) is False

    def test_should_retry_502_503_504(self):
        """Test that 502, 503, 504 errors trigger retry."""
        import httpx
        from app.upstream import UpstreamClient

        client = UpstreamClient()

        for status in [502, 503, 504]:
            response = httpx.Response(status, request=httpx.Request("GET", "http://test"))
            error = httpx.HTTPStatusError("Server error", request=response.request, response=response)
            assert client._should_retry(error, 0) is True

    def test_calculate_backoff(self):
        """Test exponential backoff calculation."""
        from app.upstream import UpstreamClient, UpstreamClientConfig

        config = UpstreamClientConfig(
            retry_backoff_base=1.0,
            retry_backoff_max=30.0,
        )
        client = UpstreamClient(config=config)

        # First attempt should be around 1s (with jitter)
        delay0 = client._calculate_backoff(0)
        assert 0.75 <= delay0 <= 1.25

        # Second attempt should be around 2s (with jitter)
        delay1 = client._calculate_backoff(1)
        assert 1.5 <= delay1 <= 2.5

        # Third attempt should be around 4s (with jitter)
        delay2 = client._calculate_backoff(2)
        assert 3.0 <= delay2 <= 5.0

    def test_backoff_respects_max(self):
        """Test that backoff respects maximum delay."""
        from app.upstream import UpstreamClient, UpstreamClientConfig

        config = UpstreamClientConfig(
            retry_backoff_base=10.0,
            retry_backoff_max=5.0,  # Max is less than base * 2^attempt
        )
        client = UpstreamClient(config=config)

        delay = client._calculate_backoff(5)  # Would be 10 * 32 = 320
        assert delay <= 5.0


class TestUpstreamClientExceptionConversion:
    """Tests for exception conversion."""

    def test_convert_connect_error(self):
        """Test converting connect error."""
        import httpx
        from app.upstream import UpstreamClient, UpstreamConnectionError

        client = UpstreamClient()
        error = httpx.ConnectError("Connection refused")

        with pytest.raises(UpstreamConnectionError):
            client._raise_upstream_error(error, "http://test")

    def test_convert_timeout_error(self):
        """Test converting timeout error."""
        import httpx
        from app.upstream import UpstreamClient, UpstreamTimeoutError

        client = UpstreamClient()
        error = httpx.ReadTimeout("Read timed out")

        with pytest.raises(UpstreamTimeoutError):
            client._raise_upstream_error(error, "http://test")

    def test_convert_http_error(self):
        """Test converting HTTP status error."""
        import httpx
        from app.upstream import UpstreamClient, UpstreamHTTPError

        client = UpstreamClient()
        response = httpx.Response(
            404,
            request=httpx.Request("GET", "http://test"),
            headers={"x-custom": "value"},
        )
        error = httpx.HTTPStatusError("Not found", request=response.request, response=response)

        with pytest.raises(UpstreamHTTPError) as exc_info:
            client._raise_upstream_error(error, "http://test")

        assert exc_info.value.status_code == 404


class TestUpstreamClientFileSizeLimit:
    """Tests for file size limit enforcement."""

    def test_file_size_limit_dataclass(self):
        """Test FileSizeExceededError contains expected data."""
        from app.upstream import FileSizeExceededError

        error = FileSizeExceededError("Too large", 1000, 500)
        assert error.content_length == 1000
        assert error.max_size == 500
        assert "Too large" in str(error)


class TestUpstreamExceptions:
    """Tests for upstream exception classes."""

    def test_upstream_error_base(self):
        """Test base UpstreamError."""
        from app.upstream import UpstreamError

        error = UpstreamError("Test error")
        assert str(error) == "Test error"

    def test_upstream_http_error(self):
        """Test UpstreamHTTPError with status code."""
        from app.upstream import UpstreamHTTPError

        error = UpstreamHTTPError("Not found", 404, {"x-custom": "value"})
        assert error.status_code == 404
        assert error.response_headers == {"x-custom": "value"}

    def test_source_not_found_error(self):
        """Test SourceNotFoundError."""
        from app.upstream import SourceNotFoundError

        error = SourceNotFoundError("No source for URL")
        assert "No source for URL" in str(error)

    def test_source_disabled_error(self):
        """Test SourceDisabledError."""
        from app.upstream import SourceDisabledError

        error = SourceDisabledError("Source is disabled")
        assert "Source is disabled" in str(error)


# =============================================================================
# URL Parsing Tests
# =============================================================================


class TestNpmUrlParsing:
    """Tests for npm URL parsing."""

    def test_parse_unscoped_package(self):
        """Test parsing unscoped npm package URL."""
        from app.cache import parse_npm_url

        result = parse_npm_url("https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz")
        assert result is not None
        assert result.package_name == "lodash"
        assert result.version == "4.17.21"
        assert result.filename == "lodash-4.17.21.tgz"

    def test_parse_scoped_package(self):
        """Test parsing scoped npm package URL."""
        from app.cache import parse_npm_url

        result = parse_npm_url("https://registry.npmjs.org/@types/node/-/node-18.0.0.tgz")
        assert result is not None
        assert result.package_name == "@types/node"
        assert result.version == "18.0.0"
        assert result.filename == "node-18.0.0.tgz"

    def test_parse_invalid_url(self):
        """Test parsing invalid npm URL returns None."""
        from app.cache import parse_npm_url

        result = parse_npm_url("https://example.com/random-file.tgz")
        assert result is None


class TestPypiUrlParsing:
    """Tests for PyPI URL parsing."""

    def test_parse_sdist_tar_gz(self):
        """Test parsing PyPI source distribution."""
        from app.cache import parse_pypi_url

        result = parse_pypi_url("https://files.pythonhosted.org/packages/ab/cd/requests-2.28.0.tar.gz")
        assert result is not None
        assert result.package_name == "requests"
        assert result.version == "2.28.0"
        assert result.filename == "requests-2.28.0.tar.gz"

    def test_parse_wheel(self):
        """Test parsing PyPI wheel file."""
        from app.cache import parse_pypi_url

        result = parse_pypi_url("https://files.pythonhosted.org/packages/ab/cd/requests-2.28.0-py3-none-any.whl")
        assert result is not None
        assert result.package_name == "requests"
        assert result.version == "2.28.0"

    def test_parse_underscore_package(self):
        """Test parsing package name with underscore."""
        from app.cache import parse_pypi_url

        result = parse_pypi_url("https://files.pythonhosted.org/packages/ab/cd/some_package-1.0.0.tar.gz")
        assert result is not None
        assert result.package_name == "some-package"  # Normalized
        assert result.version == "1.0.0"


class TestMavenUrlParsing:
    """Tests for Maven URL parsing."""

    def test_parse_maven_jar(self):
        """Test parsing Maven JAR URL."""
        from app.cache import parse_maven_url

        result = parse_maven_url(
            "https://repo1.maven.org/maven2/org/apache/commons/commons-lang3/3.12.0/commons-lang3-3.12.0.jar"
        )
        assert result is not None
        assert result.package_name == "org.apache.commons:commons-lang3"
        assert result.version == "3.12.0"

    def test_parse_maven_with_classifier(self):
        """Test parsing Maven URL with version containing classifier."""
        from app.cache import parse_maven_url

        result = parse_maven_url(
            "https://repo1.maven.org/maven2/com/google/guava/guava/31.1-jre/guava-31.1-jre.jar"
        )
        assert result is not None
        assert result.package_name == "com.google.guava:guava"
        assert result.version == "31.1-jre"


class TestGenericUrlParsing:
    """Tests for generic URL parsing."""

    def test_parse_with_version(self):
        """Test parsing generic URL with version in filename."""
        from app.cache import parse_generic_url

        result = parse_generic_url("https://example.com/downloads/myapp-1.2.3.tar.gz")
        assert result.package_name == "myapp"
        assert result.version == "1.2.3"

    def test_parse_without_version(self):
        """Test parsing generic URL without version."""
        from app.cache import parse_generic_url

        result = parse_generic_url("https://example.com/downloads/artifact.tar.gz")
        assert result.package_name == "artifact"
        assert result.version is None

    def test_parse_various_extensions(self):
        """Test parsing various file extensions."""
        from app.cache import parse_generic_url

        for ext in ["tar.gz", "tar.bz2", "zip", "jar", "deb", "rpm"]:
            result = parse_generic_url(f"https://example.com/pkg-1.0.{ext}")
            assert result.package_name == "pkg"
            assert result.version == "1.0"


class TestParseUrl:
    """Tests for the unified parse_url function."""

    def test_npm_source_type(self):
        """Test parse_url with npm source type."""
        from app.cache import parse_url

        result = parse_url(
            "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
            "npm"
        )
        assert result.package_name == "lodash"
        assert result.version == "4.17.21"

    def test_fallback_to_generic(self):
        """Test parse_url falls back to generic parsing."""
        from app.cache import parse_url

        # npm parser can't parse this, should fall back to generic
        result = parse_url("https://example.com/myfile-1.0.tar.gz", "npm")
        assert result.package_name == "myfile"
        assert result.version == "1.0"

    def test_pypi_source_type(self):
        """Test parse_url with pypi source type."""
        from app.cache import parse_url

        result = parse_url(
            "https://files.pythonhosted.org/packages/ab/cd/requests-2.28.0.tar.gz",
            "pypi"
        )
        assert result.package_name == "requests"
        assert result.version == "2.28.0"


class TestSystemProjectHelpers:
    """Tests for system project helper functions."""

    def test_get_system_project_name(self):
        """Test getting system project names."""
        from app.cache import get_system_project_name

        assert get_system_project_name("npm") == "_npm"
        assert get_system_project_name("pypi") == "_pypi"
        assert get_system_project_name("maven") == "_maven"
        assert get_system_project_name("docker") == "_docker"
        # Unrecognised source types map to the generic system project
        assert get_system_project_name("unknown") == "_generic"

    def test_get_system_project_description(self):
        """Test getting system project descriptions."""
        from app.cache import get_system_project_description

        assert "npm" in get_system_project_description("npm").lower()
        assert "pypi" in get_system_project_description("pypi").lower()


# =============================================================================
# Cache Endpoint Integration Tests
# =============================================================================


class TestCacheEndpointRequiresAuth:
    """Tests for cache endpoint authentication."""

    @pytest.mark.integration
    def test_cache_requires_authentication(self):
        """Test that cache endpoint requires authentication."""
        import httpx

        base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

        # Use fresh client WITHOUT authentication
        with httpx.Client(base_url=base_url, timeout=30.0) as unauthenticated_client:
            response = unauthenticated_client.post(
                "/api/v1/cache",
                json={
                    "url": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
                    "source_type": "npm",
                },
            )
            assert response.status_code in (401, 403)


class TestCacheRequestValidation:
    """Tests for cache request validation."""

    def test_cache_request_validates_url(self):
        """Test that CacheRequest validates URL format."""
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError) as exc_info:
            CacheRequest(url="not-a-url", source_type="npm")

        assert "url must start with http" in str(exc_info.value)

    def test_cache_request_validates_source_type(self):
        """Test that CacheRequest validates source_type."""
        from app.schemas import CacheRequest

        with pytest.raises(ValidationError) as exc_info:
            CacheRequest(url="https://example.com/file.tgz", source_type="invalid")

        assert "source_type must be one of" in str(exc_info.value)

    def test_cache_request_valid(self):
        """Test valid CacheRequest."""
        from app.schemas import CacheRequest

        # NOTE(review): TestCacheRequestSchemas.test_request_valid passes
        # ``version=`` where this test passes ``tag=``. Presumably only one of
        # them is a declared CacheRequest field and the other is silently
        # ignored by pydantic - confirm against the schema.
        request = CacheRequest(
            url="https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz",
            source_type="npm",
            package_name="lodash",
            tag="4.17.21",
        )
        assert request.url == "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz"
        assert request.source_type == "npm"


class TestCacheResponseSchema:
    """Tests for CacheResponse schema."""

    def test_cache_response_fields(self):
        """Test CacheResponse has expected fields."""
        from app.schemas import CacheResponse

        field_names = set(CacheResponse.model_fields.keys())
        expected = (
            "artifact_id",
            "sha256",
            "size",
            "already_cached",
            "source_url",
            "system_project",
            "system_package",
        )
        for field in expected:
            assert field in field_names, f"CacheResponse is missing {field!r}"


# =============================================================================
# System Projects Tests
# =============================================================================


class TestSystemProjectRestrictions:
    """Tests for system project restrictions."""

    @pytest.mark.integration
    def test_cannot_delete_system_project(self, integration_client):
        """Test that system projects cannot be deleted."""
        # First, create a system project by checking if _npm exists
        # or we need to trigger its creation via the cache endpoint
        response = integration_client.get("/api/v1/system-projects")
        assert response.status_code == 200
        system_projects = response.json()

        # If there are no system projects, skip this test
        if not system_projects:
            pytest.skip("No system projects exist to test deletion")

        # Try to delete a system project
        project_name = system_projects[0]["name"]
        response = integration_client.delete(f"/api/v1/projects/{project_name}")
        assert response.status_code == 403
        assert "cannot be deleted" in response.json()["detail"].lower()

    @pytest.mark.integration
    def test_cannot_make_system_project_private(self, integration_client):
        """Test that system projects cannot be made private."""
        response = integration_client.get("/api/v1/system-projects")
        assert response.status_code == 200
        system_projects = response.json()

        if not system_projects:
            pytest.skip("No system projects exist to test update")

        # Try to make a system project private
        project_name = system_projects[0]["name"]
        response = integration_client.put(
            f"/api/v1/projects/{project_name}",
            json={"is_public": False},
        )
        assert response.status_code == 403
        assert "cannot be made private" in response.json()["detail"].lower()

    @pytest.mark.integration
    def test_can_update_system_project_description(self, integration_client):
        """Test that system project descriptions can be updated."""
        response = integration_client.get("/api/v1/system-projects")
        assert response.status_code == 200
        system_projects = response.json()

        if not system_projects:
            pytest.skip("No system projects exist to test update")

        # Update description should work
        project_name = system_projects[0]["name"]
        new_description = "Updated description for testing"
        response = integration_client.put(
            f"/api/v1/projects/{project_name}",
            json={"description": new_description},
        )
        assert response.status_code == 200
        assert response.json()["description"] == new_description


class TestSystemProjectsEndpoint:
    """Tests for the system projects listing endpoint."""

    @pytest.mark.integration
    def test_list_system_projects_requires_auth(self):
        """Test that listing system projects requires authentication."""
        import httpx

        base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

        with httpx.Client(base_url=base_url, timeout=30.0) as unauthenticated_client:
            response = unauthenticated_client.get("/api/v1/system-projects")
            assert response.status_code == 401

    @pytest.mark.integration
    def test_list_system_projects_success(self, integration_client):
        """Test listing system projects returns valid response."""
        response = integration_client.get("/api/v1/system-projects")
        assert response.status_code == 200

        # Response should be a list
        data = response.json()
        assert isinstance(data, list)

        # If any system projects exist, they should all have is_system=true
        for project in data:
            assert project.get("is_system") is True or project.get("name", "").startswith("_")


# =============================================================================
# Upstream Sources Admin API Tests
# =============================================================================


class TestUpstreamSourcesAdminAPI:
    """Tests for the upstream sources admin API."""

    @pytest.mark.integration
    def test_list_upstream_sources_requires_admin(self):
        """Test that listing upstream sources requires admin access."""
        import httpx

        base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080")

        with httpx.Client(base_url=base_url, timeout=30.0) as unauthenticated_client:
            response = unauthenticated_client.get("/api/v1/admin/upstream-sources")
            assert response.status_code in (401, 403)

    @pytest.mark.integration
    def test_list_upstream_sources_success(self, integration_client):
        """Test listing upstream sources as admin."""
        response = integration_client.get("/api/v1/admin/upstream-sources")
        assert response.status_code == 200

        data = response.json()
        assert isinstance(data, list)

        # Every entry must expose a non-empty name. Which sources are seeded
        # depends on the environment, so no specific name is required here;
        # the earlier check ended in ``or len(data) >= 0``, which is always
        # true and therefore asserted nothing.
        names = [s["name"] for s in data]
        assert all(names)

    @pytest.mark.integration
    def test_list_upstream_sources_filter_by_enabled(self, integration_client):
        """Test filtering upstream sources by enabled status."""
        response = integration_client.get("/api/v1/admin/upstream-sources?enabled=true")
        assert response.status_code == 200

        data = response.json()
        for source in data:
            assert source["enabled"] is True

    @pytest.mark.integration
    def test_list_upstream_sources_filter_by_type(self, integration_client):
        """Test filtering upstream sources by source type."""
        response = integration_client.get("/api/v1/admin/upstream-sources?source_type=npm")
        assert response.status_code == 200

        data = response.json()
        for source in data:
            assert source["source_type"] == "npm"

    @pytest.mark.integration
    def test_create_upstream_source(self, integration_client, unique_test_id):
        """Test creating a new upstream source."""
        source_name = f"test-source-{unique_test_id}"

        response = integration_client.post(
            "/api/v1/admin/upstream-sources",
            json={
                "name": source_name,
                "source_type": "generic",
                "url": "https://example.com/packages",
                "enabled": False,
                "auth_type": "none",
                "priority": 200,
            },
        )
        assert response.status_code == 201

        data = response.json()
        try:
            assert data["name"] == source_name
            assert data["source_type"] == "generic"
            assert data["url"] == "https://example.com/packages"
            assert data["enabled"] is False
            assert data["priority"] == 200
            assert "id" in data
        finally:
            # Clean up even when an assertion above fails, so a failed run
            # does not leave the created source behind on the server.
            if "id" in data:
                source_id = data["id"]
                integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}")

    @pytest.mark.integration
    def test_create_upstream_source_with_auth(self, integration_client, unique_test_id):
        """Test creating an upstream source with authentication."""
        source_name = f"test-auth-source-{unique_test_id}"

        response = integration_client.post(
            "/api/v1/admin/upstream-sources",
            json={
                "name": source_name,
                "source_type": "npm",
                "url": "https://npm.internal.corp",
                "enabled": False,
                "auth_type": "basic",
                "username": "reader",
                "password": "secret123",
                "priority": 50,
            },
        )
        assert response.status_code == 201

        data = response.json()
        assert data["name"] == source_name
        assert data["auth_type"] == "basic"
        assert data["username"] == "reader"
        assert data["has_password"] is True
        # Password should NOT be in response
        assert "password" not in data

        # Clean up
        source_id = data["id"]
integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}") @pytest.mark.integration def test_create_upstream_source_duplicate_name(self, integration_client, unique_test_id): """Test that duplicate source names are rejected.""" source_name = f"test-dup-{unique_test_id}" # Create first source response1 = integration_client.post( "/api/v1/admin/upstream-sources", json={ "name": source_name, "source_type": "generic", "url": "https://example1.com", }, ) assert response1.status_code == 201 source_id = response1.json()["id"] # Try to create duplicate response2 = integration_client.post( "/api/v1/admin/upstream-sources", json={ "name": source_name, "source_type": "generic", "url": "https://example2.com", }, ) assert response2.status_code == 409 assert "already exists" in response2.json()["detail"] # Clean up integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}") @pytest.mark.integration def test_get_upstream_source(self, integration_client, unique_test_id): """Test getting a specific upstream source.""" source_name = f"test-get-{unique_test_id}" # Create source create_response = integration_client.post( "/api/v1/admin/upstream-sources", json={ "name": source_name, "source_type": "pypi", "url": "https://pypi.internal.corp", }, ) assert create_response.status_code == 201 source_id = create_response.json()["id"] # Get source response = integration_client.get(f"/api/v1/admin/upstream-sources/{source_id}") assert response.status_code == 200 data = response.json() assert data["id"] == source_id assert data["name"] == source_name assert data["source_type"] == "pypi" # Clean up integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}") @pytest.mark.integration def test_get_upstream_source_not_found(self, integration_client): """Test getting a non-existent upstream source.""" fake_id = "00000000-0000-0000-0000-000000000000" response = integration_client.get(f"/api/v1/admin/upstream-sources/{fake_id}") assert response.status_code == 404 
@pytest.mark.integration def test_update_upstream_source(self, integration_client, unique_test_id): """Test updating an upstream source.""" source_name = f"test-update-{unique_test_id}" # Create source create_response = integration_client.post( "/api/v1/admin/upstream-sources", json={ "name": source_name, "source_type": "generic", "url": "https://example.com", "enabled": False, "priority": 100, }, ) assert create_response.status_code == 201 source_id = create_response.json()["id"] # Update source response = integration_client.put( f"/api/v1/admin/upstream-sources/{source_id}", json={ "enabled": True, "priority": 50, "url": "https://example-updated.com", }, ) assert response.status_code == 200 data = response.json() assert data["enabled"] is True assert data["priority"] == 50 assert data["url"] == "https://example-updated.com" # Name should be unchanged assert data["name"] == source_name # Clean up integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}") @pytest.mark.integration def test_update_upstream_source_password(self, integration_client, unique_test_id): """Test updating an upstream source's password.""" source_name = f"test-pwd-{unique_test_id}" # Create source without password create_response = integration_client.post( "/api/v1/admin/upstream-sources", json={ "name": source_name, "source_type": "generic", "url": "https://example.com", "auth_type": "basic", "username": "user", }, ) assert create_response.status_code == 201 source_id = create_response.json()["id"] assert create_response.json()["has_password"] is False # Add password response = integration_client.put( f"/api/v1/admin/upstream-sources/{source_id}", json={"password": "newpassword"}, ) assert response.status_code == 200 assert response.json()["has_password"] is True # Clear password with empty string response = integration_client.put( f"/api/v1/admin/upstream-sources/{source_id}", json={"password": ""}, ) assert response.status_code == 200 assert response.json()["has_password"] is 
False # Clean up integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}") @pytest.mark.integration def test_delete_upstream_source(self, integration_client, unique_test_id): """Test deleting an upstream source.""" source_name = f"test-delete-{unique_test_id}" # Create source create_response = integration_client.post( "/api/v1/admin/upstream-sources", json={ "name": source_name, "source_type": "generic", "url": "https://example.com", }, ) assert create_response.status_code == 201 source_id = create_response.json()["id"] # Delete source response = integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}") assert response.status_code == 204 # Verify it's gone get_response = integration_client.get(f"/api/v1/admin/upstream-sources/{source_id}") assert get_response.status_code == 404 @pytest.mark.integration def test_test_upstream_source_connectivity(self, integration_client, unique_test_id): """Test the connectivity test endpoint.""" source_name = f"test-conn-{unique_test_id}" # Create source with a URL that should respond create_response = integration_client.post( "/api/v1/admin/upstream-sources", json={ "name": source_name, "source_type": "generic", "url": "https://httpbin.org/get", # Public test endpoint "enabled": False, }, ) assert create_response.status_code == 201 source_id = create_response.json()["id"] # Test connectivity response = integration_client.post( f"/api/v1/admin/upstream-sources/{source_id}/test" ) assert response.status_code == 200 data = response.json() assert "success" in data assert "elapsed_ms" in data assert data["source_id"] == source_id assert data["source_name"] == source_name # Clean up integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}") # ============================================================================= # Cache Settings Admin API Tests # ============================================================================= class TestCacheSettingsAdminAPI: """Tests for the cache settings 
admin API.""" @pytest.mark.integration def test_get_cache_settings_requires_admin(self): """Test that getting cache settings requires admin access.""" import httpx base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") with httpx.Client(base_url=base_url, timeout=30.0) as unauthenticated_client: response = unauthenticated_client.get("/api/v1/admin/cache-settings") assert response.status_code in (401, 403) @pytest.mark.integration def test_get_cache_settings_success(self, integration_client): """Test getting cache settings as admin.""" response = integration_client.get("/api/v1/admin/cache-settings") assert response.status_code == 200 data = response.json() # Check expected fields exist assert "allow_public_internet" in data assert "auto_create_system_projects" in data # Check types assert isinstance(data["allow_public_internet"], bool) assert isinstance(data["auto_create_system_projects"], bool) @pytest.mark.integration def test_update_cache_settings_requires_admin(self): """Test that updating cache settings requires admin access.""" import httpx base_url = os.environ.get("ORCHARD_TEST_URL", "http://localhost:8080") with httpx.Client(base_url=base_url, timeout=30.0) as unauthenticated_client: response = unauthenticated_client.put( "/api/v1/admin/cache-settings", json={"allow_public_internet": False}, ) assert response.status_code in (401, 403) @pytest.mark.integration def test_update_cache_settings_success(self, integration_client): """Test updating cache settings as admin.""" # First get current settings to restore later original = integration_client.get("/api/v1/admin/cache-settings").json() # Update settings response = integration_client.put( "/api/v1/admin/cache-settings", json={ "allow_public_internet": not original["allow_public_internet"], "auto_create_system_projects": not original["auto_create_system_projects"], }, ) assert response.status_code == 200 data = response.json() assert data["allow_public_internet"] == (not 
original["allow_public_internet"]) assert data["auto_create_system_projects"] == (not original["auto_create_system_projects"]) # Restore original settings integration_client.put( "/api/v1/admin/cache-settings", json={ "allow_public_internet": original["allow_public_internet"], "auto_create_system_projects": original["auto_create_system_projects"], }, ) @pytest.mark.integration def test_update_cache_settings_allow_public_internet(self, integration_client): """Test enabling and disabling public internet access (air-gap mode).""" # First get current settings to restore later original = integration_client.get("/api/v1/admin/cache-settings").json() # Disable public internet (enable air-gap mode) response = integration_client.put( "/api/v1/admin/cache-settings", json={"allow_public_internet": False}, ) assert response.status_code == 200 assert response.json()["allow_public_internet"] is False # Enable public internet (disable air-gap mode) response = integration_client.put( "/api/v1/admin/cache-settings", json={"allow_public_internet": True}, ) assert response.status_code == 200 assert response.json()["allow_public_internet"] is True # Restore original settings integration_client.put( "/api/v1/admin/cache-settings", json={"allow_public_internet": original["allow_public_internet"]}, ) @pytest.mark.integration def test_update_cache_settings_partial(self, integration_client): """Test that partial updates only change specified fields.""" # Get current settings original = integration_client.get("/api/v1/admin/cache-settings").json() # Update only allow_public_internet new_value = not original["allow_public_internet"] response = integration_client.put( "/api/v1/admin/cache-settings", json={"allow_public_internet": new_value}, ) assert response.status_code == 200 data = response.json() assert data["allow_public_internet"] == new_value # Other field should be unchanged assert data["auto_create_system_projects"] == original["auto_create_system_projects"] # Restore 
integration_client.put( "/api/v1/admin/cache-settings", json={"allow_public_internet": original["allow_public_internet"]}, ) @pytest.mark.integration def test_update_cache_settings_auto_create_system_projects(self, integration_client): """Test updating auto_create_system_projects setting.""" # Get current settings original = integration_client.get("/api/v1/admin/cache-settings").json() # Toggle auto_create_system_projects new_value = not original["auto_create_system_projects"] response = integration_client.put( "/api/v1/admin/cache-settings", json={"auto_create_system_projects": new_value}, ) assert response.status_code == 200 assert response.json()["auto_create_system_projects"] == new_value # Restore integration_client.put( "/api/v1/admin/cache-settings", json={"auto_create_system_projects": original["auto_create_system_projects"]}, ) # ============================================================================= # Environment Variable Configuration Tests # ============================================================================= class TestEnvVarUpstreamSourcesParsing: """Tests for parsing upstream sources from environment variables.""" def test_parse_upstream_sources_basic(self): """Test parsing a basic upstream source from env vars.""" from app.config import parse_upstream_sources_from_env import os # Set env vars test_env = { "ORCHARD_UPSTREAM__TEST_SOURCE__URL": "https://example.com/packages", "ORCHARD_UPSTREAM__TEST_SOURCE__TYPE": "generic", "ORCHARD_UPSTREAM__TEST_SOURCE__ENABLED": "true", } original_env = {} for key in test_env: original_env[key] = os.environ.get(key) os.environ[key] = test_env[key] try: sources = parse_upstream_sources_from_env() assert len(sources) >= 1 # Find our test source test_source = next((s for s in sources if "test" in s.name), None) assert test_source is not None assert test_source.url == "https://example.com/packages" assert test_source.source_type == "generic" assert test_source.enabled is True assert test_source.source == 
"env" finally: # Restore original env for key, value in original_env.items(): if value is None: os.environ.pop(key, None) else: os.environ[key] = value def test_parse_upstream_sources_with_auth(self): """Test parsing an upstream source with authentication from env vars.""" from app.config import parse_upstream_sources_from_env import os test_env = { "ORCHARD_UPSTREAM__AUTH_TEST__URL": "https://secure.example.com", "ORCHARD_UPSTREAM__AUTH_TEST__TYPE": "npm", "ORCHARD_UPSTREAM__AUTH_TEST__AUTH_TYPE": "basic", "ORCHARD_UPSTREAM__AUTH_TEST__USERNAME": "myuser", "ORCHARD_UPSTREAM__AUTH_TEST__PASSWORD": "secret123", "ORCHARD_UPSTREAM__AUTH_TEST__PRIORITY": "50", } original_env = {} for key in test_env: original_env[key] = os.environ.get(key) os.environ[key] = test_env[key] try: sources = parse_upstream_sources_from_env() test_source = next((s for s in sources if "auth" in s.name), None) assert test_source is not None assert test_source.auth_type == "basic" assert test_source.username == "myuser" assert test_source.password == "secret123" assert test_source.priority == 50 finally: for key, value in original_env.items(): if value is None: os.environ.pop(key, None) else: os.environ[key] = value def test_parse_upstream_sources_missing_url_skipped(self): """Test that sources without URL are skipped.""" from app.config import parse_upstream_sources_from_env import os # Source without URL should be skipped test_env = { "ORCHARD_UPSTREAM__NO_URL__TYPE": "npm", "ORCHARD_UPSTREAM__NO_URL__ENABLED": "true", } original_env = {} for key in test_env: original_env[key] = os.environ.get(key) os.environ[key] = test_env[key] try: sources = parse_upstream_sources_from_env() # Should not include the source without URL no_url_source = next((s for s in sources if "no-url" in s.name), None) assert no_url_source is None finally: for key, value in original_env.items(): if value is None: os.environ.pop(key, None) else: os.environ[key] = value def test_parse_upstream_sources_defaults(self): 
"""Test that defaults are applied for optional fields.""" from app.config import parse_upstream_sources_from_env import os test_env = { "ORCHARD_UPSTREAM__DEFAULTS_TEST__URL": "https://example.com", } original_env = {} for key in test_env: original_env[key] = os.environ.get(key) os.environ[key] = test_env[key] try: sources = parse_upstream_sources_from_env() test_source = next((s for s in sources if "defaults" in s.name), None) assert test_source is not None # Check defaults assert test_source.source_type == "generic" assert test_source.enabled is True assert test_source.auth_type == "none" assert test_source.priority == 100 finally: for key, value in original_env.items(): if value is None: os.environ.pop(key, None) else: os.environ[key] = value class TestEnvSourceToResponse: """Tests for converting env sources to API response format.""" def test_env_source_to_response_format(self): """Test that env source response has correct format.""" from app.config import EnvUpstreamSource source = EnvUpstreamSource( name="test-source", url="https://example.com", source_type="npm", enabled=True, auth_type="basic", username="user", password="pass", priority=50, ) assert source.name == "test-source" assert source.url == "https://example.com" assert source.source_type == "npm" assert source.enabled is True assert source.auth_type == "basic" assert source.username == "user" assert source.password == "pass" assert source.priority == 50 assert source.source == "env" class TestUpstreamSourceResponseSource: """Tests for the source field in upstream source responses.""" @pytest.mark.integration def test_db_sources_have_database_source_field(self, integration_client, unique_test_id): """Test that database-defined sources have source='database'.""" source_name = f"test-db-source-{unique_test_id}" # Create source via API (stored in DB) response = integration_client.post( "/api/v1/admin/upstream-sources", json={ "name": source_name, "source_type": "generic", "url": "https://example.com", 
}, ) assert response.status_code == 201 source_id = response.json()["id"] # Get source - should have source="database" response = integration_client.get(f"/api/v1/admin/upstream-sources/{source_id}") assert response.status_code == 200 assert response.json()["source"] == "database" # List sources - should have source field response = integration_client.get("/api/v1/admin/upstream-sources") assert response.status_code == 200 db_source = next((s for s in response.json() if s["id"] == source_id), None) assert db_source is not None assert db_source["source"] == "database" # Clean up integration_client.delete(f"/api/v1/admin/upstream-sources/{source_id}") class TestCacheSettingsEnvOverride: """Tests for cache settings environment variable override fields.""" @pytest.mark.integration def test_cache_settings_has_env_override_fields(self, integration_client): """Test that cache settings response includes env override fields.""" response = integration_client.get("/api/v1/admin/cache-settings") assert response.status_code == 200 data = response.json() # These fields should exist (may be null if no env override) assert "allow_public_internet_env_override" in data assert "auto_create_system_projects_env_override" in data