Metadata database tracks all uploads with project, package, tag, and timestamp queryable via API

This commit is contained in:
Mondo Diaz
2026-01-07 12:31:44 -06:00
parent 81458b3bcb
commit 2f1891cf01
24 changed files with 5044 additions and 2123 deletions

View File

View File

@@ -0,0 +1,271 @@
"""
Unit tests for SQLAlchemy models.
Tests cover:
- Model instantiation and defaults
- Property aliases (sha256, format_metadata)
- Relationship definitions
- Constraint definitions
"""
import pytest
import uuid
from datetime import datetime
class TestArtifactModel:
"""Tests for the Artifact model."""
@pytest.mark.unit
def test_artifact_sha256_property(self):
"""Test sha256 property is an alias for id."""
from app.models import Artifact
artifact = Artifact(
id="a" * 64,
size=1024,
created_by="test-user",
s3_key="fruits/aa/aa/test",
)
assert artifact.sha256 == artifact.id
assert artifact.sha256 == "a" * 64
@pytest.mark.unit
def test_artifact_format_metadata_alias(self):
"""Test format_metadata is an alias for artifact_metadata."""
from app.models import Artifact
test_metadata = {"format": "tarball", "version": "1.0.0"}
artifact = Artifact(
id="b" * 64,
size=2048,
created_by="test-user",
s3_key="fruits/bb/bb/test",
artifact_metadata=test_metadata,
)
assert artifact.format_metadata == test_metadata
assert artifact.format_metadata == artifact.artifact_metadata
@pytest.mark.unit
def test_artifact_format_metadata_setter(self):
"""Test format_metadata setter updates artifact_metadata."""
from app.models import Artifact
artifact = Artifact(
id="c" * 64,
size=512,
created_by="test-user",
s3_key="fruits/cc/cc/test",
)
new_metadata = {"type": "rpm", "arch": "x86_64"}
artifact.format_metadata = new_metadata
assert artifact.artifact_metadata == new_metadata
assert artifact.format_metadata == new_metadata
@pytest.mark.unit
def test_artifact_default_ref_count(self):
"""Test artifact ref_count column has default value of 1."""
from app.models import Artifact
# Check the column definition has the right default
ref_count_col = Artifact.__table__.columns["ref_count"]
assert ref_count_col.default is not None
assert ref_count_col.default.arg == 1
@pytest.mark.unit
def test_artifact_default_metadata_is_dict(self):
"""Test artifact default metadata is an empty dict."""
from app.models import Artifact
artifact = Artifact(
id="e" * 64,
size=100,
created_by="test-user",
s3_key="fruits/ee/ee/test",
)
# Default might be None until saved, but the column default is dict
assert artifact.artifact_metadata is None or isinstance(
artifact.artifact_metadata, dict
)
class TestProjectModel:
"""Tests for the Project model."""
@pytest.mark.unit
def test_project_default_is_public(self):
"""Test project is_public column has default value of True."""
from app.models import Project
# Check the column definition has the right default
is_public_col = Project.__table__.columns["is_public"]
assert is_public_col.default is not None
assert is_public_col.default.arg is True
@pytest.mark.unit
def test_project_uuid_generation(self):
"""Test project generates UUID by default."""
from app.models import Project
project = Project(
name="uuid-test-project",
created_by="test-user",
)
# UUID should be set by default function
assert project.id is not None or hasattr(Project.id, "default")
class TestPackageModel:
"""Tests for the Package model."""
@pytest.mark.unit
def test_package_default_format(self):
"""Test package format column has default value of 'generic'."""
from app.models import Package
# Check the column definition has the right default
format_col = Package.__table__.columns["format"]
assert format_col.default is not None
assert format_col.default.arg == "generic"
@pytest.mark.unit
def test_package_default_platform(self):
"""Test package platform column has default value of 'any'."""
from app.models import Package
# Check the column definition has the right default
platform_col = Package.__table__.columns["platform"]
assert platform_col.default is not None
assert platform_col.default.arg == "any"
class TestTagModel:
"""Tests for the Tag model."""
@pytest.mark.unit
def test_tag_requires_package_id(self):
"""Test tag requires package_id."""
from app.models import Tag
tag = Tag(
name="v1.0.0",
package_id=uuid.uuid4(),
artifact_id="f" * 64,
created_by="test-user",
)
assert tag.package_id is not None
assert tag.artifact_id == "f" * 64
class TestTagHistoryModel:
"""Tests for the TagHistory model."""
@pytest.mark.unit
def test_tag_history_default_change_type(self):
"""Test tag history change_type column has default value of 'update'."""
from app.models import TagHistory
# Check the column definition has the right default
change_type_col = TagHistory.__table__.columns["change_type"]
assert change_type_col.default is not None
assert change_type_col.default.arg == "update"
@pytest.mark.unit
def test_tag_history_allows_null_old_artifact(self):
"""Test tag history allows null old_artifact_id (for create events)."""
from app.models import TagHistory
history = TagHistory(
tag_id=uuid.uuid4(),
old_artifact_id=None,
new_artifact_id="h" * 64,
change_type="create",
changed_by="test-user",
)
assert history.old_artifact_id is None
class TestUploadModel:
"""Tests for the Upload model."""
@pytest.mark.unit
def test_upload_default_deduplicated_is_false(self):
"""Test upload deduplicated column has default value of False."""
from app.models import Upload
# Check the column definition has the right default
deduplicated_col = Upload.__table__.columns["deduplicated"]
assert deduplicated_col.default is not None
assert deduplicated_col.default.arg is False
@pytest.mark.unit
def test_upload_default_checksum_verified_is_true(self):
"""Test upload checksum_verified column has default value of True."""
from app.models import Upload
# Check the column definition has the right default
checksum_verified_col = Upload.__table__.columns["checksum_verified"]
assert checksum_verified_col.default is not None
assert checksum_verified_col.default.arg is True
class TestAccessPermissionModel:
"""Tests for the AccessPermission model."""
@pytest.mark.unit
def test_access_permission_levels(self):
"""Test valid access permission levels."""
from app.models import AccessPermission
# This tests the check constraint values
valid_levels = ["read", "write", "admin"]
for level in valid_levels:
permission = AccessPermission(
project_id=uuid.uuid4(),
user_id="test-user",
level=level,
)
assert permission.level == level
class TestAuditLogModel:
"""Tests for the AuditLog model."""
@pytest.mark.unit
def test_audit_log_required_fields(self):
"""Test audit log has all required fields."""
from app.models import AuditLog
log = AuditLog(
action="project.create",
resource="/projects/test-project",
user_id="test-user",
)
assert log.action == "project.create"
assert log.resource == "/projects/test-project"
assert log.user_id == "test-user"
@pytest.mark.unit
def test_audit_log_optional_details(self):
"""Test audit log can have optional details JSON."""
from app.models import AuditLog
details = {"old_value": "v1", "new_value": "v2"}
log = AuditLog(
action="tag.update",
resource="/projects/test/packages/pkg/tags/latest",
user_id="test-user",
details=details,
)
assert log.details == details

