(feat) Add cluster support to redis/vaultkey store (#1276)

* (feat) Add cluster support to redis/vaultkey store

* (chore) Update CHANGELOG.md

* (fix) Disable maintenance notification on the Valkey store

* (fix) Valkey text fix and allow maintnotifications in spelling.
This commit is contained in:
Esteban Gimbernat 2025-11-14 14:22:22 +01:00 committed by GitHub
parent a5bb6d2751
commit 6a7f80e6f5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 95 additions and 81 deletions

View file

@ -8,3 +8,4 @@ msgbox
xeact
ABee
tencent
maintnotifications

View file

@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add Ukrainian locale ([#1044](https://github.com/TecharoHQ/anubis/pull/1044)).
- Allow Renovate as an OCI registry client.
- Properly handle 4in6 addresses so that IP matching works with those addresses.
- Add support to simple Valkey/Redis cluster mode
## v1.23.1: Lyse Hext - Echo 1

View file

@ -5,80 +5,98 @@ import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/TecharoHQ/anubis/lib/store"
valkey "github.com/redis/go-redis/v9"
)
var (
ErrNoURL = errors.New("valkey.Config: no URL defined")
ErrBadURL = errors.New("valkey.Config: URL is invalid")
"github.com/redis/go-redis/v9/maintnotifications"
)
func init() {
store.Register("valkey", Factory{})
}
type Factory struct{}
func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) {
var config Config
if err := json.Unmarshal([]byte(data), &config); err != nil {
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
}
if err := config.Valid(); err != nil {
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
}
opts, err := valkey.ParseURL(config.URL)
if err != nil {
return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err)
}
rdb := valkey.NewClient(opts)
if _, err := rdb.Ping(ctx).Result(); err != nil {
return nil, fmt.Errorf("can't ping valkey instance: %w", err)
}
return &Store{
rdb: rdb,
}, nil
}
func (Factory) Valid(data json.RawMessage) error {
var config Config
if err := json.Unmarshal([]byte(data), &config); err != nil {
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
}
if err := config.Valid(); err != nil {
return fmt.Errorf("%w: %w", store.ErrBadConfig, err)
}
return nil
}
// Errors kept as-is so other code/tests still pass.
var (
ErrNoURL = errors.New("valkey.Config: no URL defined")
ErrBadURL = errors.New("valkey.Config: URL is invalid")
)
// Config is what Anubis unmarshals from the "parameters" JSON.
type Config struct {
URL string `json:"url"`
Cluster bool `json:"cluster,omitempty"`
}
func (c Config) Valid() error {
var errs []error
if c.URL == "" {
errs = append(errs, ErrNoURL)
return ErrNoURL
}
// Just validate that it's a valid Redis URL.
if _, err := valkey.ParseURL(c.URL); err != nil {
errs = append(errs, ErrBadURL)
}
if len(errs) != 0 {
return fmt.Errorf("valkey.Config: invalid config: %w", errors.Join(errs...))
return fmt.Errorf("%w: %v", ErrBadURL, err)
}
return nil
}
// redisClient is satisfied by *valkey.Client and *valkey.ClusterClient.
type redisClient interface {
Get(ctx context.Context, key string) *valkey.StringCmd
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *valkey.StatusCmd
Del(ctx context.Context, keys ...string) *valkey.IntCmd
Ping(ctx context.Context) *valkey.StatusCmd
}
type Factory struct{}
func (Factory) Valid(data json.RawMessage) error {
var cfg Config
if err := json.Unmarshal(data, &cfg); err != nil {
return err
}
return cfg.Valid()
}
func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) {
var cfg Config
if err := json.Unmarshal(data, &cfg); err != nil {
return nil, err
}
if err := cfg.Valid(); err != nil {
return nil, err
}
opts, err := valkey.ParseURL(cfg.URL)
if err != nil {
return nil, fmt.Errorf("valkey.Factory: %w", err)
}
var client redisClient
if cfg.Cluster {
// Cluster mode: use the parsed Addr as the seed node.
clusterOpts := &valkey.ClusterOptions{
Addrs: []string{opts.Addr},
// Explicitly disable maintenance notifications
// This prevents the client from sending CLIENT MAINT_NOTIFICATIONS ON
MaintNotificationsConfig: &maintnotifications.Config{
Mode: maintnotifications.ModeDisabled,
},
}
client = valkey.NewClusterClient(clusterOpts)
} else {
opts.MaintNotificationsConfig = &maintnotifications.Config{
Mode: maintnotifications.ModeDisabled,
}
client = valkey.NewClient(opts)
}
// Optional but nice: fail fast if the cluster/single node is unreachable.
if err := client.Ping(ctx).Err(); err != nil {
return nil, fmt.Errorf("valkey.Factory: ping failed: %w", err)
}
return &Store{client: client}, nil
}

View file

@ -2,52 +2,46 @@ package valkey
import (
"context"
"fmt"
"time"
"github.com/TecharoHQ/anubis/lib/store"
valkey "github.com/redis/go-redis/v9"
)
// Store implements store.Interface on top of Redis/Valkey.
type Store struct {
rdb *valkey.Client
client redisClient
}
func (s *Store) Delete(ctx context.Context, key string) error {
n, err := s.rdb.Del(ctx, key).Result()
if err != nil {
return fmt.Errorf("can't delete from valkey: %w", err)
}
switch n {
case 0:
return fmt.Errorf("%w: %d key(s) deleted", store.ErrNotFound, n)
default:
return nil
}
}
var _ store.Interface = (*Store)(nil)
func (s *Store) Get(ctx context.Context, key string) ([]byte, error) {
result, err := s.rdb.Get(ctx, key).Result()
if err != nil {
if valkey.HasErrorPrefix(err, "redis: nil") {
return nil, fmt.Errorf("%w: %w", store.ErrNotFound, err)
cmd := s.client.Get(ctx, key)
if err := cmd.Err(); err != nil {
if err == valkey.Nil {
return nil, store.ErrNotFound
}
return nil, fmt.Errorf("can't fetch from valkey: %w", err)
return nil, err
}
return []byte(result), nil
return cmd.Bytes()
}
func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
if _, err := s.rdb.Set(ctx, key, string(value), expiry).Result(); err != nil {
return fmt.Errorf("can't set %q in valkey: %w", key, err)
}
return s.client.Set(ctx, key, value, expiry).Err()
}
func (s *Store) Delete(ctx context.Context, key string) error {
res := s.client.Del(ctx, key)
if err := res.Err(); err != nil {
return err
}
if n, _ := res.Result(); n == 0 {
return store.ErrNotFound
}
return nil
}
// IsPersistent tells Anubis this backend is “real” storage, not in-memory.
func (s *Store) IsPersistent() bool {
return true
}