import hashlib
from typing import BinaryIO, Tuple

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError

from .config import get_settings

settings = get_settings()


class S3Storage:
    """Content-addressed object storage backed by a single S3 bucket.

    Connection parameters (endpoint, region, credentials, bucket and
    addressing style) are read from the module-level ``settings`` object.
    """

    def __init__(self) -> None:
        """Build the boto3 S3 client and remember the target bucket."""
        # Path-style addressing is only requested when the settings ask for
        # it; otherwise botocore's default addressing is left untouched.
        config = Config(
            s3={"addressing_style": "path"} if settings.s3_use_path_style else {}
        )
        self.client = boto3.client(
            "s3",
            # A falsy endpoint (e.g. empty string) is normalised to None so
            # boto3 falls back to its default endpoint resolution.
            endpoint_url=settings.s3_endpoint if settings.s3_endpoint else None,
            region_name=settings.s3_region,
            aws_access_key_id=settings.s3_access_key_id,
            aws_secret_access_key=settings.s3_secret_access_key,
            config=config,
        )
        self.bucket = settings.s3_bucket

    def store(self, file: BinaryIO) -> Tuple[str, int, str]:
        """Store a file and return its SHA256 hash, size and S3 key.

        Content-addressable: the key is derived from the SHA256 of the
        content, and the upload is skipped when an object is already
        present under that key.

        Args:
            file: Binary file-like object. It is read to the end in a single
                ``read()`` call, so the whole content is held in memory.

        Returns:
            A ``(sha256_hex, size_in_bytes, s3_key)`` tuple.

        Raises:
            ClientError: If the upload (``put_object``) fails.
        """
        # Read file and compute hash
        content = file.read()
        sha256_hash = hashlib.sha256(content).hexdigest()
        size = len(content)

        # The first two byte-pairs of the hex digest are used as nested
        # key prefixes: fruits/<aa>/<bb>/<full digest>.
        s3_key = f"fruits/{sha256_hash[:2]}/{sha256_hash[2:4]}/{sha256_hash}"

        # Check if already exists
        if not self._exists(s3_key):
            self.client.put_object(
                Bucket=self.bucket,
                Key=s3_key,
                Body=content,
            )

        return sha256_hash, size, s3_key

    def get(self, s3_key: str) -> bytes:
        """Retrieve a file by its S3 key and return its full content.

        Raises:
            ClientError: If ``get_object`` fails (not caught here).
        """
        response = self.client.get_object(Bucket=self.bucket, Key=s3_key)
        return response["Body"].read()

    def get_stream(self, s3_key: str):
        """Return the ``Body`` of the ``get_object`` response without reading it.

        The body is handed back unread so the caller can consume it
        incrementally (per the boto3 documentation this is a botocore
        ``StreamingBody``).

        Raises:
            ClientError: If ``get_object`` fails (not caught here).
        """
        response = self.client.get_object(Bucket=self.bucket, Key=s3_key)
        return response["Body"]

    def _exists(self, s3_key: str) -> bool:
        """Return True if ``head_object`` succeeds for ``s3_key``.

        NOTE: any ``ClientError`` raised by ``head_object`` -- not only a
        "not found" response -- is reported as ``False``.
        """
        try:
            self.client.head_object(Bucket=self.bucket, Key=s3_key)
            return True
        except ClientError:
            return False

    def delete(self, s3_key: str) -> bool:
        """Delete an object.

        Returns:
            True if ``delete_object`` completed without a ``ClientError``,
            False if a ``ClientError`` was raised (the error is swallowed).
        """
        try:
            self.client.delete_object(Bucket=self.bucket, Key=s3_key)
            return True
        except ClientError:
            return False


# Singleton instance
_storage = None


def get_storage() -> S3Storage:
    """Return the shared ``S3Storage``, creating it on first call."""
    global _storage
    if _storage is None:
        _storage = S3Storage()
    return _storage