From a41d671389bf3605490e12c2d5ac55bb101596c2 Mon Sep 17 00:00:00 2001 From: mhmtszr Date: Tue, 16 Apr 2024 15:14:01 +0300 Subject: [PATCH] feat: update range function as lock free --- concurrent_swiss_map.go | 59 +++++++++++++++++++++++++++++++----- concurrent_swiss_map_test.go | 22 ++++++++++++++ 2 files changed, 74 insertions(+), 7 deletions(-) diff --git a/concurrent_swiss_map.go b/concurrent_swiss_map.go index eedb379..f26a41e 100644 --- a/concurrent_swiss_map.go +++ b/concurrent_swiss_map.go @@ -1,6 +1,7 @@ package csmap import ( + "context" "sync" "github.com/mhmtszr/concurrent-swiss-map/maphash" @@ -166,18 +167,62 @@ func (m *CsMap[K, V]) IsEmpty() bool { return m.Count() == 0 } +type Tuple[K comparable, V any] struct { + Key K + Val V +} + // Range If the callback function returns true iteration will stop. func (m *CsMap[K, V]) Range(f func(key K, value V) (stop bool)) { + ch := make(chan Tuple[K, V], m.Count()) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + listenCompleted := m.listen(f, ch) + m.produce(ctx, ch) + listenCompleted.Wait() +} + +func (m *CsMap[K, V]) produce(ctx context.Context, ch chan Tuple[K, V]) { + var wg sync.WaitGroup + wg.Add(len(m.shards)) for i := range m.shards { - shard := m.shards[i] - shard.RLock() - stop := shard.items.Iter(f) - if stop { + go func(i int) { + defer wg.Done() + + shard := m.shards[i] + shard.RLock() + shard.items.Iter(func(k K, v V) (stop bool) { + select { + case <-ctx.Done(): + return true + default: + ch <- Tuple[K, V]{Key: k, Val: v} + } + return false + }) shard.RUnlock() - return - } - shard.RUnlock() + }(i) } + go func() { + wg.Wait() + close(ch) + }() +} + +func (m *CsMap[K, V]) listen(f func(key K, value V) (stop bool), ch chan Tuple[K, V]) *sync.WaitGroup { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for t := range ch { + if stop := f(t.Key, t.Val); stop { + return + } + } + }() + return &wg } type HashShardPair[K comparable, V any] struct { diff --git a/concurrent_swiss_map_test.go b/concurrent_swiss_map_test.go index ff78263..4a9247c 100644 --- a/concurrent_swiss_map_test.go +++ b/concurrent_swiss_map_test.go @@ -195,6 +195,28 @@ func TestCustomHasherWithRange(t *testing.T) { } } +func TestDeleteFromRange(t *testing.T) { + myMap := csmap.Create[string, int]( + csmap.WithSize[string, int](1024), + ) + + myMap.Store("aaa", 10) + myMap.Store("aab", 11) + myMap.Store("aac", 15) + myMap.Store("aad", 124) + myMap.Store("aaf", 987) + + myMap.Range(func(key string, value int) (stop bool) { + if value > 20 { + myMap.Delete(key) + } + return false + }) + if myMap.Count() != 3 { + t.Fatal("total should be 3, because currently range deletes values that bigger than 20.") + } +} + func TestBasicConcurrentWriteDeleteCount(t *testing.T) { myMap := csmap.Create[int, string]( csmap.WithShardCount[int, string](32),