Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions server/restserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,7 @@ func PutHandler(s *Server, w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(resp)
return
}
s.store.Put(namespace, key)
count := s.store.Count(namespace, key)
count := s.store.Put(namespace, key)
resp := CountResponse{Count: count}
json.NewEncoder(w).Encode(resp)
}
Expand Down
27 changes: 18 additions & 9 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func (m *Metrics) ListenAndServe(promHostPort string) error {
type Store struct {
sync.Mutex // mutext locks namespaces
namespaces *hashmap.Map
namespaceSnapshot []string
expireAfterSecs int64
cleanupServiceEnabled bool
LastMetrics *Metrics
Expand Down Expand Up @@ -78,8 +79,9 @@ func NewStore(expireAfterSecs int64) *Store {
registry.MustRegister(maxNamespacesDenomGauge, namespacesTotalCount, namespacesGarbageCollected, keysRemainingInGCNamespaces, countTotalRemainingInGCNamespaces, gcPauseTime)

s := &Store{
expireAfterSecs: expireAfterSecs,
namespaces: hashmap.New(),
expireAfterSecs: expireAfterSecs,
namespaces: hashmap.New(),
lastGCdNamespaces: map[string]bool{},
LastMetrics: &Metrics{
registry: registry,
maxNamespacesDenom: maxNamespacesDenomGauge,
Expand Down Expand Up @@ -191,24 +193,27 @@ func (s *Store) runCleanup() []string {
for _, key := range keys {
keyList = append(keyList, key.(string))
}

s.Lock()
s.namespaceSnapshot = append([]string(nil), keyList...)
s.Unlock()
return keyList
}

func (s *Store) Put(ns, entryKey string) {
func (s *Store) Put(ns, entryKey string) int {
var subtree *tree.Tree
s.Lock()
subtreeI, found := s.namespaces.Get(ns)
s.Unlock()
if !found {
subtree = tree.NewTree(s.expireAfterSecs)
s.Lock()
s.namespaces.Put(ns, subtree)
s.Unlock()
s.namespaceSnapshot = append(s.namespaceSnapshot, ns)
} else {
subtree = subtreeI.(*tree.Tree)
}
s.Unlock()

subtree.Put(entryKey)
return subtree.Put(entryKey)
}

// Count returns the number of entries at a namespace and key, returning
Expand All @@ -227,8 +232,12 @@ func (s *Store) Count(ns, entryKey string) int {

// Namespaces returns the approximate current namespaces list
func (s *Store) Namespaces() []string {
keys := s.runCleanup()
return keys
s.Lock()
defer s.Unlock()
if len(s.namespaceSnapshot) == 0 {
return []string{}
}
return append([]string(nil), s.namespaceSnapshot...)
}

// CountEntries returns the count of all entries for the entire namespace.
Expand Down
20 changes: 20 additions & 0 deletions store/store_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package store

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestStore_NamespacesUsesSnapshot(t *testing.T) {
s := NewStore(60)
s.DisableCleanup()

assert.Equal(t, []string{}, s.Namespaces())

assert.Equal(t, 1, s.Put("namespace0", "key0"))
assert.Equal(t, 2, s.Put("namespace0", "key0"))
assert.Equal(t, 1, s.Put("namespace1", "key1"))

assert.ElementsMatch(t, []string{"namespace0", "namespace1"}, s.Namespaces())
}
64 changes: 39 additions & 25 deletions store/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@ import (

type ExpireAtSecs int64

type entry struct {
expiresAt []int64
liveCount int
}

// Tree is a a thread-safe data structure for tracking expirable items. It automatically expires old entries and keys.
// It does not garbage collect. Items are only expired when interacting with the data structure.
type Tree struct {
Expand Down Expand Up @@ -60,20 +65,24 @@ func (n *Tree) Count(entryKey string) int {
n.Lock()
defer n.Unlock()

datesSecs := n.getAndCleanupUnsafe(entryKey)
if datesSecs == nil {
item, found := n.getAndCleanupUnsafe(entryKey)
if !found {
return 0
}

if item.liveCount == 0 {
n.tree.Remove(entryKey)
return 0
}

if len(*datesSecs) == 0 {
item = removeExpired(item)
if item.liveCount == 0 {
n.tree.Remove(entryKey)
return 0
}

datesSecs = removeExpired(datesSecs)
count := len(*datesSecs)
n.tree.Put(entryKey, *datesSecs)
return count
n.tree.Put(entryKey, item)
return item.liveCount
}

// KeyMatch crawls the subtree to return keys starting with the `keyPattern` string.
Expand Down Expand Up @@ -110,47 +119,52 @@ func (n *Tree) KeyMatch(keyPattern string) []string {
return out
}

func (n *Tree) Put(entryKey string) {
func (n *Tree) Put(entryKey string) int {
n.Lock()
defer n.Unlock()

datesSecs := n.getAndCleanupUnsafe(entryKey)
if datesSecs == nil {
datesSecs = &[]int64{}
item, found := n.getAndCleanupUnsafe(entryKey)
if !found {
item = entry{}
}
datesSecs = removeExpired(datesSecs)
item = removeExpired(item)
secs := time.Now().Unix()
nextDatesSecs := append(*datesSecs, secs+n.defaultExpireAfterSecs)
n.tree.Put(entryKey, nextDatesSecs)
item.expiresAt = append(item.expiresAt, secs+n.defaultExpireAfterSecs)
item.liveCount = len(item.expiresAt)
n.tree.Put(entryKey, item)
return item.liveCount
}

// getAndCleanupUnsafe does not lock the mutex, so it can be used inside a lock
func (n *Tree) getAndCleanupUnsafe(entryKey string) *[]int64 {
func (n *Tree) getAndCleanupUnsafe(entryKey string) (entry, bool) {
val, found := n.tree.Get(entryKey)
if !found {
return nil
return entry{}, false
}
dates := val.([]int64)
if len(dates) == 0 {
item := val.(entry)
if item.liveCount == 0 {
// cleanup empty entry
n.tree.Remove(entryKey)
return nil
return entry{}, false
}
return &dates // not extra copy
return item, true
}

func removeExpired(datesSecs *[]int64) *[]int64 {
if len(*datesSecs) == 0 {
return datesSecs
func removeExpired(item entry) entry {
if len(item.expiresAt) == 0 {
item.liveCount = 0
return item
}
currentTime := time.Now().Unix()
var out []int64
// TODO: these are already sorted, so we can discard earlier entries
for _, removeAt := range *datesSecs {
for _, removeAt := range item.expiresAt {
if removeAt > currentTime {
// KEEP - not expired
out = append(out, removeAt)
}
}
return &out
item.expiresAt = out
item.liveCount = len(out)
return item
}
17 changes: 12 additions & 5 deletions store/tree/tree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ func TestTree_removeExpired(t *testing.T) {
time.Now().Unix() - 1, // expired
}

result := removeExpired(&entries)
assert.Equal(t, 2, len(*result))
assert.Equal(t, (*result)[0], keep1)
assert.Equal(t, (*result)[1], keep2)
result := removeExpired(entry{expiresAt: entries, liveCount: len(entries)})
assert.Equal(t, 2, result.liveCount)
assert.Equal(t, result.expiresAt[0], keep1)
assert.Equal(t, result.expiresAt[1], keep2)
}

func TestTree_Count(t *testing.T) {
Expand Down Expand Up @@ -78,6 +78,13 @@ func TestTree_Count(t *testing.T) {
assert.Equal(t, 0, len(keys), "Keys() should expire keys")
assert.Equal(t, 0, entryCount, "Keys() should expire entries")
})
t.Run("put returns live count without needing a second scan", func(t *testing.T) {
tr := NewTree(2)
assert.Equal(t, 1, tr.Put("willy"))
assert.Equal(t, 2, tr.Put("willy"))
assert.Equal(t, 1, tr.Put("other"))
assert.Equal(t, 2, tr.Count("willy"))
})
}

func TestTree_KeyMatch(t *testing.T) {
Expand Down Expand Up @@ -125,7 +132,7 @@ func TestTree_KeyMatch(t *testing.T) {
t.Run("it does not return the key when all its values are expired", func(t *testing.T) {
tr := NewTree(1)
tr.Put("a")
tr.tree.Put("a", []string{})
tr.tree.Put("a", entry{})

tr.Put("cdbe")
tr.Put("cd:aa")
Expand Down
Loading