fix(decaymap): fix lock convoy (#1106)
* fix(decaymap): fix lock convoy

  Ref #1103

  This uses the actor pattern to delay deletion instead of making things fight over a lock. It also properly fixes the locking logic to prevent the convoy problem.

  Signed-off-by: Xe Iaso <me@xeiaso.net>

* docs: update CHANGELOG

  Signed-off-by: Xe Iaso <me@xeiaso.net>

---------

Signed-off-by: Xe Iaso <me@xeiaso.net>
This commit is contained in:
parent
f79d36d21e
commit
63591866aa
3 changed files with 112 additions and 27 deletions
|
|
@ -14,6 +14,12 @@ func Zilch[T any]() T {
|
||||||
type Impl[K comparable, V any] struct {
|
type Impl[K comparable, V any] struct {
|
||||||
data map[K]decayMapEntry[V]
|
data map[K]decayMapEntry[V]
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
|
||||||
|
// deleteCh receives decay-deletion requests from readers.
|
||||||
|
deleteCh chan deleteReq[K]
|
||||||
|
// stopCh stops the background cleanup worker.
|
||||||
|
stopCh chan struct{}
|
||||||
|
wg sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
type decayMapEntry[V any] struct {
|
type decayMapEntry[V any] struct {
|
||||||
|
|
@ -21,30 +27,38 @@ type decayMapEntry[V any] struct {
|
||||||
expiry time.Time
|
expiry time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deleteReq asks the cleanup worker to drop an entry, but only while the
// entry stored under key still carries the expiry the requester saw. If the
// expiry has changed in the meantime (for example through a concurrent Set),
// the request no longer applies, which avoids racing with such updates.
type deleteReq[K comparable] struct {
	// key names the entry the requester found to be expired.
	key K
	// expiry is the expiry timestamp the requester observed on that entry.
	expiry time.Time
}
|
||||||
|
|
||||||
// New creates a new DecayMap of key type K and value type V.
|
// New creates a new DecayMap of key type K and value type V.
|
||||||
//
|
//
|
||||||
// Key types must be comparable to work with maps.
|
// Key types must be comparable to work with maps.
|
||||||
func New[K comparable, V any]() *Impl[K, V] {
|
func New[K comparable, V any]() *Impl[K, V] {
|
||||||
return &Impl[K, V]{
|
m := &Impl[K, V]{
|
||||||
data: make(map[K]decayMapEntry[V]),
|
data: make(map[K]decayMapEntry[V]),
|
||||||
|
deleteCh: make(chan deleteReq[K], 1024),
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
|
m.wg.Add(1)
|
||||||
|
go m.cleanupWorker()
|
||||||
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// expire forcibly expires a key by setting its time-to-live one second in the past.
|
// expire forcibly expires a key by setting its time-to-live one second in the past.
|
||||||
func (m *Impl[K, V]) expire(key K) bool {
|
func (m *Impl[K, V]) expire(key K) bool {
|
||||||
m.lock.RLock()
|
// Use a single write lock to avoid RUnlock->Lock convoy.
|
||||||
|
m.lock.Lock()
|
||||||
|
defer m.lock.Unlock()
|
||||||
val, ok := m.data[key]
|
val, ok := m.data[key]
|
||||||
m.lock.RUnlock()
|
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
m.lock.Lock()
|
|
||||||
val.expiry = time.Now().Add(-1 * time.Second)
|
val.expiry = time.Now().Add(-1 * time.Second)
|
||||||
m.data[key] = val
|
m.data[key] = val
|
||||||
m.lock.Unlock()
|
|
||||||
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -53,19 +67,14 @@ func (m *Impl[K, V]) expire(key K) bool {
|
||||||
// If the value does not exist, return false. Return true after
|
// If the value does not exist, return false. Return true after
|
||||||
// deletion.
|
// deletion.
|
||||||
func (m *Impl[K, V]) Delete(key K) bool {
|
func (m *Impl[K, V]) Delete(key K) bool {
|
||||||
m.lock.RLock()
|
// Use a single write lock to avoid RUnlock->Lock convoy.
|
||||||
_, ok := m.data[key]
|
|
||||||
m.lock.RUnlock()
|
|
||||||
|
|
||||||
if !ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
delete(m.data, key)
|
defer m.lock.Unlock()
|
||||||
m.lock.Unlock()
|
_, ok := m.data[key]
|
||||||
|
if ok {
|
||||||
return true
|
delete(m.data, key)
|
||||||
|
}
|
||||||
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get gets a value from the DecayMap by key.
|
// Get gets a value from the DecayMap by key.
|
||||||
|
|
@ -81,13 +90,12 @@ func (m *Impl[K, V]) Get(key K) (V, bool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if time.Now().After(value.expiry) {
|
if time.Now().After(value.expiry) {
|
||||||
m.lock.Lock()
|
// Defer decay deletion to the background worker to avoid convoy.
|
||||||
// Since previously reading m.data[key], the value may have been updated.
|
select {
|
||||||
// Delete the entry only if the expiry time is still the same.
|
case m.deleteCh <- deleteReq[K]{key: key, expiry: value.expiry}:
|
||||||
if m.data[key].expiry.Equal(value.expiry) {
|
default:
|
||||||
delete(m.data, key)
|
// Channel full: drop request; a future Cleanup() or Get will retry.
|
||||||
}
|
}
|
||||||
m.lock.Unlock()
|
|
||||||
|
|
||||||
return Zilch[V](), false
|
return Zilch[V](), false
|
||||||
}
|
}
|
||||||
|
|
@ -125,3 +133,64 @@ func (m *Impl[K, V]) Len() int {
|
||||||
defer m.lock.RUnlock()
|
defer m.lock.RUnlock()
|
||||||
return len(m.data)
|
return len(m.data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Close stops the background cleanup worker. It's optional to call; maps live
|
||||||
|
// for the process lifetime in many cases. Call in tests or when you know you no
|
||||||
|
// longer need the map to avoid goroutine leaks.
|
||||||
|
func (m *Impl[K, V]) Close() {
|
||||||
|
close(m.stopCh)
|
||||||
|
m.wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupWorker batches decay deletions to minimize lock contention.
|
||||||
|
func (m *Impl[K, V]) cleanupWorker() {
|
||||||
|
defer m.wg.Done()
|
||||||
|
batch := make([]deleteReq[K], 0, 64)
|
||||||
|
ticker := time.NewTicker(10 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
flush := func() {
|
||||||
|
if len(batch) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.applyDeletes(batch)
|
||||||
|
// reset batch without reallocating
|
||||||
|
batch = batch[:0]
|
||||||
|
}
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case req := <-m.deleteCh:
|
||||||
|
batch = append(batch, req)
|
||||||
|
case <-ticker.C:
|
||||||
|
flush()
|
||||||
|
case <-m.stopCh:
|
||||||
|
// Drain any remaining requests then exit
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case req := <-m.deleteCh:
|
||||||
|
batch = append(batch, req)
|
||||||
|
default:
|
||||||
|
flush()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Impl[K, V]) applyDeletes(batch []deleteReq[K]) {
|
||||||
|
now := time.Now()
|
||||||
|
m.lock.Lock()
|
||||||
|
for _, req := range batch {
|
||||||
|
entry, ok := m.data[req.key]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
// Only delete if the expiry is unchanged and already past.
|
||||||
|
if entry.expiry.Equal(req.expiry) && now.After(entry.expiry) {
|
||||||
|
delete(m.data, req.key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
m.lock.Unlock()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ import (
|
||||||
|
|
||||||
func TestImpl(t *testing.T) {
|
func TestImpl(t *testing.T) {
|
||||||
dm := New[string, string]()
|
dm := New[string, string]()
|
||||||
|
t.Cleanup(dm.Close)
|
||||||
|
|
||||||
dm.Set("test", "hi", 5*time.Minute)
|
dm.Set("test", "hi", 5*time.Minute)
|
||||||
|
|
||||||
|
|
@ -28,10 +29,24 @@ func TestImpl(t *testing.T) {
|
||||||
if ok {
|
if ok {
|
||||||
t.Error("got value even though it was supposed to be expired")
|
t.Error("got value even though it was supposed to be expired")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Deletion of expired entries after Get is deferred to a background worker.
|
||||||
|
// Assert it eventually disappears from the map.
|
||||||
|
deadline := time.Now().Add(200 * time.Millisecond)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
if dm.Len() == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
}
|
||||||
|
if dm.Len() != 0 {
|
||||||
|
t.Fatalf("expected background cleanup to remove expired key; len=%d", dm.Len())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCleanup(t *testing.T) {
|
func TestCleanup(t *testing.T) {
|
||||||
dm := New[string, string]()
|
dm := New[string, string]()
|
||||||
|
t.Cleanup(dm.Close)
|
||||||
|
|
||||||
dm.Set("test1", "hi1", 1*time.Second)
|
dm.Set("test1", "hi1", 1*time.Second)
|
||||||
dm.Set("test2", "hi2", 2*time.Second)
|
dm.Set("test2", "hi2", 2*time.Second)
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
|
|
||||||
<!-- This changes the project to: -->
|
<!-- This changes the project to: -->
|
||||||
|
|
||||||
|
- Fix lock convoy problem in decaymap ([#1103](https://github.com/TecharoHQ/anubis/issues/1103))
|
||||||
- Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086))
|
- Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086))
|
||||||
- Add validation warning when persistent storage is used without setting signing keys
|
- Add validation warning when persistent storage is used without setting signing keys
|
||||||
- Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925))
|
- Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue