440 lines
14 KiB
Python
440 lines
14 KiB
Python
"""
|
|
Unit tests for S3 storage layer.
|
|
|
|
Tests cover:
|
|
- SHA256 hash calculation and consistency
|
|
- Hash format validation (64-char hex)
|
|
- S3 key generation pattern
|
|
- Deduplication behavior (_exists method)
|
|
- Storage result computation (MD5, SHA1, size)
|
|
- Edge cases (empty files, large files, binary content)
|
|
"""
|
|
|
|
import pytest
|
|
import hashlib
|
|
import io
|
|
from tests.factories import (
|
|
compute_sha256,
|
|
TEST_CONTENT_HELLO,
|
|
TEST_HASH_HELLO,
|
|
TEST_CONTENT_BINARY,
|
|
TEST_HASH_BINARY,
|
|
)
|
|
|
|
|
|
# =============================================================================
|
|
# Hash Computation Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestHashComputation:
|
|
"""Unit tests for hash calculation functionality."""
|
|
|
|
@pytest.mark.unit
|
|
def test_sha256_consistent_results(self):
|
|
"""Test SHA256 hash produces consistent results for identical content."""
|
|
content = b"test content for hashing"
|
|
|
|
# Compute hash multiple times
|
|
hash1 = compute_sha256(content)
|
|
hash2 = compute_sha256(content)
|
|
hash3 = compute_sha256(content)
|
|
|
|
assert hash1 == hash2 == hash3
|
|
|
|
@pytest.mark.unit
|
|
def test_sha256_different_content_different_hash(self):
|
|
"""Test SHA256 produces different hashes for different content."""
|
|
content1 = b"content version 1"
|
|
content2 = b"content version 2"
|
|
|
|
hash1 = compute_sha256(content1)
|
|
hash2 = compute_sha256(content2)
|
|
|
|
assert hash1 != hash2
|
|
|
|
@pytest.mark.unit
|
|
def test_sha256_format_64_char_hex(self):
|
|
"""Test SHA256 hash is always 64 character lowercase hexadecimal."""
|
|
test_cases = [
|
|
b"", # Empty
|
|
b"a", # Single char
|
|
b"Hello, World!", # Normal string
|
|
bytes(range(256)), # All byte values
|
|
b"x" * 10000, # Larger content
|
|
]
|
|
|
|
for content in test_cases:
|
|
hash_value = compute_sha256(content)
|
|
|
|
# Check length
|
|
assert len(hash_value) == 64, (
|
|
f"Hash length should be 64, got {len(hash_value)}"
|
|
)
|
|
|
|
# Check lowercase
|
|
assert hash_value == hash_value.lower(), "Hash should be lowercase"
|
|
|
|
# Check hexadecimal
|
|
assert all(c in "0123456789abcdef" for c in hash_value), (
|
|
"Hash should be hex"
|
|
)
|
|
|
|
@pytest.mark.unit
|
|
def test_sha256_known_value(self):
|
|
"""Test SHA256 produces expected hash for known input."""
|
|
assert compute_sha256(TEST_CONTENT_HELLO) == TEST_HASH_HELLO
|
|
|
|
@pytest.mark.unit
|
|
def test_sha256_binary_content(self):
|
|
"""Test SHA256 handles binary content correctly."""
|
|
assert compute_sha256(TEST_CONTENT_BINARY) == TEST_HASH_BINARY
|
|
|
|
# Test with null bytes
|
|
content_with_nulls = b"\x00\x00test\x00\x00"
|
|
hash_value = compute_sha256(content_with_nulls)
|
|
assert len(hash_value) == 64
|
|
|
|
@pytest.mark.unit
|
|
def test_sha256_streaming_computation(self):
|
|
"""Test SHA256 can be computed in chunks (streaming)."""
|
|
# Large content
|
|
chunk_size = 8192
|
|
total_size = chunk_size * 10 # 80KB
|
|
content = b"x" * total_size
|
|
|
|
# Direct computation
|
|
direct_hash = compute_sha256(content)
|
|
|
|
# Streaming computation
|
|
hasher = hashlib.sha256()
|
|
for i in range(0, total_size, chunk_size):
|
|
hasher.update(content[i : i + chunk_size])
|
|
streaming_hash = hasher.hexdigest()
|
|
|
|
assert direct_hash == streaming_hash
|
|
|
|
@pytest.mark.unit
|
|
def test_sha256_order_matters(self):
|
|
"""Test that content order affects hash (not just content set)."""
|
|
content1 = b"AB"
|
|
content2 = b"BA"
|
|
|
|
assert compute_sha256(content1) != compute_sha256(content2)
|
|
|
|
|
|
# =============================================================================
|
|
# Storage Hash Computation Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestStorageHashComputation:
|
|
"""Tests for hash computation in the storage layer."""
|
|
|
|
@pytest.mark.unit
|
|
def test_storage_computes_sha256(self, mock_storage):
|
|
"""Test storage layer correctly computes SHA256 hash."""
|
|
content = TEST_CONTENT_HELLO
|
|
file_obj = io.BytesIO(content)
|
|
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
assert result.sha256 == TEST_HASH_HELLO
|
|
|
|
@pytest.mark.unit
|
|
def test_storage_computes_md5(self, mock_storage):
|
|
"""Test storage layer also computes MD5 hash."""
|
|
content = TEST_CONTENT_HELLO
|
|
file_obj = io.BytesIO(content)
|
|
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
expected_md5 = hashlib.md5(content).hexdigest()
|
|
assert result.md5 == expected_md5
|
|
|
|
@pytest.mark.unit
|
|
def test_storage_computes_sha1(self, mock_storage):
|
|
"""Test storage layer also computes SHA1 hash."""
|
|
content = TEST_CONTENT_HELLO
|
|
file_obj = io.BytesIO(content)
|
|
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
expected_sha1 = hashlib.sha1(content).hexdigest()
|
|
assert result.sha1 == expected_sha1
|
|
|
|
@pytest.mark.unit
|
|
def test_storage_returns_correct_size(self, mock_storage):
|
|
"""Test storage layer returns correct file size."""
|
|
content = b"test content with known size"
|
|
file_obj = io.BytesIO(content)
|
|
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
assert result.size == len(content)
|
|
|
|
@pytest.mark.unit
|
|
def test_storage_generates_correct_s3_key(self, mock_storage):
|
|
"""Test storage layer generates correct S3 key pattern."""
|
|
content = TEST_CONTENT_HELLO
|
|
file_obj = io.BytesIO(content)
|
|
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
# Key should be: fruits/{hash[:2]}/{hash[2:4]}/{hash}
|
|
expected_key = (
|
|
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
|
|
)
|
|
assert result.s3_key == expected_key
|
|
|
|
|
|
# =============================================================================
|
|
# Hash Edge Cases
|
|
# =============================================================================
|
|
|
|
|
|
class TestHashEdgeCases:
|
|
"""Edge case tests for hash computation."""
|
|
|
|
@pytest.mark.unit
|
|
def test_hash_empty_content_rejected(self, mock_storage):
|
|
"""Test that empty content is rejected."""
|
|
from app.storage import HashComputationError
|
|
|
|
file_obj = io.BytesIO(b"")
|
|
|
|
with pytest.raises(HashComputationError):
|
|
mock_storage._store_simple(file_obj)
|
|
|
|
@pytest.mark.unit
|
|
def test_hash_large_file_streaming(self, mock_storage):
|
|
"""Test hash computation for large files uses streaming."""
|
|
# Create a 10MB file
|
|
size = 10 * 1024 * 1024
|
|
content = b"x" * size
|
|
file_obj = io.BytesIO(content)
|
|
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
expected_hash = compute_sha256(content)
|
|
assert result.sha256 == expected_hash
|
|
|
|
@pytest.mark.unit
|
|
def test_hash_special_bytes(self):
|
|
"""Test hash handles all byte values correctly."""
|
|
# All possible byte values
|
|
content = bytes(range(256))
|
|
hash_value = compute_sha256(content)
|
|
|
|
assert len(hash_value) == 64
|
|
assert hash_value == TEST_HASH_BINARY
|
|
|
|
|
|
# =============================================================================
|
|
# S3 Existence Check Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestExistsMethod:
|
|
"""Tests for the _exists() method that checks S3 object existence."""
|
|
|
|
@pytest.mark.unit
|
|
def test_exists_returns_true_for_existing_key(self, mock_storage, mock_s3_client):
|
|
"""Test _exists() returns True when object exists."""
|
|
# Pre-populate the mock storage
|
|
test_key = "fruits/df/fd/test-hash"
|
|
mock_s3_client.objects[test_key] = b"content"
|
|
|
|
result = mock_storage._exists(test_key)
|
|
|
|
assert result is True
|
|
|
|
@pytest.mark.unit
|
|
def test_exists_returns_false_for_nonexistent_key(self, mock_storage):
|
|
"""Test _exists() returns False when object doesn't exist."""
|
|
result = mock_storage._exists("fruits/no/ne/nonexistent-key")
|
|
|
|
assert result is False
|
|
|
|
@pytest.mark.unit
|
|
def test_exists_handles_404_error(self, mock_storage):
|
|
"""Test _exists() handles 404 errors gracefully."""
|
|
# The mock client raises ClientError for nonexistent keys
|
|
result = mock_storage._exists("fruits/xx/yy/does-not-exist")
|
|
|
|
assert result is False
|
|
|
|
|
|
# =============================================================================
|
|
# S3 Key Generation Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestS3KeyGeneration:
|
|
"""Tests for S3 key pattern generation."""
|
|
|
|
@pytest.mark.unit
|
|
def test_s3_key_pattern(self):
|
|
"""Test S3 key follows pattern: fruits/{hash[:2]}/{hash[2:4]}/{hash}"""
|
|
test_hash = "abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
|
|
|
|
expected_key = f"fruits/{test_hash[:2]}/{test_hash[2:4]}/{test_hash}"
|
|
# Expected: fruits/ab/cd/abcdef1234567890...
|
|
|
|
assert expected_key == f"fruits/ab/cd/{test_hash}"
|
|
|
|
@pytest.mark.unit
|
|
def test_s3_key_generation_in_storage(self, mock_storage):
|
|
"""Test storage layer generates correct S3 key."""
|
|
content = TEST_CONTENT_HELLO
|
|
file_obj = io.BytesIO(content)
|
|
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
expected_key = (
|
|
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
|
|
)
|
|
assert result.s3_key == expected_key
|
|
|
|
@pytest.mark.unit
|
|
def test_s3_key_uses_sha256_hash(self, mock_storage):
|
|
"""Test S3 key is derived from SHA256 hash."""
|
|
content = b"unique test content for key test"
|
|
file_obj = io.BytesIO(content)
|
|
expected_hash = compute_sha256(content)
|
|
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
# Key should contain the hash
|
|
assert expected_hash in result.s3_key
|
|
|
|
|
|
# =============================================================================
|
|
# Deduplication Behavior Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestDeduplicationBehavior:
|
|
"""Tests for deduplication (skip upload when exists)."""
|
|
|
|
@pytest.mark.unit
|
|
def test_skips_upload_when_exists(self, mock_storage, mock_s3_client):
|
|
"""Test storage skips S3 upload when artifact already exists."""
|
|
content = TEST_CONTENT_HELLO
|
|
s3_key = (
|
|
f"fruits/{TEST_HASH_HELLO[:2]}/{TEST_HASH_HELLO[2:4]}/{TEST_HASH_HELLO}"
|
|
)
|
|
|
|
# Pre-populate storage (simulate existing artifact)
|
|
mock_s3_client.objects[s3_key] = content
|
|
|
|
# Track put_object calls
|
|
original_put = mock_s3_client.put_object
|
|
put_called = []
|
|
|
|
def tracked_put(*args, **kwargs):
|
|
put_called.append(True)
|
|
return original_put(*args, **kwargs)
|
|
|
|
mock_s3_client.put_object = tracked_put
|
|
|
|
# Store the same content
|
|
file_obj = io.BytesIO(content)
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
# put_object should NOT have been called (deduplication)
|
|
assert len(put_called) == 0
|
|
assert result.sha256 == TEST_HASH_HELLO
|
|
|
|
@pytest.mark.unit
|
|
def test_uploads_when_not_exists(self, mock_storage, mock_s3_client):
|
|
"""Test storage uploads to S3 when artifact doesn't exist."""
|
|
content = b"brand new unique content"
|
|
content_hash = compute_sha256(content)
|
|
s3_key = f"fruits/{content_hash[:2]}/{content_hash[2:4]}/{content_hash}"
|
|
|
|
# Ensure object doesn't exist
|
|
assert s3_key not in mock_s3_client.objects
|
|
|
|
# Store the content
|
|
file_obj = io.BytesIO(content)
|
|
result = mock_storage._store_simple(file_obj)
|
|
|
|
# Object should now exist in mock storage
|
|
assert s3_key in mock_s3_client.objects
|
|
assert mock_s3_client.objects[s3_key] == content
|
|
|
|
@pytest.mark.unit
|
|
def test_returns_same_hash_for_duplicate(self, mock_storage, mock_s3_client):
|
|
"""Test storing same content twice returns same hash."""
|
|
content = b"content to be stored twice"
|
|
|
|
# First store
|
|
file1 = io.BytesIO(content)
|
|
result1 = mock_storage._store_simple(file1)
|
|
|
|
# Second store (duplicate)
|
|
file2 = io.BytesIO(content)
|
|
result2 = mock_storage._store_simple(file2)
|
|
|
|
assert result1.sha256 == result2.sha256
|
|
assert result1.s3_key == result2.s3_key # gitleaks:allow
|
|
|
|
@pytest.mark.unit
|
|
def test_different_content_different_keys(self, mock_storage):
|
|
"""Test different content produces different S3 keys."""
|
|
content1 = b"first content"
|
|
content2 = b"second content"
|
|
|
|
file1 = io.BytesIO(content1)
|
|
result1 = mock_storage._store_simple(file1)
|
|
|
|
file2 = io.BytesIO(content2)
|
|
result2 = mock_storage._store_simple(file2)
|
|
|
|
assert result1.sha256 != result2.sha256
|
|
assert result1.s3_key != result2.s3_key # gitleaks:allow
|
|
|
|
|
|
# =============================================================================
|
|
# Deduplication Edge Cases
|
|
# =============================================================================
|
|
|
|
|
|
class TestDeduplicationEdgeCases:
|
|
"""Edge case tests for deduplication."""
|
|
|
|
@pytest.mark.unit
|
|
def test_same_content_different_filenames(self, mock_storage):
|
|
"""Test same content with different metadata is deduplicated."""
|
|
content = b"identical content"
|
|
|
|
# Store with "filename1"
|
|
file1 = io.BytesIO(content)
|
|
result1 = mock_storage._store_simple(file1)
|
|
|
|
# Store with "filename2" (same content)
|
|
file2 = io.BytesIO(content)
|
|
result2 = mock_storage._store_simple(file2)
|
|
|
|
# Both should have same hash (content-addressable)
|
|
assert result1.sha256 == result2.sha256
|
|
|
|
@pytest.mark.unit
|
|
def test_whitespace_only_difference(self, mock_storage):
|
|
"""Test content differing only by whitespace produces different hashes."""
|
|
content1 = b"test content"
|
|
content2 = b"test content" # Extra space
|
|
content3 = b"test content " # Trailing space
|
|
|
|
file1 = io.BytesIO(content1)
|
|
file2 = io.BytesIO(content2)
|
|
file3 = io.BytesIO(content3)
|
|
|
|
result1 = mock_storage._store_simple(file1)
|
|
result2 = mock_storage._store_simple(file2)
|
|
result3 = mock_storage._store_simple(file3)
|
|
|
|
# All should be different (content-addressable)
|
|
assert len({result1.sha256, result2.sha256, result3.sha256}) == 3
|