feat(lib): use new challenge creation flow (#749)
* feat(decaymap): add Delete method Signed-off-by: Xe Iaso <me@xeiaso.net> * chore(lib/challenge): refactor Validate to take ValidateInput Signed-off-by: Xe Iaso <me@xeiaso.net> * feat(lib): implement store interface Signed-off-by: Xe Iaso <me@xeiaso.net> * feat(lib/store): all metapackage to import all store implementations Signed-off-by: Xe Iaso <me@xeiaso.net> * chore(policy): import all store backends Signed-off-by: Xe Iaso <me@xeiaso.net> * feat(lib): use new challenge creation flow Previously Anubis constructed challenge strings from request metadata. This was a good idea in spirit, but has turned out to be a very bad idea in practice. This new flow reuses the Store facility to dynamically create challenge values with completely random data. This is a fairly big rewrite of how Anubis processes challenges. Right now it defaults to using the in-memory storage backend, but on-disk (boltdb) and valkey-based adaptors will come soon. Signed-off-by: Xe Iaso <me@xeiaso.net> * chore(decaymap): fix documentation typo Signed-off-by: Xe Iaso <me@xeiaso.net> * chore(lib): fix SA4004 Signed-off-by: Xe Iaso <me@xeiaso.net> * test(lib/store): make generic storage interface test adaptor Signed-off-by: Xe Iaso <me@xeiaso.net> * chore: spelling Signed-off-by: Xe Iaso <me@xeiaso.net> * fix(decaymap): invert locking process for Delete Signed-off-by: Xe Iaso <me@xeiaso.net> * feat(lib/store): add bbolt store implementation Signed-off-by: Xe Iaso <me@xeiaso.net> * chore: spelling Signed-off-by: Xe Iaso <me@xeiaso.net> * chore: go mod tidy Signed-off-by: Xe Iaso <me@xeiaso.net> * chore(devcontainer): adapt to docker compose, add valkey service Signed-off-by: Xe Iaso <me@xeiaso.net> * fix(lib): make challenges live for 30 minutes by default Signed-off-by: Xe Iaso <me@xeiaso.net> * feat(lib/store): implement valkey backend Signed-off-by: Xe Iaso <me@xeiaso.net> * test(lib/store/valkey): disable tests if not using docker Signed-off-by: Xe Iaso <me@xeiaso.net> * test(lib/policy/config): ensure valkey stores can be loaded Signed-off-by: Xe Iaso <me@xeiaso.net> * Update metadata check-spelling run (pull_request) for Xe/store-interface Signed-off-by: check-spelling-bot <check-spelling-bot@users.noreply.github.com> on-behalf-of: @check-spelling <check-spelling-bot@check-spelling.dev> * chore(devcontainer): remove port forwards because vs code handles that for you Signed-off-by: Xe Iaso <me@xeiaso.net> * docs(default-config): add a nudge to the storage backends section of the docs Signed-off-by: Xe Iaso <me@xeiaso.net> * chore(docs): listen on 0.0.0.0 for dev container support Signed-off-by: Xe Iaso <me@xeiaso.net> * docs(policy): document storage backends Signed-off-by: Xe Iaso <me@xeiaso.net> * docs: update CHANGELOG and internal links Signed-off-by: Xe Iaso <me@xeiaso.net> * docs(admin/policies): don't start a sentence with as Signed-off-by: Xe Iaso <me@xeiaso.net> * chore: fixes found in review Signed-off-by: Xe Iaso <me@xeiaso.net> --------- Signed-off-by: Xe Iaso <me@xeiaso.net> Signed-off-by: check-spelling-bot <check-spelling-bot@users.noreply.github.com>
This commit is contained in:
parent
506d8817d5
commit
dff2176beb
43 changed files with 1539 additions and 140 deletions
10
lib/store/all/all.go
Normal file
10
lib/store/all/all.go
Normal file
|
|
@ -0,0 +1,10 @@
|
|||
// Package all is a meta-package that imports all store implementations.
|
||||
//
|
||||
// This is a HACK to make tests work consistently.
|
||||
package all
|
||||
|
||||
import (
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/bbolt"
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/memory"
|
||||
_ "github.com/TecharoHQ/anubis/lib/store/valkey"
|
||||
)
|
||||
142
lib/store/bbolt/bbolt.go
Normal file
142
lib/store/bbolt/bbolt.go
Normal file
|
|
@ -0,0 +1,142 @@
|
|||
package bbolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrBucketDoesNotExist = errors.New("bbolt: bucket does not exist")
|
||||
ErrNotExists = errors.New("bbolt: value does not exist in store")
|
||||
)
|
||||
|
||||
type Item struct {
|
||||
Data []byte `json:"data"`
|
||||
Expires time.Time `json:"expires"`
|
||||
}
|
||||
|
||||
type Store struct {
|
||||
bucket []byte
|
||||
bdb *bbolt.DB
|
||||
}
|
||||
|
||||
func (s *Store) Delete(ctx context.Context, key string) error {
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
}
|
||||
|
||||
if bkt.Get([]byte(key)) == nil {
|
||||
return fmt.Errorf("%w: %q", ErrNotExists, key)
|
||||
}
|
||||
|
||||
return bkt.Delete([]byte(key))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
var i Item
|
||||
|
||||
if err := s.bdb.View(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
}
|
||||
|
||||
bucketData := bkt.Get([]byte(key))
|
||||
if bucketData == nil {
|
||||
return fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(bucketData, &i); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrCantDecode, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if time.Now().After(i.Expires) {
|
||||
go s.Delete(context.Background(), key)
|
||||
return nil, fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
return i.Data, nil
|
||||
}
|
||||
|
||||
func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
i := Item{
|
||||
Data: value,
|
||||
Expires: time.Now().Add(expiry),
|
||||
}
|
||||
|
||||
data, err := json.Marshal(i)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrCantEncode, err)
|
||||
}
|
||||
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("%w: %q", ErrBucketDoesNotExist, string(s.bucket))
|
||||
}
|
||||
|
||||
return bkt.Put([]byte(key), data)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Store) cleanup(ctx context.Context) error {
|
||||
now := time.Now()
|
||||
|
||||
return s.bdb.Update(func(tx *bbolt.Tx) error {
|
||||
bkt := tx.Bucket(s.bucket)
|
||||
if bkt == nil {
|
||||
return fmt.Errorf("cache bucket %q does not exist", string(s.bucket))
|
||||
}
|
||||
|
||||
return bkt.ForEach(func(k, v []byte) error {
|
||||
var i Item
|
||||
|
||||
data := bkt.Get(k)
|
||||
if data == nil {
|
||||
return fmt.Errorf("%s in Cache bucket does not exist???", string(k))
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &i); err != nil {
|
||||
return fmt.Errorf("can't unmarshal data at key %s: %w", string(k), err)
|
||||
}
|
||||
|
||||
if now.After(i.Expires) {
|
||||
return bkt.Delete(k)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func (s *Store) cleanupThread(ctx context.Context) {
|
||||
t := time.NewTicker(5 * time.Minute)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
if err := s.cleanup(ctx); err != nil {
|
||||
slog.Error("error during bbolt cleanup", "err", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
23
lib/store/bbolt/bbolt_test.go
Normal file
23
lib/store/bbolt/bbolt_test.go
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
package bbolt
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store/storetest"
|
||||
)
|
||||
|
||||
func TestImpl(t *testing.T) {
|
||||
path := filepath.Join(t.TempDir(), "db")
|
||||
t.Log(path)
|
||||
data, err := json.Marshal(Config{
|
||||
Path: path,
|
||||
Bucket: "anubis",
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
storetest.Common(t, Factory{}, json.RawMessage(data))
|
||||
}
|
||||
100
lib/store/bbolt/factory.go
Normal file
100
lib/store/bbolt/factory.go
Normal file
|
|
@ -0,0 +1,100 @@
|
|||
package bbolt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrMissingPath = errors.New("bbolt: path is missing from config")
|
||||
ErrCantWriteToPath = errors.New("bbolt: can't write to path")
|
||||
)
|
||||
|
||||
func init() {
|
||||
store.Register("bbolt", Factory{})
|
||||
}
|
||||
|
||||
type Factory struct{}
|
||||
|
||||
func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) {
|
||||
var config Config
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if err := config.Valid(); err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if config.Bucket == "" {
|
||||
config.Bucket = "anubis"
|
||||
}
|
||||
|
||||
bdb, err := bbolt.Open(config.Path, 0600, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("can't open bbolt database %s: %w", config.Path, err)
|
||||
}
|
||||
|
||||
if err := bdb.Update(func(tx *bbolt.Tx) error {
|
||||
if _, err := tx.CreateBucketIfNotExists([]byte(config.Bucket)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("can't create bbolt bucket %q: %w", config.Bucket, err)
|
||||
}
|
||||
|
||||
result := &Store{
|
||||
bdb: bdb,
|
||||
bucket: []byte(config.Bucket),
|
||||
}
|
||||
|
||||
go result.cleanupThread(ctx)
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (Factory) Valid(data json.RawMessage) error {
|
||||
var config Config
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if err := config.Valid(); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Path string `json:"path"`
|
||||
Bucket string `json:"bucket,omitempty"`
|
||||
}
|
||||
|
||||
func (c Config) Valid() error {
|
||||
var errs []error
|
||||
|
||||
if c.Path == "" {
|
||||
errs = append(errs, ErrMissingPath)
|
||||
} else {
|
||||
dir := filepath.Dir(c.Path)
|
||||
if err := os.WriteFile(filepath.Join(dir, ".test-file"), []byte(""), 0600); err != nil {
|
||||
errs = append(errs, ErrCantWriteToPath)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) != 0 {
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
50
lib/store/bbolt/factory_test.go
Normal file
50
lib/store/bbolt/factory_test.go
Normal file
|
|
@ -0,0 +1,50 @@
|
|||
package bbolt
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestFactoryValid(t *testing.T) {
|
||||
f := Factory{}
|
||||
|
||||
t.Run("bad config", func(t *testing.T) {
|
||||
if err := f.Valid(json.RawMessage(`}`)); err == nil {
|
||||
t.Error("wanted parsing failure but got a successful result")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("invalid config", func(t *testing.T) {
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
cfg Config
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "missing path",
|
||||
cfg: Config{},
|
||||
err: ErrMissingPath,
|
||||
},
|
||||
{
|
||||
name: "unwritable folder",
|
||||
cfg: Config{
|
||||
Path: filepath.Join("/", "testdb"),
|
||||
},
|
||||
err: ErrCantWriteToPath,
|
||||
},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
data, err := json.Marshal(tt.cfg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := f.Valid(json.RawMessage(data)); !errors.Is(err, tt.err) {
|
||||
t.Error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
77
lib/store/interface.go
Normal file
77
lib/store/interface.go
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrNotFound is returned when the store implementation cannot find the value
|
||||
// for a given key.
|
||||
ErrNotFound = errors.New("store: key not found")
|
||||
|
||||
// ErrCantDecode is returned when a store adaptor cannot decode the store format
|
||||
// to a value used by the code.
|
||||
ErrCantDecode = errors.New("store: can't decode value")
|
||||
|
||||
// ErrCantEncode is returned when a store adaptor cannot encode the value into
|
||||
// the format that the store uses.
|
||||
ErrCantEncode = errors.New("store: can't encode value")
|
||||
|
||||
// ErrBadConfig is returned when a store adaptor's configuration is invalid.
|
||||
ErrBadConfig = errors.New("store: configuration is invalid")
|
||||
)
|
||||
|
||||
// Interface defines the calls that Anubis uses for storage in a local or remote
|
||||
// datastore. This can be implemented with an in-memory, on-disk, or in-database
|
||||
// storage backend.
|
||||
type Interface interface {
|
||||
// Delete removes a value from the store by key.
|
||||
Delete(ctx context.Context, key string) error
|
||||
|
||||
// Get returns the value of a key assuming that value exists and has not expired.
|
||||
Get(ctx context.Context, key string) ([]byte, error)
|
||||
|
||||
// Set puts a value into the store that expires according to its expiry.
|
||||
Set(ctx context.Context, key string, value []byte, expiry time.Duration) error
|
||||
}
|
||||
|
||||
func z[T any]() T { return *new(T) }
|
||||
|
||||
type JSON[T any] struct {
|
||||
Underlying Interface
|
||||
}
|
||||
|
||||
func (j *JSON[T]) Delete(ctx context.Context, key string) error {
|
||||
return j.Underlying.Delete(ctx, key)
|
||||
}
|
||||
|
||||
func (j *JSON[T]) Get(ctx context.Context, key string) (T, error) {
|
||||
data, err := j.Underlying.Get(ctx, key)
|
||||
if err != nil {
|
||||
return z[T](), err
|
||||
}
|
||||
|
||||
var result T
|
||||
if err := json.Unmarshal(data, &result); err != nil {
|
||||
return z[T](), fmt.Errorf("%w: %w", ErrCantDecode, err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (j *JSON[T]) Set(ctx context.Context, key string, value T, expiry time.Duration) error {
|
||||
data, err := json.Marshal(value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%w: %w", ErrCantEncode, err)
|
||||
}
|
||||
|
||||
if err := j.Underlying.Set(ctx, key, data, expiry); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
74
lib/store/memory/memory.go
Normal file
74
lib/store/memory/memory.go
Normal file
|
|
@ -0,0 +1,74 @@
|
|||
package memory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/decaymap"
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
)
|
||||
|
||||
type factory struct{}
|
||||
|
||||
func (factory) Build(ctx context.Context, _ json.RawMessage) (store.Interface, error) {
|
||||
return New(ctx), nil
|
||||
}
|
||||
|
||||
func (factory) Valid(json.RawMessage) error { return nil }
|
||||
|
||||
func init() {
|
||||
store.Register("memory", factory{})
|
||||
}
|
||||
|
||||
type impl struct {
|
||||
store *decaymap.Impl[string, []byte]
|
||||
}
|
||||
|
||||
func (i *impl) Delete(_ context.Context, key string) error {
|
||||
if !i.store.Delete(key) {
|
||||
return fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *impl) Get(_ context.Context, key string) ([]byte, error) {
|
||||
result, ok := i.store.Get(key)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%w: %q", store.ErrNotFound, key)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (i *impl) Set(_ context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
i.store.Set(key, value, expiry)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (i *impl) cleanupThread(ctx context.Context) {
|
||||
t := time.NewTicker(5 * time.Minute)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-t.C:
|
||||
i.store.Cleanup()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a simple in-memory store. This will not scale to multiple Anubis instances.
|
||||
func New(ctx context.Context) store.Interface {
|
||||
result := &impl{
|
||||
store: decaymap.New[string, []byte](),
|
||||
}
|
||||
|
||||
go result.cleanupThread(ctx)
|
||||
|
||||
return result
|
||||
}
|
||||
11
lib/store/memory/memory_test.go
Normal file
11
lib/store/memory/memory_test.go
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
package memory
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store/storetest"
|
||||
)
|
||||
|
||||
func TestImpl(t *testing.T) {
|
||||
storetest.Common(t, factory{}, nil)
|
||||
}
|
||||
43
lib/store/registry.go
Normal file
43
lib/store/registry.go
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
package store
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var (
|
||||
registry map[string]Factory = map[string]Factory{}
|
||||
regLock sync.RWMutex
|
||||
)
|
||||
|
||||
type Factory interface {
|
||||
Build(ctx context.Context, config json.RawMessage) (Interface, error)
|
||||
Valid(config json.RawMessage) error
|
||||
}
|
||||
|
||||
func Register(name string, impl Factory) {
|
||||
regLock.Lock()
|
||||
defer regLock.Unlock()
|
||||
|
||||
registry[name] = impl
|
||||
}
|
||||
|
||||
func Get(name string) (Factory, bool) {
|
||||
regLock.RLock()
|
||||
defer regLock.RUnlock()
|
||||
result, ok := registry[name]
|
||||
return result, ok
|
||||
}
|
||||
|
||||
func Methods() []string {
|
||||
regLock.RLock()
|
||||
defer regLock.RUnlock()
|
||||
var result []string
|
||||
for method := range registry {
|
||||
result = append(result, method)
|
||||
}
|
||||
sort.Strings(result)
|
||||
return result
|
||||
}
|
||||
92
lib/store/storetest/storetest.go
Normal file
92
lib/store/storetest/storetest.go
Normal file
|
|
@ -0,0 +1,92 @@
|
|||
package storetest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
)
|
||||
|
||||
func Common(t *testing.T, f store.Factory, config json.RawMessage) {
|
||||
if err := f.Valid(config); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
s, err := f.Build(t.Context(), config)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, tt := range []struct {
|
||||
name string
|
||||
doer func(t *testing.T, s store.Interface) error
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "basic get set delete",
|
||||
doer: func(t *testing.T, s store.Interface) error {
|
||||
if _, err := s.Get(t.Context(), t.Name()); !errors.Is(err, store.ErrNotFound) {
|
||||
t.Errorf("wanted %s to not exist in store but it exists anyways", t.Name())
|
||||
}
|
||||
|
||||
if err := s.Set(t.Context(), t.Name(), []byte(t.Name()), 5*time.Minute); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
val, err := s.Get(t.Context(), t.Name())
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
t.Errorf("wanted %s to exist in store but it does not", t.Name())
|
||||
}
|
||||
|
||||
if !bytes.Equal(val, []byte(t.Name())) {
|
||||
t.Logf("want: %q", t.Name())
|
||||
t.Logf("got: %q", string(val))
|
||||
t.Error("wrong value returned")
|
||||
}
|
||||
|
||||
if err := s.Delete(t.Context(), t.Name()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := s.Get(t.Context(), t.Name()); !errors.Is(err, store.ErrNotFound) {
|
||||
t.Error("wanted test to not exist in store but it exists anyways")
|
||||
}
|
||||
|
||||
if err := s.Delete(t.Context(), t.Name()); err == nil {
|
||||
t.Errorf("key %q does not exist and Delete did not return non-nil", t.Name())
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "expires",
|
||||
doer: func(t *testing.T, s store.Interface) error {
|
||||
if err := s.Set(t.Context(), t.Name(), []byte(t.Name()), 150*time.Millisecond); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//nosleep:bypass XXX(Xe): use Go's time faking thing in Go 1.25 when that is released.
|
||||
time.Sleep(155 * time.Millisecond)
|
||||
|
||||
if _, err := s.Get(t.Context(), t.Name()); !errors.Is(err, store.ErrNotFound) {
|
||||
t.Errorf("wanted %s to not exist in store but it exists anyways", t.Name())
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
if err := tt.doer(t, s); !errors.Is(err, tt.err) {
|
||||
t.Logf("want: %v", tt.err)
|
||||
t.Logf("got: %v", err)
|
||||
t.Error("wrong error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
84
lib/store/valkey/factory.go
Normal file
84
lib/store/valkey/factory.go
Normal file
|
|
@ -0,0 +1,84 @@
|
|||
package valkey
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
valkey "github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNoURL = errors.New("valkey.Config: no URL defined")
|
||||
ErrBadURL = errors.New("valkey.Config: URL is invalid")
|
||||
)
|
||||
|
||||
func init() {
|
||||
store.Register("valkey", Factory{})
|
||||
}
|
||||
|
||||
type Factory struct{}
|
||||
|
||||
func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) {
|
||||
var config Config
|
||||
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if err := config.Valid(); err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
opts, err := valkey.ParseURL(config.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
rdb := valkey.NewClient(opts)
|
||||
|
||||
if _, err := rdb.Ping(ctx).Result(); err != nil {
|
||||
return nil, fmt.Errorf("can't ping valkey instance: %w", err)
|
||||
}
|
||||
|
||||
return &Store{
|
||||
rdb: rdb,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (Factory) Valid(data json.RawMessage) error {
|
||||
var config Config
|
||||
if err := json.Unmarshal([]byte(data), &config); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
if err := config.Valid(); err != nil {
|
||||
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
|
||||
func (c Config) Valid() error {
|
||||
var errs []error
|
||||
|
||||
if c.URL == "" {
|
||||
errs = append(errs, ErrNoURL)
|
||||
}
|
||||
|
||||
if _, err := valkey.ParseURL(c.URL); err != nil {
|
||||
errs = append(errs, ErrBadURL)
|
||||
}
|
||||
|
||||
if len(errs) != 0 {
|
||||
return fmt.Errorf("valkey.Config: invalid config: %w", errors.Join(errs...))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
49
lib/store/valkey/valkey.go
Normal file
49
lib/store/valkey/valkey.go
Normal file
|
|
@ -0,0 +1,49 @@
|
|||
package valkey
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/TecharoHQ/anubis/lib/store"
|
||||
valkey "github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type Store struct {
|
||||
rdb *valkey.Client
|
||||
}
|
||||
|
||||
func (s *Store) Delete(ctx context.Context, key string) error {
|
||||
n, err := s.rdb.Del(ctx, key).Result()
|
||||
if err != nil {
|
||||
return fmt.Errorf("can't delete from valkey: %w", err)
|
||||
}
|
||||
|
||||
switch n {
|
||||
case 0:
|
||||
return fmt.Errorf("%w: %d key(s) deleted", store.ErrNotFound, n)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
|
||||
result, err := s.rdb.Get(ctx, key).Result()
|
||||
if err != nil {
|
||||
if valkey.HasErrorPrefix(err, "redis: nil") {
|
||||
return nil, fmt.Errorf("%w: %w", store.ErrNotFound, err)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("can't fetch from valkey: %w", err)
|
||||
}
|
||||
|
||||
return []byte(result), nil
|
||||
}
|
||||
|
||||
func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
|
||||
if _, err := s.rdb.Set(ctx, key, string(value), expiry).Result(); err != nil {
|
||||
return fmt.Errorf("can't set %q in valkey: %w", key, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
53
lib/store/valkey/valkey_test.go
Normal file
53
lib/store/valkey/valkey_test.go
Normal file
|
|
@ -0,0 +1,53 @@
|
|||
package valkey
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/TecharoHQ/anubis/internal"
|
||||
"github.com/TecharoHQ/anubis/lib/store/storetest"
|
||||
"github.com/testcontainers/testcontainers-go"
|
||||
"github.com/testcontainers/testcontainers-go/wait"
|
||||
)
|
||||
|
||||
func init() {
|
||||
internal.UnbreakDocker()
|
||||
}
|
||||
|
||||
func TestImpl(t *testing.T) {
|
||||
if os.Getenv("DONT_USE_NETWORK") != "" {
|
||||
t.Skip("test requires network egress")
|
||||
return
|
||||
}
|
||||
|
||||
testcontainers.SkipIfProviderIsNotHealthy(t)
|
||||
|
||||
req := testcontainers.ContainerRequest{
|
||||
Image: "valkey/valkey:8",
|
||||
WaitingFor: wait.ForLog("Ready to accept connections"),
|
||||
}
|
||||
valkeyC, err := testcontainers.GenericContainer(t.Context(), testcontainers.GenericContainerRequest{
|
||||
ContainerRequest: req,
|
||||
Started: true,
|
||||
})
|
||||
testcontainers.CleanupContainer(t, valkeyC)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
containerIP, err := valkeyC.ContainerIP(t.Context())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(Config{
|
||||
URL: fmt.Sprintf("redis://%s:6379/0", containerIP),
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
storetest.Common(t, Factory{}, json.RawMessage(data))
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue