74 lines
1.7 KiB
Python
74 lines
1.7 KiB
Python
from abc import ABC, abstractmethod
|
|
from typing import BinaryIO
|
|
|
|
|
|
class StorageBackend(ABC):
    """Abstract base class for storage backends.

    Declares the asynchronous interface a backend exposes: upload,
    download, delete, existence check and presigned-URL generation.
    Every method is abstract, so a subclass must override all of them
    before it can be instantiated.
    """

    @abstractmethod
    async def upload_file(self, file_data: BinaryIO, object_name: str) -> str:
        """Upload a file to storage.

        Args:
            file_data: Binary file data.
            object_name: Name/path of the object in storage.

        Returns:
            Storage path/URL of the uploaded file.
        """

    @abstractmethod
    async def download_file(self, object_name: str) -> bytes:
        """Download a file from storage.

        Args:
            object_name: Name/path of the object in storage.

        Returns:
            Binary file data.
        """

    @abstractmethod
    async def delete_file(self, object_name: str) -> bool:
        """Delete a file from storage.

        Args:
            object_name: Name/path of the object in storage.

        Returns:
            True if successful.
        """

    @abstractmethod
    async def file_exists(self, object_name: str) -> bool:
        """Check if a file exists in storage.

        Args:
            object_name: Name/path of the object in storage.

        Returns:
            True if the file exists.
        """

    @abstractmethod
    async def get_file_url(self, object_name: str, expiration: int = 3600) -> str:
        """Get a presigned URL for downloading a file.

        Args:
            object_name: Name/path of the object in storage.
            expiration: URL expiration time in seconds.

        Returns:
            Presigned URL.
        """
|