Store SHA256 checksums with artifacts and add multiple hash support
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
import hashlib
|
||||
import logging
|
||||
from typing import BinaryIO, Tuple, Optional, Dict, Any, Generator
|
||||
from typing import BinaryIO, Tuple, Optional, Dict, Any, Generator, NamedTuple
|
||||
import boto3
|
||||
from botocore.config import Config
|
||||
from botocore.exceptions import ClientError
|
||||
@@ -18,6 +18,16 @@ MULTIPART_CHUNK_SIZE = 10 * 1024 * 1024
|
||||
HASH_CHUNK_SIZE = 8 * 1024 * 1024
|
||||
|
||||
|
||||
class StorageResult(NamedTuple):
|
||||
"""Result of storing a file with all computed checksums"""
|
||||
sha256: str
|
||||
size: int
|
||||
s3_key: str
|
||||
md5: Optional[str] = None
|
||||
sha1: Optional[str] = None
|
||||
s3_etag: Optional[str] = None
|
||||
|
||||
|
||||
class S3Storage:
|
||||
def __init__(self):
|
||||
config = Config(s3={"addressing_style": "path"} if settings.s3_use_path_style else {})
|
||||
@@ -34,9 +44,9 @@ class S3Storage:
|
||||
# Store active multipart uploads for resumable support
|
||||
self._active_uploads: Dict[str, Dict[str, Any]] = {}
|
||||
|
||||
def store(self, file: BinaryIO, content_length: Optional[int] = None) -> Tuple[str, int, str]:
|
||||
def store(self, file: BinaryIO, content_length: Optional[int] = None) -> StorageResult:
|
||||
"""
|
||||
Store a file and return its SHA256 hash, size, and s3_key.
|
||||
Store a file and return StorageResult with all checksums.
|
||||
Content-addressable: if the file already exists, just return the hash.
|
||||
Uses multipart upload for files larger than MULTIPART_THRESHOLD.
|
||||
"""
|
||||
@@ -46,45 +56,76 @@ class S3Storage:
|
||||
else:
|
||||
return self._store_multipart(file, content_length)
|
||||
|
||||
def _store_simple(self, file: BinaryIO) -> Tuple[str, int, str]:
|
||||
def _store_simple(self, file: BinaryIO) -> StorageResult:
|
||||
"""Store a small file using simple put_object"""
|
||||
# Read file and compute hash
|
||||
# Read file and compute all hashes
|
||||
content = file.read()
|
||||
sha256_hash = hashlib.sha256(content).hexdigest()
|
||||
md5_hash = hashlib.md5(content).hexdigest()
|
||||
sha1_hash = hashlib.sha1(content).hexdigest()
|
||||
size = len(content)
|
||||
|
||||
# Check if already exists
|
||||
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
|
||||
s3_etag = None
|
||||
|
||||
if not self._exists(s3_key):
|
||||
self.client.put_object(
|
||||
response = self.client.put_object(
|
||||
Bucket=self.bucket,
|
||||
Key=s3_key,
|
||||
Body=content,
|
||||
)
|
||||
s3_etag = response.get("ETag", "").strip('"')
|
||||
else:
|
||||
# Get existing ETag
|
||||
obj_info = self.get_object_info(s3_key)
|
||||
if obj_info:
|
||||
s3_etag = obj_info.get("etag", "").strip('"')
|
||||
|
||||
return sha256_hash, size, s3_key
|
||||
return StorageResult(
|
||||
sha256=sha256_hash,
|
||||
size=size,
|
||||
s3_key=s3_key,
|
||||
md5=md5_hash,
|
||||
sha1=sha1_hash,
|
||||
s3_etag=s3_etag,
|
||||
)
|
||||
|
||||
def _store_multipart(self, file: BinaryIO, content_length: int) -> Tuple[str, int, str]:
|
||||
def _store_multipart(self, file: BinaryIO, content_length: int) -> StorageResult:
|
||||
"""Store a large file using S3 multipart upload with streaming hash computation"""
|
||||
# First pass: compute hash by streaming through file
|
||||
hasher = hashlib.sha256()
|
||||
# First pass: compute all hashes by streaming through file
|
||||
sha256_hasher = hashlib.sha256()
|
||||
md5_hasher = hashlib.md5()
|
||||
sha1_hasher = hashlib.sha1()
|
||||
size = 0
|
||||
|
||||
# Read file in chunks to compute hash
|
||||
# Read file in chunks to compute hashes
|
||||
while True:
|
||||
chunk = file.read(HASH_CHUNK_SIZE)
|
||||
if not chunk:
|
||||
break
|
||||
hasher.update(chunk)
|
||||
sha256_hasher.update(chunk)
|
||||
md5_hasher.update(chunk)
|
||||
sha1_hasher.update(chunk)
|
||||
size += len(chunk)
|
||||
|
||||
sha256_hash = hasher.hexdigest()
|
||||
sha256_hash = sha256_hasher.hexdigest()
|
||||
md5_hash = md5_hasher.hexdigest()
|
||||
sha1_hash = sha1_hasher.hexdigest()
|
||||
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
|
||||
|
||||
# Check if already exists (deduplication)
|
||||
if self._exists(s3_key):
|
||||
return sha256_hash, size, s3_key
|
||||
obj_info = self.get_object_info(s3_key)
|
||||
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
|
||||
return StorageResult(
|
||||
sha256=sha256_hash,
|
||||
size=size,
|
||||
s3_key=s3_key,
|
||||
md5=md5_hash,
|
||||
sha1=sha1_hash,
|
||||
s3_etag=s3_etag,
|
||||
)
|
||||
|
||||
# Seek back to start for upload
|
||||
file.seek(0)
|
||||
@@ -116,14 +157,22 @@ class S3Storage:
|
||||
part_number += 1
|
||||
|
||||
# Complete multipart upload
|
||||
self.client.complete_multipart_upload(
|
||||
complete_response = self.client.complete_multipart_upload(
|
||||
Bucket=self.bucket,
|
||||
Key=s3_key,
|
||||
UploadId=upload_id,
|
||||
MultipartUpload={"Parts": parts},
|
||||
)
|
||||
s3_etag = complete_response.get("ETag", "").strip('"')
|
||||
|
||||
return sha256_hash, size, s3_key
|
||||
return StorageResult(
|
||||
sha256=sha256_hash,
|
||||
size=size,
|
||||
s3_key=s3_key,
|
||||
md5=md5_hash,
|
||||
sha1=sha1_hash,
|
||||
s3_etag=s3_etag,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
# Abort multipart upload on failure
|
||||
@@ -135,33 +184,50 @@ class S3Storage:
|
||||
)
|
||||
raise
|
||||
|
||||
def store_streaming(self, chunks: Generator[bytes, None, None]) -> Tuple[str, int, str]:
|
||||
def store_streaming(self, chunks: Generator[bytes, None, None]) -> StorageResult:
|
||||
"""
|
||||
Store a file from a stream of chunks.
|
||||
First accumulates to compute hash, then uploads.
|
||||
For truly large files, consider using initiate_resumable_upload instead.
|
||||
"""
|
||||
# Accumulate chunks and compute hash
|
||||
hasher = hashlib.sha256()
|
||||
# Accumulate chunks and compute all hashes
|
||||
sha256_hasher = hashlib.sha256()
|
||||
md5_hasher = hashlib.md5()
|
||||
sha1_hasher = hashlib.sha1()
|
||||
all_chunks = []
|
||||
size = 0
|
||||
|
||||
for chunk in chunks:
|
||||
hasher.update(chunk)
|
||||
sha256_hasher.update(chunk)
|
||||
md5_hasher.update(chunk)
|
||||
sha1_hasher.update(chunk)
|
||||
all_chunks.append(chunk)
|
||||
size += len(chunk)
|
||||
|
||||
sha256_hash = hasher.hexdigest()
|
||||
sha256_hash = sha256_hasher.hexdigest()
|
||||
md5_hash = md5_hasher.hexdigest()
|
||||
sha1_hash = sha1_hasher.hexdigest()
|
||||
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
|
||||
s3_etag = None
|
||||
|
||||
# Check if already exists
|
||||
if self._exists(s3_key):
|
||||
return sha256_hash, size, s3_key
|
||||
obj_info = self.get_object_info(s3_key)
|
||||
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
|
||||
return StorageResult(
|
||||
sha256=sha256_hash,
|
||||
size=size,
|
||||
s3_key=s3_key,
|
||||
md5=md5_hash,
|
||||
sha1=sha1_hash,
|
||||
s3_etag=s3_etag,
|
||||
)
|
||||
|
||||
# Upload based on size
|
||||
if size < MULTIPART_THRESHOLD:
|
||||
content = b"".join(all_chunks)
|
||||
self.client.put_object(Bucket=self.bucket, Key=s3_key, Body=content)
|
||||
response = self.client.put_object(Bucket=self.bucket, Key=s3_key, Body=content)
|
||||
s3_etag = response.get("ETag", "").strip('"')
|
||||
else:
|
||||
# Use multipart for large files
|
||||
mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
|
||||
@@ -205,12 +271,13 @@ class S3Storage:
|
||||
"ETag": response["ETag"],
|
||||
})
|
||||
|
||||
self.client.complete_multipart_upload(
|
||||
complete_response = self.client.complete_multipart_upload(
|
||||
Bucket=self.bucket,
|
||||
Key=s3_key,
|
||||
UploadId=upload_id,
|
||||
MultipartUpload={"Parts": parts},
|
||||
)
|
||||
s3_etag = complete_response.get("ETag", "").strip('"')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Streaming multipart upload failed: {e}")
|
||||
@@ -221,7 +288,14 @@ class S3Storage:
|
||||
)
|
||||
raise
|
||||
|
||||
return sha256_hash, size, s3_key
|
||||
return StorageResult(
|
||||
sha256=sha256_hash,
|
||||
size=size,
|
||||
s3_key=s3_key,
|
||||
md5=md5_hash,
|
||||
sha1=sha1_hash,
|
||||
s3_etag=s3_etag,
|
||||
)
|
||||
|
||||
def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]:
|
||||
"""
|
||||
|
||||
Reference in New Issue
Block a user