Initial commit: Orchard content-addressable storage system
- Go server with Gin framework
- PostgreSQL for metadata storage
- MinIO/S3 for artifact storage with SHA256 content addressing
- REST API for grove/tree/fruit operations
- Web UI for managing artifacts
- Docker Compose setup for local development
internal/storage/s3.go (new file, 158 lines)
@@ -0,0 +1,158 @@
package storage

import (
	"bytes"
	"context"
	"crypto/rand"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	orchardconfig "github.com/bsf/orchard/internal/config"
)
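
// orchardconfig.S3Config is defined elsewhere in the repository. Judging from
// the fields referenced below, it is assumed to look roughly like this; the
// actual definition may differ:
//
//	type S3Config struct {
//		Endpoint        string // optional custom endpoint, e.g. a local MinIO URL
//		Region          string
//		AccessKeyID     string
//		SecretAccessKey string
//		Bucket          string
//		UsePathStyle    bool
//	}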

// S3Storage implements content-addressable storage using S3
type S3Storage struct {
	client *s3.Client
	bucket string
}

// NewS3Storage creates a new S3 storage backend
func NewS3Storage(cfg *orchardconfig.S3Config) (*S3Storage, error) {
	customResolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
		if cfg.Endpoint != "" {
			return aws.Endpoint{
				URL:               cfg.Endpoint,
				HostnameImmutable: true,
			}, nil
		}
		return aws.Endpoint{}, &aws.EndpointNotFoundError{}
	})

	awsCfg, err := config.LoadDefaultConfig(context.Background(),
		config.WithRegion(cfg.Region),
		config.WithEndpointResolverWithOptions(customResolver),
		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
			cfg.AccessKeyID,
			cfg.SecretAccessKey,
			"",
		)),
	)
	if err != nil {
		return nil, fmt.Errorf("failed to load AWS config: %w", err)
	}

	client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
		o.UsePathStyle = cfg.UsePathStyle
	})

	return &S3Storage{
		client: client,
		bucket: cfg.Bucket,
	}, nil
}
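
// In local development (the Docker Compose setup with MinIO described in the
// commit message), Endpoint is expected to point at the local MinIO URL and
// UsePathStyle to be enabled, since MinIO serves path-style bucket addresses
// by default.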

// hashToKey converts a SHA256 hash to an S3 object key
// Uses directory structure: ab/cd/abcdef123...
func hashToKey(hash string) string {
	if len(hash) < 4 {
		return hash
	}
	return fmt.Sprintf("%s/%s/%s", hash[:2], hash[2:4], hash)
}
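
// For example, a hash beginning "abcd..." maps to the key "ab/cd/abcd...",
// fanning objects out across up to 65,536 two-character prefixes instead of
// one flat namespace.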

// Store uploads content to S3 and returns the SHA256 hash
func (s *S3Storage) Store(ctx context.Context, reader io.Reader) (string, int64, error) {
	// Read all content into memory to compute hash and enable seeking
	data, err := io.ReadAll(reader)
	if err != nil {
		return "", 0, fmt.Errorf("failed to read content: %w", err)
	}

	size := int64(len(data))

	// Compute SHA256 hash
	hasher := sha256.New()
	hasher.Write(data)
	hash := hex.EncodeToString(hasher.Sum(nil))
	finalKey := hashToKey(hash)

	// Check if object already exists (deduplication)
	_, err = s.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(finalKey),
	})
	if err == nil {
		// Object already exists, return existing hash (deduplication)
		return hash, size, nil
	}

	// Upload to final location
	_, err = s.client.PutObject(ctx, &s3.PutObjectInput{
		Bucket:        aws.String(s.bucket),
		Key:           aws.String(finalKey),
		Body:          bytes.NewReader(data),
		ContentLength: aws.Int64(size),
	})
	if err != nil {
		return "", 0, fmt.Errorf("failed to upload to S3: %w", err)
	}

	return hash, size, nil
}

// Retrieve downloads content by SHA256 hash
func (s *S3Storage) Retrieve(ctx context.Context, hash string) (io.ReadCloser, error) {
	key := hashToKey(hash)

	result, err := s.client.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve object: %w", err)
	}

	return result.Body, nil
}

// Exists checks if a fruit exists by hash.
// Note: any HeadObject failure, including transient network errors, is
// reported as "does not exist".
func (s *S3Storage) Exists(ctx context.Context, hash string) (bool, error) {
	key := hashToKey(hash)

	_, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return false, nil
	}
	return true, nil
}

// GetSize returns the size of a stored object
func (s *S3Storage) GetSize(ctx context.Context, hash string) (int64, error) {
	key := hashToKey(hash)

	result, err := s.client.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		return 0, fmt.Errorf("failed to get object metadata: %w", err)
	}

	// aws.ToInt64 safely handles a nil ContentLength pointer.
	return aws.ToInt64(result.ContentLength), nil
}

// generateTempID creates a unique temporary ID
func generateTempID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		// crypto/rand should not fail; if it does, unique IDs cannot be
		// generated safely.
		panic(fmt.Errorf("failed to read random bytes: %w", err))
	}
	return hex.EncodeToString(b)
}
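
Taken together, the backend exposes a simple content-addressed round trip: Store returns the SHA256 hash under which the bytes were saved, and Retrieve streams them back by that hash. A minimal usage sketch against a local MinIO instance, with hypothetical credentials and bucket name, and S3Config field names inferred from their use above:

package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	orchardconfig "github.com/bsf/orchard/internal/config"
	"github.com/bsf/orchard/internal/storage"
)

func main() {
	// Hypothetical local-development values; adjust to the actual environment.
	backend, err := storage.NewS3Storage(&orchardconfig.S3Config{
		Endpoint:        "http://localhost:9000",
		Region:          "us-east-1",
		AccessKeyID:     "minioadmin",
		SecretAccessKey: "minioadmin",
		Bucket:          "orchard",
		UsePathStyle:    true,
	})
	if err != nil {
		log.Fatal(err)
	}

	ctx := context.Background()

	// Store returns the SHA256 hash that now addresses the content.
	hash, size, err := backend.Store(ctx, strings.NewReader("hello orchard"))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("stored %d bytes as %s\n", size, hash)

	// The same hash retrieves the content later.
	body, err := backend.Retrieve(ctx, hash)
	if err != nil {
		log.Fatal(err)
	}
	defer body.Close()
}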