feat(lib/store): add s3api storage backend (#1089)
* feat(lib/store): add s3api storage backend Signed-off-by: Xe Iaso <me@xeiaso.net> * docs(store/s3api): replace fake S3 API keys with the bee movie script Signed-off-by: Xe Iaso <me@xeiaso.net> * docs(store/s3api): fix spelling sin Signed-off-by: Xe Iaso <me@xeiaso.net> * fix(store/s3api): remove vestigal experiment Signed-off-by: Xe Iaso <me@xeiaso.net> * chore: spelling Signed-off-by: Xe Iaso <me@xeiaso.net> * chore: spelling Signed-off-by: Xe Iaso <me@xeiaso.net> * chore(store/s3api): support IsPersistent call Ref #1088 Signed-off-by: Xe Iaso <me@xeiaso.net> * chore: spelling Signed-off-by: Xe Iaso <me@xeiaso.net> * chore(test): go mod tidy Signed-off-by: Xe Iaso <me@xeiaso.net> --------- Signed-off-by: Xe Iaso <me@xeiaso.net>
This commit is contained in:
parent
82099d9e05
commit
98945fb56f
12 changed files with 518 additions and 5 deletions
|
|
@ -6,5 +6,6 @@ package all
|
|||
import (
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/bbolt"
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/memory"
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/s3api"
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/valkey"
|
||||
)
|
||||
|
|
|
|||
107
lib/store/s3api/factory.go
Normal file
107
lib/store/s3api/factory.go
Normal file
|
|
@ -0,0 +1,107 @@
|
|||
package s3api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
awsConfig "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNoRegion = errors.New("s3api.Config: no region env var name defined")
|
||||
ErrNoAccessKeyID = errors.New("s3api.Config: no access key id env var name defined")
|
||||
ErrNoSecretAccessKey = errors.New("s3api.Config: no secret access key env var name defined")
|
||||
ErrNoBucketName = errors.New("s3api.Config: no bucket name env var name defined")
|
||||
)
|
||||
|
||||
func init() {
|
||||
store.Register("s3api", Factory{})
|
||||
}
|
||||
|
||||
// S3API is the subset of the AWS S3 client used by this store. It enables mocking in tests.
|
||||
type S3API interface {
|
||||
PutObject(ctx context.Context, params *s3.PutObjectInput, optFns ...func(*s3.Options)) (*s3.PutObjectOutput, error)
|
||||
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
|
||||
DeleteObject(ctx context.Context, params *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
|
||||
HeadObject(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error)
|
||||
}
|
||||
|
||||
// Factory builds an S3-backed store. Tests can inject a Mock via Client.
|
||||
// Factory can optionally carry a preconstructed S3 client (e.g., a mock in tests).
|
||||
type Factory struct {
|
||||
Client S3API
|
||||
}
|
||||
|
||||
func (f Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) {
|
||||
var config Config
|
||||
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if err := config.Valid(); err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if config.BucketName == "" {
|
||||
return nil, fmt.Errorf("%w: %s", store.ErrBadConfig, ErrNoBucketName)
|
||||
}
|
||||
|
||||
// If a client was injected (e.g., tests), use it directly.
|
||||
if f.Client != nil {
|
||||
return &Store{
|
||||
s3: f.Client,
|
||||
bucket: config.BucketName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
cfg, err := awsConfig.LoadDefaultConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't load AWS config from environment: %w", err)
|
||||
}
|
||||
|
||||
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
|
||||
o.UsePathStyle = config.PathStyle
|
||||
})
|
||||
|
||||
return &Store{
|
||||
s3: client,
|
||||
bucket: config.BucketName,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (Factory) Valid(data json.RawMessage) error {
|
||||
var config Config
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if err := config.Valid(); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
PathStyle bool `json:"pathStyle"`
|
||||
BucketName string `json:"bucketName"`
|
||||
}
|
||||
|
||||
func (c Config) Valid() error {
|
||||
var errs []error
|
||||
|
||||
if c.BucketName == "" {
|
||||
errs = append(errs, ErrNoBucketName)
|
||||
}
|
||||
|
||||
if len(errs) != 0 {
|
||||
return fmt.Errorf("s3api.Config: invalid config: %w", errors.Join(errs...))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
78
lib/store/s3api/s3api.go
Normal file
78
lib/store/s3api/s3api.go
Normal file
|
|
@ -0,0 +1,78 @@
|
|||
package s3api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
s3 S3API
|
||||
bucket string
|
||||
}
|
||||
|
||||
func (s *Store) Delete(ctx context.Context, key string) error {
|
||||
normKey := strings.ReplaceAll(key, ":", "/")
|
||||
// Emulate not found by probing first.
|
||||
if _, err := s.s3.HeadObject(ctx, &s3.HeadObjectInput{Bucket: &s.bucket, Key: &normKey}); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrNotFound, err)
|
||||
}
|
||||
if _, err := s.s3.DeleteObject(ctx, &s3.DeleteObjectInput{Bucket: &s.bucket, Key: &normKey}); err != nil {
|
||||
return fmt.Errorf("can't delete from s3: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
normKey := strings.ReplaceAll(key, ":", "/")
|
||||
out, err := s.s3.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: &s.bucket,
|
||||
Key: &normKey,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrNotFound, err)
|
||||
}
|
||||
defer out.Body.Close()
|
||||
if msStr, ok := out.Metadata["x-anubis-expiry-ms"]; ok && msStr != "" {
|
||||
if ms, err := strconv.ParseInt(msStr, 10, 64); err == nil {
|
||||
if time.Now().UnixMilli() >= ms {
|
||||
_, _ = s.s3.DeleteObject(ctx, &s3.DeleteObjectInput{Bucket: &s.bucket, Key: &normKey})
|
||||
return nil, store.ErrNotFound
|
||||
}
|
||||
}
|
||||
}
|
||||
b, err := io.ReadAll(out.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't read s3 object: %w", err)
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
normKey := strings.ReplaceAll(key, ":", "/")
|
||||
// S3 has no native TTL; we store object with metadata X-Anubis-Expiry as epoch seconds.
|
||||
var meta map[string]string
|
||||
if expiry > 0 {
|
||||
exp := time.Now().Add(expiry).UnixMilli()
|
||||
meta = map[string]string{"x-anubis-expiry-ms": fmt.Sprintf("%d", exp)}
|
||||
}
|
||||
_, err := s.s3.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: &s.bucket,
|
||||
Key: &normKey,
|
||||
Body: bytes.NewReader(value),
|
||||
Metadata: meta,
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't put s3 object: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (Store) IsPersistent() bool { return true }
|
||||
140
lib/store/s3api/s3api_test.go
Normal file
140
lib/store/s3api/s3api_test.go
Normal file
|
|
@ -0,0 +1,140 @@
|
|||
package s3api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store/storetest"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
)
|
||||
|
||||
// mockS3 is an in-memory mock of the methods we use.
|
||||
type mockS3 struct {
|
||||
mu sync.RWMutex
|
||||
bucket string
|
||||
data map[string][]byte
|
||||
meta map[string]map[string]string
|
||||
}
|
||||
|
||||
func (m *mockS3) PutObject(ctx context.Context, in *s3.PutObjectInput, _ ...func(*s3.Options)) (*s3.PutObjectOutput, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if m.data == nil {
|
||||
m.data = map[string][]byte{}
|
||||
}
|
||||
if m.meta == nil {
|
||||
m.meta = map[string]map[string]string{}
|
||||
}
|
||||
b, _ := io.ReadAll(in.Body)
|
||||
m.data[aws.ToString(in.Key)] = bytes.Clone(b)
|
||||
if in.Metadata != nil {
|
||||
m.meta[aws.ToString(in.Key)] = map[string]string{}
|
||||
for k, v := range in.Metadata {
|
||||
m.meta[aws.ToString(in.Key)][k] = v
|
||||
}
|
||||
}
|
||||
m.bucket = aws.ToString(in.Bucket)
|
||||
return &s3.PutObjectOutput{}, nil
|
||||
}
|
||||
|
||||
func (m *mockS3) GetObject(ctx context.Context, in *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
b, ok := m.data[aws.ToString(in.Key)]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("not found")
|
||||
}
|
||||
out := &s3.GetObjectOutput{Body: io.NopCloser(bytes.NewReader(b))}
|
||||
if md, ok := m.meta[aws.ToString(in.Key)]; ok {
|
||||
out.Metadata = md
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (m *mockS3) DeleteObject(ctx context.Context, in *s3.DeleteObjectInput, _ ...func(*s3.Options)) (*s3.DeleteObjectOutput, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.data, aws.ToString(in.Key))
|
||||
delete(m.meta, aws.ToString(in.Key))
|
||||
return &s3.DeleteObjectOutput{}, nil
|
||||
}
|
||||
|
||||
func (m *mockS3) HeadObject(ctx context.Context, in *s3.HeadObjectInput, _ ...func(*s3.Options)) (*s3.HeadObjectOutput, error) {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
if _, ok := m.data[aws.ToString(in.Key)]; !ok {
|
||||
return nil, fmt.Errorf("not found")
|
||||
}
|
||||
return &s3.HeadObjectOutput{}, nil
|
||||
}
|
||||
|
||||
func TestImpl(t *testing.T) {
|
||||
mock := &mockS3{}
|
||||
f := Factory{Client: mock}
|
||||
|
||||
data, _ := json.Marshal(Config{
|
||||
BucketName: "bucket",
|
||||
})
|
||||
|
||||
storetest.Common(t, f, json.RawMessage(data))
|
||||
}
|
||||
|
||||
func TestKeyNormalization(t *testing.T) {
|
||||
mock := &mockS3{}
|
||||
f := Factory{Client: mock}
|
||||
|
||||
data, _ := json.Marshal(Config{
|
||||
BucketName: "anubis",
|
||||
})
|
||||
|
||||
s, err := f.Build(t.Context(), json.RawMessage(data))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
key := "a:b:c"
|
||||
val := []byte("value")
|
||||
if err := s.Set(t.Context(), key, val, 0); err != nil {
|
||||
t.Fatalf("Set failed: %v", err)
|
||||
}
|
||||
// Ensure mock saw normalized key
|
||||
mock.mu.RLock()
|
||||
_, hasRaw := mock.data["a:b:c"]
|
||||
got, hasNorm := mock.data["a/b/c"]
|
||||
mock.mu.RUnlock()
|
||||
if hasRaw {
|
||||
t.Fatalf("mock contains raw key with colon; normalization failed")
|
||||
}
|
||||
if !hasNorm || !bytes.Equal(got, val) {
|
||||
t.Fatalf("normalized key missing or wrong value: got=%q", string(got))
|
||||
}
|
||||
|
||||
// Get using colon key should work
|
||||
out, err := s.Get(t.Context(), key)
|
||||
if err != nil {
|
||||
t.Fatalf("Get failed: %v", err)
|
||||
}
|
||||
if !bytes.Equal(out, val) {
|
||||
t.Fatalf("Get returned wrong value: got=%q", string(out))
|
||||
}
|
||||
|
||||
// Delete using colon key should delete normalized object
|
||||
if err := s.Delete(t.Context(), key); err != nil {
|
||||
t.Fatalf("Delete failed: %v", err)
|
||||
}
|
||||
// Give any async cleanup in tests a tick (not needed for mock, but harmless)
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
mock.mu.RLock()
|
||||
_, exists := mock.data["a/b/c"]
|
||||
mock.mu.RUnlock()
|
||||
if exists {
|
||||
t.Fatalf("normalized key still exists after Delete")
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue