diff --git a/decaymap/decaymap.go b/decaymap/decaymap.go index a6b6727..dd3ea41 100644 --- a/decaymap/decaymap.go +++ b/decaymap/decaymap.go @@ -14,6 +14,12 @@ func Zilch[T any]() T { type Impl[K comparable, V any] struct { data map[K]decayMapEntry[V] lock sync.RWMutex + + // deleteCh receives decay-deletion requests from readers. + deleteCh chan deleteReq[K] + // stopCh stops the background cleanup worker. + stopCh chan struct{} + wg sync.WaitGroup } type decayMapEntry[V any] struct { @@ -21,30 +27,38 @@ type decayMapEntry[V any] struct { expiry time.Time } +// deleteReq is a request to remove a key if its expiry timestamp still matches +// the observed one. This prevents racing with concurrent Set updates. +type deleteReq[K comparable] struct { + key K + expiry time.Time +} + // New creates a new DecayMap of key type K and value type V. // // Key types must be comparable to work with maps. func New[K comparable, V any]() *Impl[K, V] { - return &Impl[K, V]{ - data: make(map[K]decayMapEntry[V]), + m := &Impl[K, V]{ + data: make(map[K]decayMapEntry[V]), + deleteCh: make(chan deleteReq[K], 1024), + stopCh: make(chan struct{}), } + m.wg.Add(1) + go m.cleanupWorker() + return m } // expire forcibly expires a key by setting its time-to-live one second in the past. func (m *Impl[K, V]) expire(key K) bool { - m.lock.RLock() + // Use a single write lock to avoid RUnlock->Lock convoy. + m.lock.Lock() + defer m.lock.Unlock() val, ok := m.data[key] - m.lock.RUnlock() - if !ok { return false } - - m.lock.Lock() val.expiry = time.Now().Add(-1 * time.Second) m.data[key] = val - m.lock.Unlock() - return true } @@ -53,19 +67,14 @@ func (m *Impl[K, V]) expire(key K) bool { // If the value does not exist, return false. Return true after // deletion. func (m *Impl[K, V]) Delete(key K) bool { - m.lock.RLock() - _, ok := m.data[key] - m.lock.RUnlock() - - if !ok { - return false - } - + // Use a single write lock to avoid RUnlock->Lock convoy. m.lock.Lock() - delete(m.data, key) - m.lock.Unlock() - - return true + defer m.lock.Unlock() + _, ok := m.data[key] + if ok { + delete(m.data, key) + } + return ok } // Get gets a value from the DecayMap by key. @@ -81,13 +90,12 @@ func (m *Impl[K, V]) Get(key K) (V, bool) { } if time.Now().After(value.expiry) { - m.lock.Lock() - // Since previously reading m.data[key], the value may have been updated. - // Delete the entry only if the expiry time is still the same. - if m.data[key].expiry.Equal(value.expiry) { - delete(m.data, key) + // Defer decay deletion to the background worker to avoid convoy. + select { + case m.deleteCh <- deleteReq[K]{key: key, expiry: value.expiry}: + default: + // Channel full: drop request; a future Cleanup() or Get will retry. } - m.lock.Unlock() return Zilch[V](), false } @@ -125,3 +133,64 @@ func (m *Impl[K, V]) Len() int { defer m.lock.RUnlock() return len(m.data) } + +// Close stops the background cleanup worker. It's optional to call; maps live +// for the process lifetime in many cases. Call in tests or when you know you no +// longer need the map to avoid goroutine leaks. +func (m *Impl[K, V]) Close() { + close(m.stopCh) + m.wg.Wait() +} + +// cleanupWorker batches decay deletions to minimize lock contention. +func (m *Impl[K, V]) cleanupWorker() { + defer m.wg.Done() + batch := make([]deleteReq[K], 0, 64) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + flush := func() { + if len(batch) == 0 { + return + } + m.applyDeletes(batch) + // reset batch without reallocating + batch = batch[:0] + } + + for { + select { + case req := <-m.deleteCh: + batch = append(batch, req) + case <-ticker.C: + flush() + case <-m.stopCh: + // Drain any remaining requests then exit + for { + select { + case req := <-m.deleteCh: + batch = append(batch, req) + default: + flush() + return + } + } + } + } +} + +func (m *Impl[K, V]) applyDeletes(batch []deleteReq[K]) { + now := time.Now() + m.lock.Lock() + for _, req := range batch { + entry, ok := m.data[req.key] + if !ok { + continue + } + // Only delete if the expiry is unchanged and already past. + if entry.expiry.Equal(req.expiry) && now.After(entry.expiry) { + delete(m.data, req.key) + } + } + m.lock.Unlock() +} diff --git a/decaymap/decaymap_test.go b/decaymap/decaymap_test.go index c1830ed..e9bc824 100644 --- a/decaymap/decaymap_test.go +++ b/decaymap/decaymap_test.go @@ -7,6 +7,7 @@ import ( func TestImpl(t *testing.T) { dm := New[string, string]() + t.Cleanup(dm.Close) dm.Set("test", "hi", 5*time.Minute) @@ -28,10 +29,24 @@ func TestImpl(t *testing.T) { if ok { t.Error("got value even though it was supposed to be expired") } + + // Deletion of expired entries after Get is deferred to a background worker. + // Assert it eventually disappears from the map. + deadline := time.Now().Add(200 * time.Millisecond) + for time.Now().Before(deadline) { + if dm.Len() == 0 { + break + } + time.Sleep(5 * time.Millisecond) + } + if dm.Len() != 0 { + t.Fatalf("expected background cleanup to remove expired key; len=%d", dm.Len()) + } } func TestCleanup(t *testing.T) { dm := New[string, string]() + t.Cleanup(dm.Close) dm.Set("test1", "hi1", 1*time.Second) dm.Set("test2", "hi2", 2*time.Second) diff --git a/docs/docs/CHANGELOG.md b/docs/docs/CHANGELOG.md index e5c6cea..8f717c6 100644 --- a/docs/docs/CHANGELOG.md +++ b/docs/docs/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +- Fix lock convoy problem in decaymap ([#1103](https://github.com/TecharoHQ/anubis/issues/1103)) - Document missing environment variables in installation guide: `SLOG_LEVEL`, `COOKIE_PREFIX`, `FORCED_LANGUAGE`, and `TARGET_DISABLE_KEEPALIVE` ([#1086](https://github.com/TecharoHQ/anubis/pull/1086)) - Add validation warning when persistent storage is used without setting signing keys - Fixed `robots2policy` to properly group consecutive user agents into `any:` instead of only processing the last one ([#925](https://github.com/TecharoHQ/anubis/pull/925))