diff --git a/cache.go b/cache.go index e48a48a5..fc01d323 100644 --- a/cache.go +++ b/cache.go @@ -29,7 +29,7 @@ import ( "github.com/dgraph-io/ristretto/z" ) -const ( +var ( // TODO: find the optimal value for this or make it configurable setBufSize = 32 * 1024 ) @@ -235,7 +235,10 @@ func (c *Cache) SetWithTTL(key, value interface{}, cost int64, ttl time.Duration return true default: c.Metrics.add(dropSets, keyHash, 1) - return false + // Return true if this was an update operation since we've already + // updated the store. For all the other operations (set/delete), we + // return false which means the item was not inserted. + return i.flag == itemUpdate } } diff --git a/cache_test.go b/cache_test.go index 0b6bf903..4ab160e3 100644 --- a/cache_test.go +++ b/cache_test.go @@ -1,7 +1,9 @@ package ristretto import ( + "fmt" "math/rand" + "strconv" "strings" "sync" "testing" @@ -618,3 +620,65 @@ func init() { // Set bucketSizeSecs to 1 to avoid waiting too much during the tests. bucketDurationSecs = 1 } + +// Regression test for bug https://github.com/dgraph-io/ristretto/issues/167 +func TestDropUpdates(t *testing.T) { + originalSetBugSize := setBufSize + defer func() { setBufSize = originalSetBugSize }() + + test := func() { + // dropppedMap stores the items dropped from the cache. + droppedMap := make(map[int]struct{}) + lastEvictedSet := int64(-1) + + var err error + handler := func(_ interface{}, value interface{}) { + v := value.(string) + lastEvictedSet, err = strconv.ParseInt(string(v), 10, 32) + require.NoError(t, err) + + _, ok := droppedMap[int(lastEvictedSet)] + if ok { + panic(fmt.Sprintf("val = %+v was dropped but it got evicted. Dropped items: %+v\n", + lastEvictedSet, droppedMap)) + } + } + + // This is important. The race condition shows up only when the setBuf + // is full and that's why we reduce the buf size here. The test will + // try to fill up the setbuf to it's capacity and then perform an + // update on a key. + setBufSize = 10 + + c, err := NewCache(&Config{ + NumCounters: 100, + MaxCost: 10, + BufferItems: 64, + Metrics: true, + OnEvict: func(_, _ uint64, value interface{}, _ int64) { + handler(nil, value) + }, + }) + require.NoError(t, err) + + for i := 0; i < 5*setBufSize; i++ { + v := fmt.Sprintf("%0100d", i) + // We're updating the same key. + if !c.Set(0, v, 1) { + // The race condition doesn't show up without this sleep. + time.Sleep(time.Microsecond) + droppedMap[i] = struct{}{} + } + } + // Wait for all the items to be processed. + time.Sleep(time.Millisecond) + // This will cause eviction from the cache. + require.True(t, c.Set(1, nil, 10)) + c.Close() + } + + // Run the test 100 times since it's not reliable. + for i := 0; i < 100; i++ { + test() + } +}