Store SHA256 checksums with artifacts and add multiple hash support

- Add sha256 field to API responses as explicit alias of artifact id
- Add checksum_sha1 and s3_etag fields to artifacts table
- Compute MD5 and SHA1 hashes and capture the S3 ETag during upload
- Introduce StorageResult so the storage layer returns all checksums
- Add migration 003_checksum_fields.sql for existing databases (equivalent change sketched below)
- Add Dockerfile.local and docker-compose.local.yml for local development
- Update schemas to include all checksum fields in responses
Author: Mondo Diaz
Date: 2025-12-15 13:03:58 -06:00
Commit: b124b94b56 (parent 0eb2deb4ca)
9 changed files with 358 additions and 41 deletions
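For existing databases the same columns arrive via migration 003_checksum_fields.sql. The SQL file itself is not reproduced here; as a rough sketch only (assuming PostgreSQL, a table named artifacts, and nullable columns sized to match the model below), the equivalent change applied from Python with SQLAlchemy:

    from sqlalchemy import create_engine, text

    # Hypothetical connection URL; the real migration is plain SQL run by whatever
    # migration tooling the project uses.
    engine = create_engine("postgresql+psycopg2://user:password@localhost/registry")

    with engine.begin() as conn:
        # Nullable columns keep existing rows valid; widths match the model:
        # SHA1 hex digests are 40 chars, S3 ETags (with a multipart "-N" suffix) fit in 64.
        conn.execute(text("ALTER TABLE artifacts ADD COLUMN IF NOT EXISTS checksum_sha1 VARCHAR(40)"))
        conn.execute(text("ALTER TABLE artifacts ADD COLUMN IF NOT EXISTS s3_etag VARCHAR(64)"))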

View File

@@ -74,7 +74,9 @@ class Artifact(Base):
content_type = Column(String(255))
original_name = Column(String(1024))
checksum_md5 = Column(String(32)) # MD5 hash for additional verification
metadata = Column(JSON, default=dict) # Format-specific metadata
checksum_sha1 = Column(String(40)) # SHA1 hash for compatibility
s3_etag = Column(String(64)) # S3 ETag for verification
artifact_metadata = Column("metadata", JSON, default=dict) # Format-specific metadata (column name is 'metadata')
created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
created_by = Column(String(255), nullable=False)
ref_count = Column(Integer, default=1)
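The column widths track the hex digest lengths hashlib produces; a quick illustration:

    import hashlib

    # Hex digest lengths the String() widths above are sized for.
    assert len(hashlib.md5(b"x").hexdigest()) == 32     # checksum_md5 = String(32)
    assert len(hashlib.sha1(b"x").hexdigest()) == 40    # checksum_sha1 = String(40)
    assert len(hashlib.sha256(b"x").hexdigest()) == 64  # the artifact id itself
    # An S3 ETag is an MD5 hex string, optionally followed by "-<parts>" for
    # multipart uploads, so String(64) leaves comfortable headroom.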
@@ -83,6 +85,21 @@ class Artifact(Base):
tags = relationship("Tag", back_populates="artifact")
uploads = relationship("Upload", back_populates="artifact")
@property
def sha256(self) -> str:
"""Alias for id - the SHA256 hash of the artifact content"""
return self.id
@property
def format_metadata(self):
"""Alias for artifact_metadata - backward compatibility"""
return self.artifact_metadata
@format_metadata.setter
def format_metadata(self, value):
"""Alias setter for artifact_metadata - backward compatibility"""
self.artifact_metadata = value
__table_args__ = (
Index("idx_artifacts_created_at", "created_at"),
Index("idx_artifacts_created_by", "created_by"),

View File

@@ -520,40 +520,51 @@ def upload_artifact(
)
# Store file (uses multipart for large files)
sha256_hash, size, s3_key = storage.store(file.file, content_length)
storage_result = storage.store(file.file, content_length)
# Check if this is a deduplicated upload
deduplicated = False
# Create or update artifact record
artifact = db.query(Artifact).filter(Artifact.id == sha256_hash).first()
artifact = db.query(Artifact).filter(Artifact.id == storage_result.sha256).first()
if artifact:
artifact.ref_count += 1
deduplicated = True
# Merge metadata if new metadata was extracted
if file_metadata and artifact.format_metadata:
artifact.format_metadata = {**artifact.format_metadata, **file_metadata}
if file_metadata and artifact.artifact_metadata:
artifact.artifact_metadata = {**artifact.artifact_metadata, **file_metadata}
elif file_metadata:
artifact.format_metadata = file_metadata
artifact.artifact_metadata = file_metadata
# Update checksums if not already set
if not artifact.checksum_md5 and storage_result.md5:
artifact.checksum_md5 = storage_result.md5
if not artifact.checksum_sha1 and storage_result.sha1:
artifact.checksum_sha1 = storage_result.sha1
if not artifact.s3_etag and storage_result.s3_etag:
artifact.s3_etag = storage_result.s3_etag
else:
artifact = Artifact(
id=sha256_hash,
size=size,
id=storage_result.sha256,
size=storage_result.size,
content_type=file.content_type,
original_name=file.filename,
checksum_md5=storage_result.md5,
checksum_sha1=storage_result.sha1,
s3_etag=storage_result.s3_etag,
created_by=user_id,
s3_key=s3_key,
format_metadata=file_metadata or {},
s3_key=storage_result.s3_key,
artifact_metadata=file_metadata or {},
)
db.add(artifact)
# Record upload
upload = Upload(
artifact_id=sha256_hash,
artifact_id=storage_result.sha256,
package_id=package.id,
original_name=file.filename,
uploaded_by=user_id,
source_ip=request.client.host if request.client else None,
deduplicated=deduplicated,
)
db.add(upload)
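On a deduplicated upload only ref_count is bumped and the new checksum columns are backfilled when they were previously empty, while the format metadata is merged with dict unpacking so keys from the newest upload win. A tiny illustration of that merge rule (values made up):

    existing = {"format": "wheel", "version": "1.0.0"}
    incoming = {"version": "1.0.1", "python": "cp311"}

    merged = {**existing, **incoming}  # same idiom as the route code above
    # {'format': 'wheel', 'version': '1.0.1', 'python': 'cp311'}  (incoming wins on 'version')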
@@ -561,13 +572,13 @@ def upload_artifact(
if tag:
existing_tag = db.query(Tag).filter(Tag.package_id == package.id, Tag.name == tag).first()
if existing_tag:
existing_tag.artifact_id = sha256_hash
existing_tag.artifact_id = storage_result.sha256
existing_tag.created_by = user_id
else:
new_tag = Tag(
package_id=package.id,
name=tag,
artifact_id=sha256_hash,
artifact_id=storage_result.sha256,
created_by=user_id,
)
db.add(new_tag)
@@ -575,12 +586,16 @@ def upload_artifact(
db.commit()
return UploadResponse(
artifact_id=sha256_hash,
size=size,
artifact_id=storage_result.sha256,
sha256=storage_result.sha256,
size=storage_result.size,
project=project_name,
package=package_name,
tag=tag,
format_metadata=artifact.format_metadata,
checksum_md5=storage_result.md5,
checksum_sha1=storage_result.sha1,
s3_etag=storage_result.s3_etag,
format_metadata=artifact.artifact_metadata,
deduplicated=deduplicated,
)
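Since the response now echoes every checksum, a client can verify its local file end to end after uploading. A rough sketch using the requests library; the host, endpoint path, auth header, and multipart field name are assumptions, while the response field names match the fields returned above:

    import hashlib
    import requests  # any HTTP client works; requests is assumed here

    path = "dist/app-1.0.0.tar.gz"  # hypothetical artifact
    sha256, md5 = hashlib.sha256(), hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            sha256.update(chunk)
            md5.update(chunk)

    with open(path, "rb") as f:
        resp = requests.post(
            "https://artifacts.example.com/api/v1/projects/demo/packages/app/upload",  # hypothetical URL
            files={"file": f},
            headers={"Authorization": "Bearer <token>"},
        ).json()

    assert resp["sha256"] == sha256.hexdigest()
    assert resp["checksum_md5"] == md5.hexdigest()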

View File

@@ -99,9 +99,13 @@ class PackageDetailResponse(BaseModel):
# Artifact schemas
class ArtifactResponse(BaseModel):
id: str
sha256: str # Explicit SHA256 field (same as id)
size: int
content_type: Optional[str]
original_name: Optional[str]
checksum_md5: Optional[str] = None
checksum_sha1: Optional[str] = None
s3_etag: Optional[str] = None
created_at: datetime
created_by: str
ref_count: int
@@ -173,9 +177,13 @@ class ArtifactTagInfo(BaseModel):
class ArtifactDetailResponse(BaseModel):
"""Artifact with list of tags/packages referencing it"""
id: str
sha256: str # Explicit SHA256 field (same as id)
size: int
content_type: Optional[str]
original_name: Optional[str]
checksum_md5: Optional[str] = None
checksum_sha1: Optional[str] = None
s3_etag: Optional[str] = None
created_at: datetime
created_by: str
ref_count: int
@@ -189,9 +197,13 @@ class ArtifactDetailResponse(BaseModel):
class PackageArtifactResponse(BaseModel):
"""Artifact with tags for package artifact listing"""
id: str
sha256: str # Explicit SHA256 field (same as id)
size: int
content_type: Optional[str]
original_name: Optional[str]
checksum_md5: Optional[str] = None
checksum_sha1: Optional[str] = None
s3_etag: Optional[str] = None
created_at: datetime
created_by: str
format_metadata: Optional[Dict[str, Any]] = None
@@ -204,10 +216,14 @@ class PackageArtifactResponse(BaseModel):
# Upload response
class UploadResponse(BaseModel):
artifact_id: str
sha256: str # Explicit SHA256 field (same as artifact_id)
size: int
project: str
package: str
tag: Optional[str]
checksum_md5: Optional[str] = None
checksum_sha1: Optional[str] = None
s3_etag: Optional[str] = None
format_metadata: Optional[Dict[str, Any]] = None
deduplicated: bool = False
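The sha256 field on these schemas is filled straight from the property added to the Artifact model, as long as responses are built from ORM objects (orm_mode in Pydantic v1, from_attributes in v2; which one this codebase uses is not visible in the hunk). A minimal sketch of the mechanism with Pydantic v2, independent of the real models:

    from pydantic import BaseModel, ConfigDict

    class FakeOrmArtifact:  # stand-in for the SQLAlchemy model
        def __init__(self, artifact_id: str) -> None:
            self.id = artifact_id
            self.size = 3

        @property
        def sha256(self) -> str:
            return self.id

    class MiniArtifactResponse(BaseModel):
        model_config = ConfigDict(from_attributes=True)
        id: str
        sha256: str
        size: int

    obj = FakeOrmArtifact("ab" * 32)
    print(MiniArtifactResponse.model_validate(obj).sha256)  # same value as obj.id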

View File

@@ -1,6 +1,6 @@
import hashlib
import logging
from typing import BinaryIO, Tuple, Optional, Dict, Any, Generator
from typing import BinaryIO, Tuple, Optional, Dict, Any, Generator, NamedTuple
import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
@@ -18,6 +18,16 @@ MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024
HASH_CHUNK_SIZE = 8 * 1024 * 1024
class StorageResult(NamedTuple):
"""Result of storing a file with all computed checksums"""
sha256: str
size: int
s3_key: str
md5: Optional[str] = None
sha1: Optional[str] = None
s3_etag: Optional[str] = None
class S3Storage:
def __init__(self):
config = Config(s3={"addressing_style": "path"} if settings.s3_use_path_style else {})
@@ -34,9 +44,9 @@ class S3Storage:
# Store active multipart uploads for resumable support
self._active_uploads: Dict[str, Dict[str, Any]] = {}
def store(self, file: BinaryIO, content_length: Optional[int] = None) -> Tuple[str, int, str]:
def store(self, file: BinaryIO, content_length: Optional[int] = None) -> StorageResult:
"""
Store a file and return its SHA256 hash, size, and s3_key.
Store a file and return StorageResult with all checksums.
Content-addressable: if the file already exists, just return the hash.
Uses multipart upload for files larger than MULTIPART_THRESHOLD.
"""
@@ -46,45 +56,76 @@ class S3Storage:
else:
return self._store_multipart(file, content_length)
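Returning a NamedTuple instead of a bare 3-tuple gives the extra checksums somewhere to live without widening the signature again later, but it does break old-style three-way unpacking once the new fields exist, which is why the route above switches to attribute access. A small illustration:

    from typing import NamedTuple, Optional

    class StorageResult(NamedTuple):  # same shape as the definition above
        sha256: str
        size: int
        s3_key: str
        md5: Optional[str] = None
        sha1: Optional[str] = None
        s3_etag: Optional[str] = None

    sha = "ab" * 32
    result = StorageResult(sha256=sha, size=10, s3_key=f"fruits/{sha[:2]}/{sha[2:4]}/{sha}")
    print(result.sha256, result.size)  # attribute access, as in upload_artifact
    # sha256_hash, size, s3_key = result  # ValueError: too many values to unpack (expected 3)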
def _store_simple(self, file: BinaryIO) -> Tuple[str, int, str]:
def _store_simple(self, file: BinaryIO) -> StorageResult:
"""Store a small file using simple put_object"""
# Read file and compute hash
# Read file and compute all hashes
content = file.read()
sha256_hash = hashlib.sha256(content).hexdigest()
md5_hash = hashlib.md5(content).hexdigest()
sha1_hash = hashlib.sha1(content).hexdigest()
size = len(content)
# Check if already exists
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
s3_etag = None
if not self._exists(s3_key):
self.client.put_object(
response = self.client.put_object(
Bucket=self.bucket,
Key=s3_key,
Body=content,
)
s3_etag = response.get("ETag", "").strip('"')
else:
# Get existing ETag
obj_info = self.get_object_info(s3_key)
if obj_info:
s3_etag = obj_info.get("etag", "").strip('"')
return sha256_hash, size, s3_key
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
)
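For a plain single-part put_object (no SSE-KMS or SSE-C, no multipart), the ETag S3 returns is just the MD5 of the body in hex, so for small artifacts s3_etag and checksum_md5 should agree. A hypothetical helper one could use as a consistency check over stored rows:

    import hashlib

    def etag_consistent_with_md5(s3_etag: str, checksum_md5: str) -> bool:
        """Hypothetical check: plain single-part ETags should equal the content MD5."""
        etag = s3_etag.strip('"')
        if "-" in etag:
            # Multipart ETag (md5-of-part-md5s plus "-<parts>"): not a content MD5, skip.
            return True
        return etag == checksum_md5

    md5_hex = hashlib.md5(b"small artifact body").hexdigest()
    assert etag_consistent_with_md5(f'"{md5_hex}"', md5_hex)
    assert etag_consistent_with_md5("0123456789abcdef0123456789abcdef-3", md5_hex)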
def _store_multipart(self, file: BinaryIO, content_length: int) -> Tuple[str, int, str]:
def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
"""Store a large file using S3 multipart upload with streaming hash computation"""
# First pass: compute hash by streaming through file
hasher = hashlib.sha256()
# First pass: compute all hashes by streaming through file
sha256_hasher = hashlib.sha256()
md5_hasher = hashlib.md5()
sha1_hasher = hashlib.sha1()
size = 0
# Read file in chunks to compute hash
# Read file in chunks to compute hashes
while True:
chunk = file.read(HASH_CHUNK_SIZE)
if not chunk:
break
hasher.update(chunk)
sha256_hasher.update(chunk)
md5_hasher.update(chunk)
sha1_hasher.update(chunk)
size += len(chunk)
sha256_hash = hasher.hexdigest()
sha256_hash = sha256_hasher.hexdigest()
md5_hash = md5_hasher.hexdigest()
sha1_hash = sha1_hasher.hexdigest()
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
# Check if already exists (deduplication)
if self._exists(s3_key):
return sha256_hash, size, s3_key
obj_info = self.get_object_info(s3_key)
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
)
# Seek back to start for upload
file.seek(0)
@@ -116,14 +157,22 @@ class S3Storage:
part_number += 1
# Complete multipart upload
self.client.complete_multipart_upload(
complete_response = self.client.complete_multipart_upload(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
s3_etag = complete_response.get("ETag", "").strip('"')
return sha256_hash, size, s3_key
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
)
except Exception as e:
# Abort multipart upload on failure
@@ -135,33 +184,50 @@ class S3Storage:
)
raise
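For multipart uploads the ETag is not a hash of the whole object: S3 takes the binary MD5 of each part, MD5s the concatenation, and appends "-<part count>". That is why the ETag is stored as its own column rather than derived from checksum_md5. A sketch of recomputing it locally, assuming uploads use the fixed 10 MiB part size (MULTIPART_CHUNK_SIZE); the helper is illustrative, not part of the codebase:

    import hashlib
    from typing import BinaryIO

    PART_SIZE = 10 * 1024 * 1024  # mirrors MULTIPART_CHUNK_SIZE

    def expected_multipart_etag(f: BinaryIO, part_size: int = PART_SIZE) -> str:
        """Recompute the ETag S3 assigns to a multipart upload with a fixed part size."""
        part_digests = []
        while True:
            part = f.read(part_size)
            if not part:
                break
            part_digests.append(hashlib.md5(part).digest())  # binary MD5 per part
        # ETag = MD5 of the concatenated part digests, plus "-<number of parts>"
        return f"{hashlib.md5(b''.join(part_digests)).hexdigest()}-{len(part_digests)}"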
def store_streaming(self, chunks: Generator[bytes, None, None]) -> Tuple[str, int, str]:
def store_streaming(self, chunks: Generator[bytes, None, None]) -> StorageResult:
"""
Store a file from a stream of chunks.
First accumulates to compute hash, then uploads.
For truly large files, consider using initiate_resumable_upload instead.
"""
# Accumulate chunks and compute hash
hasher = hashlib.sha256()
# Accumulate chunks and compute all hashes
sha256_hasher = hashlib.sha256()
md5_hasher = hashlib.md5()
sha1_hasher = hashlib.sha1()
all_chunks = []
size = 0
for chunk in chunks:
hasher.update(chunk)
sha256_hasher.update(chunk)
md5_hasher.update(chunk)
sha1_hasher.update(chunk)
all_chunks.append(chunk)
size += len(chunk)
sha256_hash = hasher.hexdigest()
sha256_hash = sha256_hasher.hexdigest()
md5_hash = md5_hasher.hexdigest()
sha1_hash = sha1_hasher.hexdigest()
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
s3_etag = None
# Check if already exists
if self._exists(s3_key):
return sha256_hash, size, s3_key
obj_info = self.get_object_info(s3_key)
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
)
# Upload based on size
if size < MULTIPART_THRESHOLD:
content = b"".join(all_chunks)
self.client.put_object(Bucket=self.bucket, Key=s3_key, Body=content)
response = self.client.put_object(Bucket=self.bucket, Key=s3_key, Body=content)
s3_etag = response.get("ETag", "").strip('"')
else:
# Use multipart for large files
mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
@@ -205,12 +271,13 @@ class S3Storage:
"ETag": response["ETag"],
})
self.client.complete_multipart_upload(
complete_response = self.client.complete_multipart_upload(
Bucket=self.bucket,
Key=s3_key,
UploadId=upload_id,
MultipartUpload={"Parts": parts},
)
s3_etag = complete_response.get("ETag", "").strip('"')
except Exception as e:
logger.error(f"Streaming multipart upload failed: {e}")
@@ -221,7 +288,14 @@ class S3Storage:
)
raise
return sha256_hash, size, s3_key
return StorageResult(
sha256=sha256_hash,
size=size,
s3_key=s3_key,
md5=md5_hash,
sha1=sha1_hash,
s3_etag=s3_etag,
)
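Note that store_streaming buffers every chunk in memory before deciding how to upload, so it suits mid-sized payloads; the docstring already points very large files at initiate_resumable_upload. A usage sketch (the chunk source below is hypothetical):

    def chunks_from_file(path: str, chunk_size: int = 1 << 20):
        """Hypothetical chunk source; any generator of bytes works."""
        with open(path, "rb") as f:
            while chunk := f.read(chunk_size):
                yield chunk

    storage = S3Storage()  # constructed as above
    result = storage.store_streaming(chunks_from_file("build/output.bin"))
    print(result.sha256, result.size, result.s3_etag)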
def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]:
"""