feat(store/bbolt): implement actor pattern (#1107)
* feat(store/bbolt): implement actor pattern Signed-off-by: Xe Iaso <me@xeiaso.net> * docs(internal/actorify): document package Signed-off-by: Xe Iaso <me@xeiaso.net> * Update metadata check-spelling run (pull_request) for Xe/actorify Signed-off-by: check-spelling-bot <check-spelling-bot@users.noreply.github.com> on-behalf-of: @check-spelling <check-spelling-bot@check-spelling.dev> --------- Signed-off-by: Xe Iaso <me@xeiaso.net> Signed-off-by: check-spelling-bot <check-spelling-bot@users.noreply.github.com>
This commit is contained in:
parent
63591866aa
commit
401e18f29f
5 changed files with 200 additions and 6 deletions
4
.github/actions/spelling/expect.txt
vendored
4
.github/actions/spelling/expect.txt
vendored
|
|
@ -1,4 +1,7 @@
|
||||||
acs
|
acs
|
||||||
|
Actorified
|
||||||
|
actorifiedstore
|
||||||
|
actorify
|
||||||
Aibrew
|
Aibrew
|
||||||
alibaba
|
alibaba
|
||||||
alrest
|
alrest
|
||||||
|
|
@ -157,6 +160,7 @@ ifm
|
||||||
Imagesift
|
Imagesift
|
||||||
imgproxy
|
imgproxy
|
||||||
impressum
|
impressum
|
||||||
|
inbox
|
||||||
inp
|
inp
|
||||||
internets
|
internets
|
||||||
IPTo
|
IPTo
|
||||||
|
|
|
||||||
|
|
@ -13,13 +13,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
|
|
||||||
<!-- This changes the project to: -->
|
<!-- This changes the project to: -->
|
||||||
|
|
||||||
- Fix lock convoy problem in decaymap ([#1103](https://github.com/TecharoHQ/anubis/issues/1103))
|
- Fix lock convoy problem in decaymap ([#1103](https://github.com/TecharoHQ/anubis/issues/1103)).
|
||||||
- Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086))
|
- Fix lock convoy problem in bbolt by implementing the actor pattern ([#1103](https://github.com/TecharoHQ/anubis/issues/1103)).
|
||||||
- Add validation warning when persistent storage is used without setting signing keys
|
- Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086)).
|
||||||
- Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925))
|
- Add validation warning when persistent storage is used without setting signing keys.
|
||||||
|
- Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925)).
|
||||||
- Add the [`s3api` storage backend](./admin/policies.mdx#s3api) to allow Anubis to use S3 API compatible object storage as its storage backend.
|
- Add the [`s3api` storage backend](./admin/policies.mdx#s3api) to allow Anubis to use S3 API compatible object storage as its storage backend.
|
||||||
- Make `cmd/containerbuild` support commas for separating elements of the `--docker-tags` argument as well as newlines.
|
- Make `cmd/containerbuild` support commas for separating elements of the `--docker-tags` argument as well as newlines.
|
||||||
- Add the `DIFFICULTY_IN_JWT` option, which allows one to add the `difficulty` field in the JWT claims which indicates the difficulty of the token ([#1063](https://github.com/TecharoHQ/anubis/pull/1063))
|
- Add the `DIFFICULTY_IN_JWT` option, which allows one to add the `difficulty` field in the JWT claims which indicates the difficulty of the token ([#1063](https://github.com/TecharoHQ/anubis/pull/1063)).
|
||||||
- Ported the client-side JS to TypeScript to avoid egregious errors in the future.
|
- Ported the client-side JS to TypeScript to avoid egregious errors in the future.
|
||||||
- Fixes concurrency problems with very old browsers ([#1082](https://github.com/TecharoHQ/anubis/issues/1082)).
|
- Fixes concurrency problems with very old browsers ([#1082](https://github.com/TecharoHQ/anubis/issues/1082)).
|
||||||
|
|
||||||
|
|
|
||||||
107
internal/actorify/actorify.go
Normal file
107
internal/actorify/actorify.go
Normal file
|
|
@ -0,0 +1,107 @@
|
||||||
|
// Package actorify lets you transform a parallel operation into a serialized
|
||||||
|
// operation via the Actor pattern[1].
|
||||||
|
//
|
||||||
|
// [1]: https://en.wikipedia.org/wiki/Actor_model
|
||||||
|
package actorify
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func z[Z any]() Z {
|
||||||
|
var z Z
|
||||||
|
return z
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
// ErrActorDied is returned when the actor inbox or reply channel was closed.
|
||||||
|
ErrActorDied = errors.New("actorify: the actor inbox or reply channel was closed")
|
||||||
|
)
|
||||||
|
|
||||||
|
// Handler is a function alias for the underlying logic the Actor should call.
|
||||||
|
type Handler[Input, Output any] func(ctx context.Context, input Input) (Output, error)
|
||||||
|
|
||||||
|
// Actor is a serializing wrapper that runs a function in a background goroutine.
|
||||||
|
// Whenever the Call method is invoked, a message is sent to the actor's inbox and then
|
||||||
|
// the callee waits for a response. Depending on how busy the actor is, this may take
|
||||||
|
// a moment.
|
||||||
|
type Actor[Input, Output any] struct {
|
||||||
|
handler Handler[Input, Output]
|
||||||
|
inbox chan *message[Input, Output]
|
||||||
|
}
|
||||||
|
|
||||||
|
type message[Input, Output any] struct {
|
||||||
|
ctx context.Context
|
||||||
|
arg Input
|
||||||
|
reply chan reply[Output]
|
||||||
|
}
|
||||||
|
|
||||||
|
type reply[Output any] struct {
|
||||||
|
output Output
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructs a new Actor and starts its background thread. Cancel the context and you cancel
|
||||||
|
// the Actor.
|
||||||
|
func New[Input, Output any](ctx context.Context, handler Handler[Input, Output]) *Actor[Input, Output] {
|
||||||
|
result := &Actor[Input, Output]{
|
||||||
|
handler: handler,
|
||||||
|
inbox: make(chan *message[Input, Output], 32),
|
||||||
|
}
|
||||||
|
|
||||||
|
go result.handle(ctx)
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *Actor[Input, Output]) handle(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
close(a.inbox)
|
||||||
|
return
|
||||||
|
case msg, ok := <-a.inbox:
|
||||||
|
if !ok {
|
||||||
|
if msg.reply != nil {
|
||||||
|
close(msg.reply)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result, err := a.handler(msg.ctx, msg.arg)
|
||||||
|
|
||||||
|
reply := reply[Output]{
|
||||||
|
output: result,
|
||||||
|
err: err,
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.reply <- reply
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call calls the Actor with a given Input and returns the handler's Output.
|
||||||
|
//
|
||||||
|
// This only works with unary functions by design. If you need to have more inputs, define
|
||||||
|
// a struct type to use as a container.
|
||||||
|
func (a *Actor[Input, Output]) Call(ctx context.Context, input Input) (Output, error) {
|
||||||
|
replyCh := make(chan reply[Output])
|
||||||
|
|
||||||
|
a.inbox <- &message[Input, Output]{
|
||||||
|
arg: input,
|
||||||
|
reply: replyCh,
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case reply, ok := <-replyCh:
|
||||||
|
if !ok {
|
||||||
|
return z[Output](), ErrActorDied
|
||||||
|
}
|
||||||
|
|
||||||
|
return reply.output, reply.err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return z[Output](), context.Cause(ctx)
|
||||||
|
}
|
||||||
|
}
|
||||||
82
lib/store/actorifiedstore.go
Normal file
82
lib/store/actorifiedstore.go
Normal file
|
|
@ -0,0 +1,82 @@
|
||||||
|
package store
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/TecharoHQ/anubis/internal/actorify"
|
||||||
|
)
|
||||||
|
|
||||||
|
type unit struct{}
|
||||||
|
|
||||||
|
type ActorifiedStore struct {
|
||||||
|
Interface
|
||||||
|
|
||||||
|
deleteActor *actorify.Actor[string, unit]
|
||||||
|
getActor *actorify.Actor[string, []byte]
|
||||||
|
setActor *actorify.Actor[*actorSetReq, unit]
|
||||||
|
cancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
type actorSetReq struct {
|
||||||
|
key string
|
||||||
|
value []byte
|
||||||
|
expiry time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewActorifiedStore(backend Interface) *ActorifiedStore {
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
result := &ActorifiedStore{
|
||||||
|
Interface: backend,
|
||||||
|
cancel: cancel,
|
||||||
|
}
|
||||||
|
|
||||||
|
result.deleteActor = actorify.New(ctx, result.actorDelete)
|
||||||
|
result.getActor = actorify.New(ctx, backend.Get)
|
||||||
|
result.setActor = actorify.New(ctx, result.actorSet)
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActorifiedStore) Close() { a.cancel() }
|
||||||
|
|
||||||
|
func (a *ActorifiedStore) Delete(ctx context.Context, key string) error {
|
||||||
|
if _, err := a.deleteActor.Call(ctx, key); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActorifiedStore) Get(ctx context.Context, key string) ([]byte, error) {
|
||||||
|
return a.getActor.Call(ctx, key)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActorifiedStore) Set(ctx context.Context, key string, value []byte, expiry time.Duration) error {
|
||||||
|
if _, err := a.setActor.Call(ctx, &actorSetReq{
|
||||||
|
key: key,
|
||||||
|
value: value,
|
||||||
|
expiry: expiry,
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActorifiedStore) actorDelete(ctx context.Context, key string) (unit, error) {
|
||||||
|
if err := a.Interface.Delete(ctx, key); err != nil {
|
||||||
|
return unit{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return unit{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *ActorifiedStore) actorSet(ctx context.Context, req *actorSetReq) (unit, error) {
|
||||||
|
if err := a.Interface.Set(ctx, req.key, req.value, req.expiry); err != nil {
|
||||||
|
return unit{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return unit{}, nil
|
||||||
|
}
|
||||||
|
|
@ -48,7 +48,7 @@ func (Factory) Build(ctx context.Context, data json.RawMessage) (store.Interface
|
||||||
|
|
||||||
go result.cleanupThread(ctx)
|
go result.cleanupThread(ctx)
|
||||||
|
|
||||||
return result, nil
|
return store.NewActorifiedStore(result), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Valid parses and validates the bbolt store Config or returns
|
// Valid parses and validates the bbolt store Config or returns
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue