Store SHA256 checksums with artifacts and add multiple hash support
- Add sha256 field to API responses as explicit alias of artifact id
- Add checksum_sha1 and s3_etag fields to artifacts table
- Compute MD5, SHA1, and capture S3 ETag during upload
- Update StorageResult to return all checksums from storage layer
- Add migration 003_checksum_fields.sql for existing databases
- Add Dockerfile.local and docker-compose.local.yml for local development
- Update schemas to include all checksum fields in responses
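At its core, the change computes several digests in a single pass over the uploaded stream instead of only SHA256. A minimal, self-contained sketch of that pattern (names here are illustrative, not taken from the codebase):

```python
# Minimal sketch of single-pass multi-hash computation over a chunked binary
# stream; function and variable names are illustrative only.
import hashlib
from typing import BinaryIO


def compute_checksums(stream: BinaryIO, chunk_size: int = 8 * 1024 * 1024) -> dict:
    sha256 = hashlib.sha256()
    sha1 = hashlib.sha1()
    md5 = hashlib.md5()
    size = 0
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        # Feed every hasher from the same chunk so the stream is read only once.
        sha256.update(chunk)
        sha1.update(chunk)
        md5.update(chunk)
        size += len(chunk)
    return {
        "sha256": sha256.hexdigest(),
        "sha1": sha1.hexdigest(),
        "md5": md5.hexdigest(),
        "size": size,
    }


if __name__ == "__main__":
    import io
    print(compute_checksums(io.BytesIO(b"hello orchard")))
```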
@@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ## [Unreleased]
 
+- Added `sha256` field to API responses for clarity (alias of `id`) (#25)
+- Added `checksum_sha1` field to artifacts table for compatibility (#25)
+- Added `s3_etag` field to artifacts table for S3 verification (#25)
+- Compute and store MD5, SHA1, and S3 ETag alongside SHA256 during upload (#25)
+- Added `Dockerfile.local` and `docker-compose.local.yml` for local development (#25)
+- Added migration script `003_checksum_fields.sql` for existing databases (#25)
+
 ## [0.2.0] - 2025-12-15
 
 ### Changed
 - Updated images to use internal container BSF proxy (#46)
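The new response fields give clients enough information to verify downloads end to end. A minimal, hypothetical client-side check against the documented `sha256` field (how the response and the file are fetched is deployment-specific and not shown here):

```python
# Hypothetical client-side check: compare a downloaded file against the
# checksum fields the API now returns. The dict shape follows the schema
# fields added in this commit.
import hashlib


def verify_download(path: str, artifact: dict) -> bool:
    sha256 = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b""):
            sha256.update(chunk)
    return sha256.hexdigest() == artifact["sha256"]


# Example (placeholder digest, not a real artifact):
# verify_download("app.tar.gz", {"sha256": "<64-hex-digest>", "checksum_md5": None})
```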
Dockerfile.local (new file, 50 lines)
@@ -0,0 +1,50 @@
# Frontend build stage
FROM node:20-alpine AS frontend-builder

WORKDIR /app/frontend

# Copy package files
COPY frontend/package*.json ./
RUN npm install

# Copy frontend source
COPY frontend/ ./

# Build frontend
RUN npm run build

# Runtime stage
FROM python:3.12-slim

# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Create non-root user
RUN groupadd -g 1000 orchard && \
    useradd -u 1000 -g orchard -s /bin/bash -m orchard

WORKDIR /app

# Copy requirements and install Python dependencies
COPY backend/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy backend source
COPY backend/ ./backend/

# Copy frontend build
COPY --from=frontend-builder /app/frontend/dist ./frontend/dist

# Set ownership
RUN chown -R orchard:orchard /app

USER orchard

EXPOSE 8080

HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8080/health || exit 1

CMD ["uvicorn", "backend.app.main:app", "--host", "0.0.0.0", "--port", "8080"]
@@ -74,7 +74,9 @@ class Artifact(Base):
     content_type = Column(String(255))
     original_name = Column(String(1024))
     checksum_md5 = Column(String(32))  # MD5 hash for additional verification
-    metadata = Column(JSON, default=dict)  # Format-specific metadata
+    checksum_sha1 = Column(String(40))  # SHA1 hash for compatibility
+    s3_etag = Column(String(64))  # S3 ETag for verification
+    artifact_metadata = Column("metadata", JSON, default=dict)  # Format-specific metadata (column name is 'metadata')
     created_at = Column(DateTime(timezone=True), default=datetime.utcnow)
     created_by = Column(String(255), nullable=False)
     ref_count = Column(Integer, default=1)
@@ -83,6 +85,21 @@ class Artifact(Base):
     tags = relationship("Tag", back_populates="artifact")
     uploads = relationship("Upload", back_populates="artifact")
 
+    @property
+    def sha256(self) -> str:
+        """Alias for id - the SHA256 hash of the artifact content"""
+        return self.id
+
+    @property
+    def format_metadata(self):
+        """Alias for artifact_metadata - backward compatibility"""
+        return self.artifact_metadata
+
+    @format_metadata.setter
+    def format_metadata(self, value):
+        """Alias setter for artifact_metadata - backward compatibility"""
+        self.artifact_metadata = value
+
     __table_args__ = (
         Index("idx_artifacts_created_at", "created_at"),
         Index("idx_artifacts_created_by", "created_by"),
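A note on the `metadata` column rename above: SQLAlchemy reserves the attribute name `metadata` on declarative models (it refers to `Base.metadata`), which is presumably why the Python attribute becomes `artifact_metadata` while the database column keeps its original name. A standalone sketch of the pattern, assuming SQLAlchemy 1.4/2.x and illustrative names:

```python
# Standalone sketch of the column-aliasing pattern: the Python attribute is
# artifact_metadata, but the underlying column is still named "metadata".
from sqlalchemy import JSON, Column, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class ArtifactSketch(Base):
    __tablename__ = "artifacts_sketch"  # illustrative table name
    id = Column(String(64), primary_key=True)  # SHA256 hex digest as the key
    artifact_metadata = Column("metadata", JSON, default=dict)

    @property
    def sha256(self) -> str:
        # The primary key already is the SHA256 digest; this just names it.
        return self.id


print(list(ArtifactSketch.__table__.c.keys()))  # ['id', 'metadata']
```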
@@ -520,40 +520,51 @@ def upload_artifact(
     )
 
     # Store file (uses multipart for large files)
-    sha256_hash, size, s3_key = storage.store(file.file, content_length)
+    storage_result = storage.store(file.file, content_length)
 
     # Check if this is a deduplicated upload
     deduplicated = False
 
     # Create or update artifact record
-    artifact = db.query(Artifact).filter(Artifact.id == sha256_hash).first()
+    artifact = db.query(Artifact).filter(Artifact.id == storage_result.sha256).first()
     if artifact:
         artifact.ref_count += 1
         deduplicated = True
         # Merge metadata if new metadata was extracted
-        if file_metadata and artifact.format_metadata:
-            artifact.format_metadata = {**artifact.format_metadata, **file_metadata}
+        if file_metadata and artifact.artifact_metadata:
+            artifact.artifact_metadata = {**artifact.artifact_metadata, **file_metadata}
         elif file_metadata:
-            artifact.format_metadata = file_metadata
+            artifact.artifact_metadata = file_metadata
+        # Update checksums if not already set
+        if not artifact.checksum_md5 and storage_result.md5:
+            artifact.checksum_md5 = storage_result.md5
+        if not artifact.checksum_sha1 and storage_result.sha1:
+            artifact.checksum_sha1 = storage_result.sha1
+        if not artifact.s3_etag and storage_result.s3_etag:
+            artifact.s3_etag = storage_result.s3_etag
     else:
         artifact = Artifact(
-            id=sha256_hash,
-            size=size,
+            id=storage_result.sha256,
+            size=storage_result.size,
             content_type=file.content_type,
             original_name=file.filename,
+            checksum_md5=storage_result.md5,
+            checksum_sha1=storage_result.sha1,
+            s3_etag=storage_result.s3_etag,
             created_by=user_id,
-            s3_key=s3_key,
-            format_metadata=file_metadata or {},
+            s3_key=storage_result.s3_key,
+            artifact_metadata=file_metadata or {},
         )
         db.add(artifact)
 
     # Record upload
     upload = Upload(
-        artifact_id=sha256_hash,
+        artifact_id=storage_result.sha256,
         package_id=package.id,
         original_name=file.filename,
         uploaded_by=user_id,
         source_ip=request.client.host if request.client else None,
+        deduplicated=deduplicated,
     )
     db.add(upload)
 
@@ -561,13 +572,13 @@ def upload_artifact(
     if tag:
         existing_tag = db.query(Tag).filter(Tag.package_id == package.id, Tag.name == tag).first()
         if existing_tag:
-            existing_tag.artifact_id = sha256_hash
+            existing_tag.artifact_id = storage_result.sha256
             existing_tag.created_by = user_id
         else:
            new_tag = Tag(
                package_id=package.id,
                name=tag,
-                artifact_id=sha256_hash,
+                artifact_id=storage_result.sha256,
                created_by=user_id,
            )
            db.add(new_tag)
@@ -575,12 +586,16 @@ def upload_artifact(
     db.commit()
 
     return UploadResponse(
-        artifact_id=sha256_hash,
-        size=size,
+        artifact_id=storage_result.sha256,
+        sha256=storage_result.sha256,
+        size=storage_result.size,
         project=project_name,
         package=package_name,
         tag=tag,
-        format_metadata=artifact.format_metadata,
+        checksum_md5=storage_result.md5,
+        checksum_sha1=storage_result.sha1,
+        s3_etag=storage_result.s3_etag,
+        format_metadata=artifact.artifact_metadata,
         deduplicated=deduplicated,
     )
 
@@ -99,9 +99,13 @@ class PackageDetailResponse(BaseModel):
 # Artifact schemas
 class ArtifactResponse(BaseModel):
     id: str
+    sha256: str  # Explicit SHA256 field (same as id)
     size: int
     content_type: Optional[str]
     original_name: Optional[str]
+    checksum_md5: Optional[str] = None
+    checksum_sha1: Optional[str] = None
+    s3_etag: Optional[str] = None
     created_at: datetime
     created_by: str
     ref_count: int
@@ -173,9 +177,13 @@ class ArtifactTagInfo(BaseModel):
 class ArtifactDetailResponse(BaseModel):
     """Artifact with list of tags/packages referencing it"""
     id: str
+    sha256: str  # Explicit SHA256 field (same as id)
     size: int
     content_type: Optional[str]
     original_name: Optional[str]
+    checksum_md5: Optional[str] = None
+    checksum_sha1: Optional[str] = None
+    s3_etag: Optional[str] = None
     created_at: datetime
     created_by: str
     ref_count: int
@@ -189,9 +197,13 @@ class ArtifactDetailResponse(BaseModel):
 class PackageArtifactResponse(BaseModel):
     """Artifact with tags for package artifact listing"""
     id: str
+    sha256: str  # Explicit SHA256 field (same as id)
     size: int
     content_type: Optional[str]
     original_name: Optional[str]
+    checksum_md5: Optional[str] = None
+    checksum_sha1: Optional[str] = None
+    s3_etag: Optional[str] = None
     created_at: datetime
     created_by: str
     format_metadata: Optional[Dict[str, Any]] = None
@@ -204,10 +216,14 @@ class PackageArtifactResponse(BaseModel):
 # Upload response
 class UploadResponse(BaseModel):
     artifact_id: str
+    sha256: str  # Explicit SHA256 field (same as artifact_id)
     size: int
     project: str
     package: str
     tag: Optional[str]
+    checksum_md5: Optional[str] = None
+    checksum_sha1: Optional[str] = None
+    s3_etag: Optional[str] = None
     format_metadata: Optional[Dict[str, Any]] = None
     deduplicated: bool = False
 
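Because `sha256` is a property on the ORM model rather than a column, these schemas will only pick it up when they are populated from ORM objects with attribute-based validation enabled. The project's actual model config is not part of this diff; the sketch below assumes Pydantic v2 with `from_attributes`:

```python
# Sketch only: shows how a model property named sha256 can populate a schema
# field. The real response construction in this codebase may differ.
from typing import Optional
from pydantic import BaseModel, ConfigDict


class ArtifactOut(BaseModel):
    model_config = ConfigDict(from_attributes=True)
    id: str
    sha256: str
    checksum_md5: Optional[str] = None


class FakeArtifact:
    """Stand-in for the ORM object: sha256 is a property aliasing id."""

    def __init__(self, digest: str) -> None:
        self.id = digest
        self.checksum_md5 = None

    @property
    def sha256(self) -> str:
        return self.id


print(ArtifactOut.model_validate(FakeArtifact("a" * 64)).sha256)
```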
@@ -1,6 +1,6 @@
 import hashlib
 import logging
-from typing import BinaryIO, Tuple, Optional, Dict, Any, Generator
+from typing import BinaryIO, Tuple, Optional, Dict, Any, Generator, NamedTuple
 import boto3
 from botocore.config import Config
 from botocore.exceptions import ClientError
@@ -18,6 +18,16 @@ MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024
 HASH_CHUNK_SIZE = 8 * 1024 * 1024
 
 
+class StorageResult(NamedTuple):
+    """Result of storing a file with all computed checksums"""
+    sha256: str
+    size: int
+    s3_key: str
+    md5: Optional[str] = None
+    sha1: Optional[str] = None
+    s3_etag: Optional[str] = None
+
+
 class S3Storage:
     def __init__(self):
         config = Config(s3={"addressing_style": "path"} if settings.s3_use_path_style else {})
@@ -34,9 +44,9 @@ class S3Storage:
         # Store active multipart uploads for resumable support
         self._active_uploads: Dict[str, Dict[str, Any]] = {}
 
-    def store(self, file: BinaryIO, content_length: Optional[int] = None) -> Tuple[str, int, str]:
+    def store(self, file: BinaryIO, content_length: Optional[int] = None) -> StorageResult:
         """
-        Store a file and return its SHA256 hash, size, and s3_key.
+        Store a file and return StorageResult with all checksums.
         Content-addressable: if the file already exists, just return the hash.
         Uses multipart upload for files larger than MULTIPART_THRESHOLD.
         """
@@ -46,45 +56,76 @@ class S3Storage:
         else:
             return self._store_multipart(file, content_length)
 
-    def _store_simple(self, file: BinaryIO) -> Tuple[str, int, str]:
+    def _store_simple(self, file: BinaryIO) -> StorageResult:
         """Store a small file using simple put_object"""
-        # Read file and compute hash
+        # Read file and compute all hashes
         content = file.read()
         sha256_hash = hashlib.sha256(content).hexdigest()
+        md5_hash = hashlib.md5(content).hexdigest()
+        sha1_hash = hashlib.sha1(content).hexdigest()
         size = len(content)
 
         # Check if already exists
         s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
+        s3_etag = None
 
         if not self._exists(s3_key):
-            self.client.put_object(
+            response = self.client.put_object(
                 Bucket=self.bucket,
                 Key=s3_key,
                 Body=content,
             )
+            s3_etag = response.get("ETag", "").strip('"')
+        else:
+            # Get existing ETag
+            obj_info = self.get_object_info(s3_key)
+            if obj_info:
+                s3_etag = obj_info.get("etag", "").strip('"')
 
-        return sha256_hash, size, s3_key
+        return StorageResult(
+            sha256=sha256_hash,
+            size=size,
+            s3_key=s3_key,
+            md5=md5_hash,
+            sha1=sha1_hash,
+            s3_etag=s3_etag,
+        )
 
-    def _store_multipart(self, file: BinaryIO, content_length: int) -> Tuple[str, int, str]:
+    def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
         """Store a large file using S3 multipart upload with streaming hash computation"""
-        # First pass: compute hash by streaming through file
-        hasher = hashlib.sha256()
+        # First pass: compute all hashes by streaming through file
+        sha256_hasher = hashlib.sha256()
+        md5_hasher = hashlib.md5()
+        sha1_hasher = hashlib.sha1()
         size = 0
 
-        # Read file in chunks to compute hash
+        # Read file in chunks to compute hashes
        while True:
            chunk = file.read(HASH_CHUNK_SIZE)
            if not chunk:
                break
-            hasher.update(chunk)
+            sha256_hasher.update(chunk)
+            md5_hasher.update(chunk)
+            sha1_hasher.update(chunk)
            size += len(chunk)
 
-        sha256_hash = hasher.hexdigest()
+        sha256_hash = sha256_hasher.hexdigest()
+        md5_hash = md5_hasher.hexdigest()
+        sha1_hash = sha1_hasher.hexdigest()
        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
 
        # Check if already exists (deduplication)
        if self._exists(s3_key):
-            return sha256_hash, size, s3_key
+            obj_info = self.get_object_info(s3_key)
+            s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
+            return StorageResult(
+                sha256=sha256_hash,
+                size=size,
+                s3_key=s3_key,
+                md5=md5_hash,
+                sha1=sha1_hash,
+                s3_etag=s3_etag,
+            )
 
        # Seek back to start for upload
        file.seek(0)
@@ -116,14 +157,22 @@ class S3Storage:
                 part_number += 1
 
             # Complete multipart upload
-            self.client.complete_multipart_upload(
+            complete_response = self.client.complete_multipart_upload(
                 Bucket=self.bucket,
                 Key=s3_key,
                 UploadId=upload_id,
                 MultipartUpload={"Parts": parts},
             )
+            s3_etag = complete_response.get("ETag", "").strip('"')
 
-            return sha256_hash, size, s3_key
+            return StorageResult(
+                sha256=sha256_hash,
+                size=size,
+                s3_key=s3_key,
+                md5=md5_hash,
+                sha1=sha1_hash,
+                s3_etag=s3_etag,
+            )
 
         except Exception as e:
             # Abort multipart upload on failure
@@ -135,33 +184,50 @@ class S3Storage:
             )
             raise
 
-    def store_streaming(self, chunks: Generator[bytes, None, None]) -> Tuple[str, int, str]:
+    def store_streaming(self, chunks: Generator[bytes, None, None]) -> StorageResult:
         """
         Store a file from a stream of chunks.
         First accumulates to compute hash, then uploads.
         For truly large files, consider using initiate_resumable_upload instead.
         """
-        # Accumulate chunks and compute hash
-        hasher = hashlib.sha256()
+        # Accumulate chunks and compute all hashes
+        sha256_hasher = hashlib.sha256()
+        md5_hasher = hashlib.md5()
+        sha1_hasher = hashlib.sha1()
        all_chunks = []
        size = 0
 
        for chunk in chunks:
-            hasher.update(chunk)
+            sha256_hasher.update(chunk)
+            md5_hasher.update(chunk)
+            sha1_hasher.update(chunk)
            all_chunks.append(chunk)
            size += len(chunk)
 
-        sha256_hash = hasher.hexdigest()
+        sha256_hash = sha256_hasher.hexdigest()
+        md5_hash = md5_hasher.hexdigest()
+        sha1_hash = sha1_hasher.hexdigest()
        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
+        s3_etag = None
 
        # Check if already exists
        if self._exists(s3_key):
-            return sha256_hash, size, s3_key
+            obj_info = self.get_object_info(s3_key)
+            s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
+            return StorageResult(
+                sha256=sha256_hash,
+                size=size,
+                s3_key=s3_key,
+                md5=md5_hash,
+                sha1=sha1_hash,
+                s3_etag=s3_etag,
+            )
 
        # Upload based on size
        if size < MULTIPART_THRESHOLD:
            content = b"".join(all_chunks)
-            self.client.put_object(Bucket=self.bucket, Key=s3_key, Body=content)
+            response = self.client.put_object(Bucket=self.bucket, Key=s3_key, Body=content)
+            s3_etag = response.get("ETag", "").strip('"')
        else:
            # Use multipart for large files
            mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
@@ -205,12 +271,13 @@ class S3Storage:
                     "ETag": response["ETag"],
                 })
 
-            self.client.complete_multipart_upload(
+            complete_response = self.client.complete_multipart_upload(
                 Bucket=self.bucket,
                 Key=s3_key,
                 UploadId=upload_id,
                 MultipartUpload={"Parts": parts},
             )
+            s3_etag = complete_response.get("ETag", "").strip('"')
 
         except Exception as e:
             logger.error(f"Streaming multipart upload failed: {e}")
@@ -221,7 +288,14 @@ class S3Storage:
             )
             raise
 
-        return sha256_hash, size, s3_key
+        return StorageResult(
+            sha256=sha256_hash,
+            size=size,
+            s3_key=s3_key,
+            md5=md5_hash,
+            sha1=sha1_hash,
+            s3_etag=s3_etag,
+        )
 
     def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]:
         """
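One reason the storage layer records `checksum_md5` in addition to `s3_etag`: for S3-compatible stores, the ETag of a multipart upload is generally not the MD5 of the object (it is derived from the per-part MD5s and carries a `-<parts>` suffix), so the ETag alone cannot verify content. An illustrative helper, not part of this codebase:

```python
# Illustrative only: an ETag can stand in for an MD5 check only when it is a
# plain single-part ETag; multipart-style ETags contain a "-<parts>" suffix.
def etag_matches_md5(etag: str, md5_hex: str) -> bool:
    etag = etag.strip('"')
    if "-" in etag:
        # Multipart-style ETag: not comparable to a plain MD5 digest.
        return False
    return etag.lower() == md5_hex.lower()


print(etag_matches_md5('"9e107d9d372bb6826bd81d3542a419d6"', "9E107D9D372BB6826BD81D3542A419D6"))  # True
print(etag_matches_md5('"abc123-4"', "9e107d9d372bb6826bd81d3542a419d6"))  # False
```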
docker-compose.local.yml (new file, 122 lines)
@@ -0,0 +1,122 @@
version: '3.8'

services:
  orchard-server:
    build:
      context: .
      dockerfile: Dockerfile.local
    ports:
      - "8080:8080"
    environment:
      - ORCHARD_SERVER_HOST=0.0.0.0
      - ORCHARD_SERVER_PORT=8080
      - ORCHARD_DATABASE_HOST=postgres
      - ORCHARD_DATABASE_PORT=5432
      - ORCHARD_DATABASE_USER=orchard
      - ORCHARD_DATABASE_PASSWORD=orchard_secret
      - ORCHARD_DATABASE_DBNAME=orchard
      - ORCHARD_DATABASE_SSLMODE=disable
      - ORCHARD_S3_ENDPOINT=http://minio:9000
      - ORCHARD_S3_REGION=us-east-1
      - ORCHARD_S3_BUCKET=orchard-artifacts
      - ORCHARD_S3_ACCESS_KEY_ID=minioadmin
      - ORCHARD_S3_SECRET_ACCESS_KEY=minioadmin
      - ORCHARD_S3_USE_PATH_STYLE=true
      - ORCHARD_REDIS_HOST=redis
      - ORCHARD_REDIS_PORT=6379
    depends_on:
      postgres:
        condition: service_healthy
      minio:
        condition: service_healthy
      redis:
        condition: service_healthy
    networks:
      - orchard-network
    restart: unless-stopped
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8080/health"]
      interval: 30s
      timeout: 3s
      start_period: 10s
      retries: 3

  postgres:
    image: postgres:16-alpine
    environment:
      - POSTGRES_USER=orchard
      - POSTGRES_PASSWORD=orchard_secret
      - POSTGRES_DB=orchard
    volumes:
      - postgres-data-local:/var/lib/postgresql/data
      - ./migrations:/docker-entrypoint-initdb.d:ro
    ports:
      - "5432:5432"
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U orchard -d orchard"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - orchard-network
    restart: unless-stopped

  minio:
    image: minio/minio:latest
    command: server /data --console-address ":9001"
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
    volumes:
      - minio-data-local:/data
    ports:
      - "9000:9000"
      - "9001:9001"
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - orchard-network
    restart: unless-stopped

  minio-init:
    image: minio/mc:latest
    depends_on:
      minio:
        condition: service_healthy
    entrypoint: >
      /bin/sh -c "
      mc alias set myminio http://minio:9000 minioadmin minioadmin;
      mc mb myminio/orchard-artifacts --ignore-existing;
      mc anonymous set download myminio/orchard-artifacts;
      exit 0;
      "
    networks:
      - orchard-network

  redis:
    image: redis:7-alpine
    command: redis-server --appendonly yes
    volumes:
      - redis-data-local:/data
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - orchard-network
    restart: unless-stopped

volumes:
  postgres-data-local:
  minio-data-local:
  redis-data-local:

networks:
  orchard-network:
    driver: bridge
@@ -41,6 +41,8 @@ CREATE TABLE IF NOT EXISTS artifacts (
     content_type VARCHAR(255),
     original_name VARCHAR(1024),
     checksum_md5 VARCHAR(32), -- MD5 hash for additional verification
+    checksum_sha1 VARCHAR(40), -- SHA1 hash for compatibility
+    s3_etag VARCHAR(64), -- S3 ETag for verification
     metadata JSONB, -- format-specific metadata
     created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
     created_by VARCHAR(255) NOT NULL,
@@ -51,6 +53,8 @@ CREATE TABLE IF NOT EXISTS artifacts (
 CREATE INDEX idx_artifacts_created_at ON artifacts(created_at);
 CREATE INDEX idx_artifacts_created_by ON artifacts(created_by);
 CREATE INDEX idx_artifacts_metadata ON artifacts USING GIN (metadata);
+CREATE INDEX idx_artifacts_checksum_md5 ON artifacts(checksum_md5) WHERE checksum_md5 IS NOT NULL;
+CREATE INDEX idx_artifacts_checksum_sha1 ON artifacts(checksum_sha1) WHERE checksum_sha1 IS NOT NULL;
 
 -- Tags (Aliases pointing to artifacts)
 CREATE TABLE IF NOT EXISTS tags (
migrations/003_checksum_fields.sql (new file, 12 lines)
@@ -0,0 +1,12 @@
-- Migration 003: Additional Checksum Fields
-- Adds checksum_sha1 and s3_etag fields to artifacts table

-- ============================================
-- Artifacts: Add checksum_sha1 and s3_etag fields
-- ============================================
ALTER TABLE artifacts ADD COLUMN IF NOT EXISTS checksum_sha1 VARCHAR(40);
ALTER TABLE artifacts ADD COLUMN IF NOT EXISTS s3_etag VARCHAR(64);

-- Create indexes for checksum lookups (optional, for verification queries)
CREATE INDEX IF NOT EXISTS idx_artifacts_checksum_md5 ON artifacts(checksum_md5) WHERE checksum_md5 IS NOT NULL;
CREATE INDEX IF NOT EXISTS idx_artifacts_checksum_sha1 ON artifacts(checksum_sha1) WHERE checksum_sha1 IS NOT NULL;