From 6a7f80e6f53279139c86944bd83e1f3a766c0ad2 Mon Sep 17 00:00:00 2001 From: Esteban Gimbernat Date: Fri, 14 Nov 2025 14:22:22 +0100 Subject: [PATCH] (feat) Add cluster support to redis/vaultkey store (#1276) * (feat) Add cluster support to redis/vaultkey store * (chore) Update CHANGELOG.md * (fix) Disable maintenance notification on the Valkey store * (fix) Valkey text fix and allow maintnotifications in spelling. --- .github/actions/spelling/allow.txt | 1 + docs/docs/CHANGELOG.md | 1 + lib/store/valkey/factory.go | 128 ++++++++++++++++------------- lib/store/valkey/valkey.go | 46 +++++------ 4 files changed, 95 insertions(+), 81 deletions(-) diff --git a/.github/actions/spelling/allow.txt b/.github/actions/spelling/allow.txt index b53a55e..8267488 100644 --- a/.github/actions/spelling/allow.txt +++ b/.github/actions/spelling/allow.txt @@ -8,3 +8,4 @@ msgbox xeact ABee tencent +maintnotifications \ No newline at end of file diff --git a/docs/docs/CHANGELOG.md b/docs/docs/CHANGELOG.md index e6bd0fc..7b86e5d 100644 --- a/docs/docs/CHANGELOG.md +++ b/docs/docs/CHANGELOG.md @@ -18,6 +18,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add Ukrainian locale ([#1044](https://github.com/TecharoHQ/anubis/pull/1044)). - Allow Renovate as an OCI registry client. - Properly handle 4in6 addresses so that IP matching works with those addresses. +- Add support to simple Valkey/Redis cluster mode ## v1.23.1: Lyse Hext - Echo 1 diff --git a/lib/store/valkey/factory.go b/lib/store/valkey/factory.go index d26c1c7..c36b86d 100644 --- a/lib/store/valkey/factory.go +++ b/lib/store/valkey/factory.go @@ -5,80 +5,98 @@ import ( "encoding/json" "errors" "fmt" + "time" "github.com/TecharoHQ/anubis/lib/store" valkey "github.com/redis/go-redis/v9" -) - -var ( - ErrNoURL = errors.New("valkey.Config: no URL defined") - ErrBadURL = errors.New("valkey.Config: URL is invalid") + "github.com/redis/go-redis/v9/maintnotifications" ) func init() { store.Register("valkey", Factory{}) } -type Factory struct{} - -func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) { - var config Config - - if err := json.Unmarshal([]byte(data), &config); err != nil { - return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err) - } - - if err := config.Valid(); err != nil { - return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err) - } - - opts, err := valkey.ParseURL(config.URL) - if err != nil { - return nil, fmt.Errorf("%w: %w", store.ErrBadConfig, err) - } - - rdb := valkey.NewClient(opts) - - if _, err := rdb.Ping(ctx).Result(); err != nil { - return nil, fmt.Errorf("can't ping valkey instance: %w", err) - } - - return &Store{ - rdb: rdb, - }, nil -} - -func (Factory) Valid(data json.RawMessage) error { - var config Config - if err := json.Unmarshal([]byte(data), &config); err != nil { - return fmt.Errorf("%w: %w", store.ErrBadConfig, err) - } - - if err := config.Valid(); err != nil { - return fmt.Errorf("%w: %w", store.ErrBadConfig, err) - } - - return nil -} +// Errors kept as-is so other code/tests still pass. +var ( + ErrNoURL = errors.New("valkey.Config: no URL defined") + ErrBadURL = errors.New("valkey.Config: URL is invalid") +) +// Config is what Anubis unmarshals from the "parameters" JSON. type Config struct { - URL string `json:"url"` + URL string `json:"url"` + Cluster bool `json:"cluster,omitempty"` } func (c Config) Valid() error { - var errs []error - if c.URL == "" { - errs = append(errs, ErrNoURL) + return ErrNoURL } + // Just validate that it's a valid Redis URL. if _, err := valkey.ParseURL(c.URL); err != nil { - errs = append(errs, ErrBadURL) - } - - if len(errs) != 0 { - return fmt.Errorf("valkey.Config: invalid config: %w", errors.Join(errs...)) + return fmt.Errorf("%w: %v", ErrBadURL, err) } return nil } + +// redisClient is satisfied by *valkey.Client and *valkey.ClusterClient. +type redisClient interface { + Get(ctx context.Context, key string) *valkey.StringCmd + Set(ctx context.Context, key string, value interface{}, expiration time.Duration) *valkey.StatusCmd + Del(ctx context.Context, keys ...string) *valkey.IntCmd + Ping(ctx context.Context) *valkey.StatusCmd +} + +type Factory struct{} + +func (Factory) Valid(data json.RawMessage) error { + var cfg Config + if err := json.Unmarshal(data, &cfg); err != nil { + return err + } + return cfg.Valid() +} + +func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface, error) { + var cfg Config + if err := json.Unmarshal(data, &cfg); err != nil { + return nil, err + } + if err := cfg.Valid(); err != nil { + return nil, err + } + + opts, err := valkey.ParseURL(cfg.URL) + if err != nil { + return nil, fmt.Errorf("valkey.Factory: %w", err) + } + + var client redisClient + + if cfg.Cluster { + // Cluster mode: use the parsed Addr as the seed node. + clusterOpts := &valkey.ClusterOptions{ + Addrs: []string{opts.Addr}, + // Explicitly disable maintenance notifications + // This prevents the client from sending CLIENT MAINT_NOTIFICATIONS ON + MaintNotificationsConfig: &maintnotifications.Config{ + Mode: maintnotifications.ModeDisabled, + }, + } + client = valkey.NewClusterClient(clusterOpts) + } else { + opts.MaintNotificationsConfig = &maintnotifications.Config{ + Mode: maintnotifications.ModeDisabled, + } + client = valkey.NewClient(opts) + } + + // Optional but nice: fail fast if the cluster/single node is unreachable. + if err := client.Ping(ctx).Err(); err != nil { + return nil, fmt.Errorf("valkey.Factory: ping failed: %w", err) + } + + return &Store{client: client}, nil +} diff --git a/lib/store/valkey/valkey.go b/lib/store/valkey/valkey.go index 11f4374..4a28822 100644 --- a/lib/store/valkey/valkey.go +++ b/lib/store/valkey/valkey.go @@ -2,52 +2,46 @@ package valkey import ( "context" - "fmt" "time" "github.com/TecharoHQ/anubis/lib/store" valkey "github.com/redis/go-redis/v9" ) +// Store implements store.Interface on top of Redis/Valkey. type Store struct { - rdb *valkey.Client + client redisClient } -func (s *Store) Delete(ctx context.Context, key string) error { - n, err := s.rdb.Del(ctx, key).Result() - if err != nil { - return fmt.Errorf("can't delete from valkey: %w", err) - } - - switch n { - case 0: - return fmt.Errorf("%w: %d key(s) deleted", store.ErrNotFound, n) - default: - return nil - } -} +var _ store.Interface = (*Store)(nil) func (s *Store) Get(ctx context.Context, key string) ([]byte, error) { - result, err := s.rdb.Get(ctx, key).Result() - if err != nil { - if valkey.HasErrorPrefix(err, "redis: nil") { - return nil, fmt.Errorf("%w: %w", store.ErrNotFound, err) + cmd := s.client.Get(ctx, key) + if err := cmd.Err(); err != nil { + if err == valkey.Nil { + return nil, store.ErrNotFound } - - return nil, fmt.Errorf("can't fetch from valkey: %w", err) + return nil, err } - - return []byte(result), nil + return cmd.Bytes() } func (s *Store) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error { - if _, err := s.rdb.Set(ctx, key, string(value), expiry).Result(); err != nil { - return fmt.Errorf("can't set %q in valkey: %w", key, err) - } + return s.client.Set(ctx, key, value, expiry).Err() +} +func (s *Store) Delete(ctx context.Context, key string) error { + res := s.client.Del(ctx, key) + if err := res.Err(); err != nil { + return err + } + if n, _ := res.Result(); n == 0 { + return store.ErrNotFound + } return nil } +// IsPersistent tells Anubis this backend is “real” storage, not in-memory. func (s *Store) IsPersistent() bool { return true }