Cleanup: improve pod naming, remove dead code, update docs
This commit is contained in:
@@ -6,7 +6,6 @@ from typing import (
|
||||
Optional,
|
||||
Dict,
|
||||
Any,
|
||||
Generator,
|
||||
NamedTuple,
|
||||
Protocol,
|
||||
runtime_checkable,
|
||||
@@ -511,127 +510,6 @@ class S3Storage:
|
||||
)
|
||||
raise
|
||||
|
||||
def store_streaming(self, chunks: Generator[bytes, None, None]) -> StorageResult:
|
||||
"""
|
||||
Store a file from a stream of chunks.
|
||||
First accumulates to compute hash, then uploads.
|
||||
For truly large files, consider using initiate_resumable_upload instead.
|
||||
"""
|
||||
# Accumulate chunks and compute all hashes
|
||||
sha256_hasher = hashlib.sha256()
|
||||
md5_hasher = hashlib.md5()
|
||||
sha1_hasher = hashlib.sha1()
|
||||
all_chunks = []
|
||||
size = 0
|
||||
|
||||
for chunk in chunks:
|
||||
sha256_hasher.update(chunk)
|
||||
md5_hasher.update(chunk)
|
||||
sha1_hasher.update(chunk)
|
||||
all_chunks.append(chunk)
|
||||
size += len(chunk)
|
||||
|
||||
sha256_hash = sha256_hasher.hexdigest()
|
||||
md5_hash = md5_hasher.hexdigest()
|
||||
sha1_hash = sha1_hasher.hexdigest()
|
||||
s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"
|
||||
s3_etag = None
|
||||
|
||||
# Check if already exists
|
||||
if self._exists(s3_key):
|
||||
obj_info = self.get_object_info(s3_key)
|
||||
s3_etag = obj_info.get("etag", "").strip('"') if obj_info else None
|
||||
return StorageResult(
|
||||
sha256=sha256_hash,
|
||||
size=size,
|
||||
s3_key=s3_key,
|
||||
md5=md5_hash,
|
||||
sha1=sha1_hash,
|
||||
s3_etag=s3_etag,
|
||||
already_existed=True,
|
||||
)
|
||||
|
||||
# Upload based on size
|
||||
if size < MULTIPART_THRESHOLD:
|
||||
content = b"".join(all_chunks)
|
||||
response = self.client.put_object(
|
||||
Bucket=self.bucket, Key=s3_key, Body=content
|
||||
)
|
||||
s3_etag = response.get("ETag", "").strip('"')
|
||||
else:
|
||||
# Use multipart for large files
|
||||
mpu = self.client.create_multipart_upload(Bucket=self.bucket, Key=s3_key)
|
||||
upload_id = mpu["UploadId"]
|
||||
|
||||
try:
|
||||
parts = []
|
||||
part_number = 1
|
||||
buffer = b""
|
||||
|
||||
for chunk in all_chunks:
|
||||
buffer += chunk
|
||||
while len(buffer) >= MULTIPART_CHUNK_SIZE:
|
||||
part_data = buffer[:MULTIPART_CHUNK_SIZE]
|
||||
buffer = buffer[MULTIPART_CHUNK_SIZE:]
|
||||
|
||||
response = self.client.upload_part(
|
||||
Bucket=self.bucket,
|
||||
Key=s3_key,
|
||||
UploadId=upload_id,
|
||||
PartNumber=part_number,
|
||||
Body=part_data,
|
||||
)
|
||||
parts.append(
|
||||
{
|
||||
"PartNumber": part_number,
|
||||
"ETag": response["ETag"],
|
||||
}
|
||||
)
|
||||
part_number += 1
|
||||
|
||||
# Upload remaining buffer
|
||||
if buffer:
|
||||
response = self.client.upload_part(
|
||||
Bucket=self.bucket,
|
||||
Key=s3_key,
|
||||
UploadId=upload_id,
|
||||
PartNumber=part_number,
|
||||
Body=buffer,
|
||||
)
|
||||
parts.append(
|
||||
{
|
||||
"PartNumber": part_number,
|
||||
"ETag": response["ETag"],
|
||||
}
|
||||
)
|
||||
|
||||
complete_response = self.client.complete_multipart_upload(
|
||||
Bucket=self.bucket,
|
||||
Key=s3_key,
|
||||
UploadId=upload_id,
|
||||
MultipartUpload={"Parts": parts},
|
||||
)
|
||||
s3_etag = complete_response.get("ETag", "").strip('"')
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Streaming multipart upload failed: {e}")
|
||||
self.client.abort_multipart_upload(
|
||||
Bucket=self.bucket,
|
||||
Key=s3_key,
|
||||
UploadId=upload_id,
|
||||
)
|
||||
raise
|
||||
|
||||
return StorageResult(
|
||||
sha256=sha256_hash,
|
||||
size=size,
|
||||
s3_key=s3_key,
|
||||
md5=md5_hash,
|
||||
sha1=sha1_hash,
|
||||
s3_etag=s3_etag,
|
||||
already_existed=False,
|
||||
)
|
||||
|
||||
def initiate_resumable_upload(self, expected_hash: str) -> Dict[str, Any]:
|
||||
"""
|
||||
Initiate a resumable upload session.
|
||||
|
||||
Reference in New Issue
Block a user