Files
warehouse13/app/storage/s3_backend.py
2025-10-14 15:37:37 -05:00

88 lines
3.2 KiB
Python

import boto3
from botocore.exceptions import ClientError
from typing import BinaryIO
from app.storage.base import StorageBackend
from app.config import settings
import logging
logger = logging.getLogger(__name__)
class S3Backend(StorageBackend):
"""AWS S3 storage backend implementation"""
def __init__(self):
self.s3_client = boto3.client(
's3',
aws_access_key_id=settings.aws_access_key_id,
aws_secret_access_key=settings.aws_secret_access_key,
region_name=settings.aws_region
)
self.bucket_name = settings.s3_bucket_name
self._ensure_bucket_exists()
def _ensure_bucket_exists(self):
"""Create bucket if it doesn't exist"""
try:
self.s3_client.head_bucket(Bucket=self.bucket_name)
except ClientError as e:
error_code = e.response['Error']['Code']
if error_code == '404':
try:
self.s3_client.create_bucket(
Bucket=self.bucket_name,
CreateBucketConfiguration={'LocationConstraint': settings.aws_region}
)
logger.info(f"Created S3 bucket: {self.bucket_name}")
except ClientError as create_error:
logger.error(f"Failed to create bucket: {create_error}")
raise
async def upload_file(self, file_data: BinaryIO, object_name: str) -> str:
"""Upload file to S3"""
try:
self.s3_client.upload_fileobj(file_data, self.bucket_name, object_name)
return f"s3://{self.bucket_name}/{object_name}"
except ClientError as e:
logger.error(f"Failed to upload file to S3: {e}")
raise
async def download_file(self, object_name: str) -> bytes:
"""Download file from S3"""
try:
response = self.s3_client.get_object(Bucket=self.bucket_name, Key=object_name)
return response['Body'].read()
except ClientError as e:
logger.error(f"Failed to download file from S3: {e}")
raise
async def delete_file(self, object_name: str) -> bool:
"""Delete file from S3"""
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=object_name)
return True
except ClientError as e:
logger.error(f"Failed to delete file from S3: {e}")
return False
async def file_exists(self, object_name: str) -> bool:
"""Check if file exists in S3"""
try:
self.s3_client.head_object(Bucket=self.bucket_name, Key=object_name)
return True
except ClientError:
return False
async def get_file_url(self, object_name: str, expiration: int = 3600) -> str:
"""Generate presigned URL for S3 object"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': object_name},
ExpiresIn=expiration
)
return url
except ClientError as e:
logger.error(f"Failed to generate presigned URL: {e}")
raise