View File

@@ -0,0 +1,439 @@
"""
Unit tests for S3 storage layer.
Tests cover:
- SHA256 hash calculation and consistency
- Hash format validation (64-char hex)
- S3 key generation pattern
- Deduplication behavior (_exists method)
- Storage result computation (MD5, SHA1, size)
- Edge cases (empty files, large files, binary content)
"""
import pytest
import hashlib
import io
from tests.factories import (
compute_sha256,
TEST_CONTENT_HELLO,
TEST_HASH_HELLO,
TEST_CONTENT_BINARY,
TEST_HASH_BINARY,
)
# =============================================================================
# Hash Computation Tests
# =============================================================================
class TestHashComputation:
"""Unit tests for hash calculation functionality."""
@pytest.mark.unit
def test_sha256_consistent_results(self):
"""Test SHA256 hash produces consistent results for identical content."""
content = b"test content for hashing"
# Compute hash multiple times
hash1 = compute_sha256(content)
hash2 = compute_sha256(content)
hash3 = compute_sha256(content)
assert hash1 == hash2 == hash3
@pytest.mark.unit
def test_sha256_different_content_different_hash(self):
"""Test SHA256 produces different hashes for different content."""
content1 = b"content version 1"
content2 = b"content version 2"
hash1 = compute_sha256(content1)
hash2 = compute_sha256(content2)
assert hash1 != hash2
@pytest.mark.unit
def test_sha256_format_64_char_hex(self):
"""Test SHA256 hash is always 64 character lowercase hexadecimal."""
test_cases = [
b"", # Empty
b"a", # Single char
b"Hello, World!", # Normal string
bytes(range(256)), # All byte values
b"x" * 10000, # Larger content
]
for content in test_cases:
hash_value = compute_sha256(content)
# Check length
assert len(hash_value) == 64, (
f"Hash length should be 64, got {len(hash_value)}"
)
# Check lowercase
assert hash_value == hash_value.lower(), "Hash should be lowercase"
# Check hexadecimal
assert all(c in "0123456789abcdef" for c in hash_value), (
"Hash should be hex"
)
@pytest.mark.unit
def test_sha256_known_value(self):
"""Test SHA256 produces expected hash for known input."""
assert compute_sha256(TEST_CONTENT_HELLO) == TEST_HASH_HELLO
@pytest.mark.unit
def test_sha256_binary_content(self):
"""Test SHA256 handles binary content correctly."""
assert compute_sha256(TEST_CONTENT_BINARY) == TEST_HASH_BINARY
# Test with null bytes
content_with_nulls = b"\x00\x00test\x00\x00"
hash_value = compute_sha256(content_with_nulls)
assert len(hash_value) == 64
@pytest.mark.unit
def test_sha256_streaming_computation(self):
"""Test SHA256 can be computed in chunks (streaming)."""
# Large content
chunk_size = 8192
total_size = chunk_size * 10 # 80KB
content = b"x" * total_size
# Direct computation
direct_hash = compute_sha256(content)
# Streaming computation
hasher = hashlib.sha256()
for i in range(0, total_size, chunk_size):
hasher.update(content[i : i + chunk_size])
streaming_hash = hasher.hexdigest()
assert direct_hash == streaming_hash
@pytest.mark.unit
def test_sha256_order_matters(self):
"""Test that content order affects hash (not just content set)."""
content1 = b"AB"
content2 = b"BA"
assert compute_sha256(content1) != compute_sha256(content2)
# =============================================================================
# Storage Hash Computation Tests
# =============================================================================
class TestStorageHashComputation:
"""Tests for hash computation in the storage layer."""
@pytest.mark.unit
def test_storage_computes_sha256(self, mock_storage):
"""Test storage layer correctly computes SHA256 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
assert result.sha256 == TEST_HASH_HELLO
@pytest.mark.unit
def test_storage_computes_md5(self, mock_storage):
"""Test storage layer also computes MD5 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_md5 = hashlib.md5(content).hexdigest()
assert result.md5 == expected_md5
@pytest.mark.unit
def test_storage_computes_sha1(self, mock_storage):
"""Test storage layer also computes SHA1 hash."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_sha1 = hashlib.sha1(content).hexdigest()
assert result.sha1 == expected_sha1
@pytest.mark.unit
def test_storage_returns_correct_size(self, mock_storage):
"""Test storage layer returns correct file size."""
content = b"test content with known size"
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
assert result.size == len(content)
@pytest.mark.unit
def test_storage_generates_correct_s3_key(self, mock_storage):
"""Test storage layer generates correct S3 key pattern."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# Key should be: fruits/{hash[:2]}/{hash[2:4]}/{hash}
expected_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
assert result.s3_key == expected_key
# =============================================================================
# Hash Edge Cases
# =============================================================================
class TestHashEdgeCases:
"""Edge case tests for hash computation."""
@pytest.mark.unit
def test_hash_empty_content_rejected(self, mock_storage):
"""Test that empty content is rejected."""
from app.storage import HashComputationError
file_obj = io.BytesIO(b"")
with pytest.raises(HashComputationError):
mock_storage._store_simple(file_obj)
@pytest.mark.unit
def test_hash_large_file_streaming(self, mock_storage):
"""Test hash computation for large files uses streaming."""
# Create a 10MB file
size = 10 * 1024 * 1024
content = b"x" * size
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_hash = compute_sha256(content)
assert result.sha256 == expected_hash
@pytest.mark.unit
def test_hash_special_bytes(self):
"""Test hash handles all byte values correctly."""
# All possible byte values
content = bytes(range(256))
hash_value = compute_sha256(content)
assert len(hash_value) == 64
assert hash_value == TEST_HASH_BINARY
# =============================================================================
# S3 Existence Check Tests
# =============================================================================
class TestExistsMethod:
"""Tests for the _exists() method that checks S3 object existence."""
@pytest.mark.unit
def test_exists_returns_true_for_existing_key(self, mock_storage, mock_s3_client):
"""Test _exists() returns True when object exists."""
# Pre-populate the mock storage
test_key = "fruits/df/fd/test-hash"
mock_s3_client.objects[test_key] = b"content"
result = mock_storage._exists(test_key)
assert result is True
@pytest.mark.unit
def test_exists_returns_false_for_nonexistent_key(self, mock_storage):
"""Test _exists() returns False when object doesn't exist."""
result = mock_storage._exists("fruits/no/ne/nonexistent-key")
assert result is False
@pytest.mark.unit
def test_exists_handles_404_error(self, mock_storage):
"""Test _exists() handles 404 errors gracefully."""
# The mock client raises ClientError for nonexistent keys
result = mock_storage._exists("fruits/xx/yy/does-not-exist")
assert result is False
# =============================================================================
# S3 Key Generation Tests
# =============================================================================
class TestS3KeyGeneration:
"""Tests for S3 key pattern generation."""
@pytest.mark.unit
def test_s3_key_pattern(self):
"""Test S3 key follows pattern: fruits/{hash[:2]}/{hash[2:4]}/{hash}"""
test_hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
expected_key = f"fruits/{test_hash[:2]}/{test_hash[2:4]}/{test_hash}"
# Expected: fruits/ab/cd/abcdef1234567890...
assert expected_key == f"fruits/ab/cd/{test_hash}"
@pytest.mark.unit
def test_s3_key_generation_in_storage(self, mock_storage):
"""Test storage layer generates correct S3 key."""
content = TEST_CONTENT_HELLO
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
expected_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
assert result.s3_key == expected_key
@pytest.mark.unit
def test_s3_key_uses_sha256_hash(self, mock_storage):
"""Test S3 key is derived from SHA256 hash."""
content = b"unique test content for key test"
file_obj = io.BytesIO(content)
expected_hash = compute_sha256(content)
result = mock_storage._store_simple(file_obj)
# Key should contain the hash
assert expected_hash in result.s3_key
# =============================================================================
# Deduplication Behavior Tests
# =============================================================================
class TestDeduplicationBehavior:
"""Tests for deduplication (skip upload when exists)."""
@pytest.mark.unit
def test_skips_upload_when_exists(self, mock_storage, mock_s3_client):
"""Test storage skips S3 upload when artifact already exists."""
content = TEST_CONTENT_HELLO
s3_key = (
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
)
# Pre-populate storage (simulate existing artifact)
mock_s3_client.objects[s3_key] = content
# Track put_object calls
original_put = mock_s3_client.put_object
put_called = []
def tracked_put(*args, **kwargs):
put_called.append(True)
return original_put(*args, **kwargs)
mock_s3_client.put_object = tracked_put
# Store the same content
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# put_object should NOT have been called (deduplication)
assert len(put_called) == 0
assert result.sha256 == TEST_HASH_HELLO
@pytest.mark.unit
def test_uploads_when_not_exists(self, mock_storage, mock_s3_client):
"""Test storage uploads to S3 when artifact doesn't exist."""
content = b"brand new unique content"
content_hash = compute_sha256(content)
s3_key = f"fruits/{content_hash[:2]}/{content_hash[2:4]}/{content_hash}"
# Ensure object doesn't exist
assert s3_key not in mock_s3_client.objects
# Store the content
file_obj = io.BytesIO(content)
result = mock_storage._store_simple(file_obj)
# Object should now exist in mock storage
assert s3_key in mock_s3_client.objects
assert mock_s3_client.objects[s3_key] == content
@pytest.mark.unit
def test_returns_same_hash_for_duplicate(self, mock_storage, mock_s3_client):
"""Test storing same content twice returns same hash."""
content = b"content to be stored twice"
# First store
file1 = io.BytesIO(content)
result1 = mock_storage._store_simple(file1)
# Second store (duplicate)
file2 = io.BytesIO(content)
result2 = mock_storage._store_simple(file2)
assert result1.sha256 == result2.sha256
assert result1.s3_key == result2.s3_key
@pytest.mark.unit
def test_different_content_different_keys(self, mock_storage):
"""Test different content produces different S3 keys."""
content1 = b"first content"
content2 = b"second content"
file1 = io.BytesIO(content1)
result1 = mock_storage._store_simple(file1)
file2 = io.BytesIO(content2)
result2 = mock_storage._store_simple(file2)
assert result1.sha256 != result2.sha256
assert result1.s3_key != result2.s3_key
# =============================================================================
# Deduplication Edge Cases
# =============================================================================
class TestDeduplicationEdgeCases:
"""Edge case tests for deduplication."""
@pytest.mark.unit
def test_same_content_different_filenames(self, mock_storage):
"""Test same content with different metadata is deduplicated."""
content = b"identical content"
# Store with "filename1"
file1 = io.BytesIO(content)
result1 = mock_storage._store_simple(file1)
# Store with "filename2" (same content)
file2 = io.BytesIO(content)
result2 = mock_storage._store_simple(file2)
# Both should have same hash (content-addressable)
assert result1.sha256 == result2.sha256
@pytest.mark.unit
def test_whitespace_only_difference(self, mock_storage):
"""Test content differing only by whitespace produces different hashes."""
content1 = b"test content"
content2 = b"test content" # Extra space
content3 = b"test content " # Trailing space
file1 = io.BytesIO(content1)
file2 = io.BytesIO(content2)
file3 = io.BytesIO(content3)
result1 = mock_storage._store_simple(file1)
result2 = mock_storage._store_simple(file2)
result3 = mock_storage._store_simple(file3)
# All should be different (content-addressable)
assert len({result1.sha256, result2.sha256, result3.sha256}) == 3