Skip to content

Commit

Permalink
Refactor logsFilter to prevent concurrent map fatal errors (#10672) (#…
Browse files Browse the repository at this point in the history
…11892)

relates to #11890
cherry pick from E3
[2f2ce6a](2f2ce6a)

-----

#### Issue:
At line 129 in `logsfilter.go`, we had the following line of code:
```go
_, addrOk := filter.addrs[gointerfaces.ConvertH160toAddress(eventLog.Address)]
```


This line caused a panic due to a fatal error:
```logs
fatal error: concurrent map read and map write

goroutine 106 [running]:
github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog.func1({0xc009701db8?, 0x8?}, 0xc135d26050)
github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go:129 +0xe7
github.com/ledgerwatch/erigon/turbo/rpchelper.(*SyncMap[...]).Range(0xc009701eb0?, 0xc009701e70?)
github.com/ledgerwatch/erigon/turbo/rpchelper/subscription.go:97 +0x11a
github.com/ledgerwatch/erigon/turbo/rpchelper.(*LogsFilterAggregator).distributeLog(0x25f4600?, 0xc0000ce090?)
github.com/ledgerwatch/erigon/turbo/rpchelper/logsfilter.go:131 +0xc7
github.com/ledgerwatch/erigon/turbo/rpchelper.(*Filters).OnNewLogs(...)
github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go:547
github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcservices.(*RemoteBackend).SubscribeLogs(0xc0019c2f50, {0x32f0040, 0xc001b4a280}, 0xc001c0c0e0, 0x0?)
github.com/ledgerwatch/erigon/cmd/rpcdaemon/rpcservices/eth_backend.go:227 +0x1d1
github.com/ledgerwatch/erigon/turbo/rpchelper.New.func2()
github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go:102 +0xec
created by github.com/ledgerwatch/erigon/turbo/rpchelper.New
github.com/ledgerwatch/erigon/turbo/rpchelper/filters.go:92 +0x652
```

This error indicates that there were simultaneous read and write
operations on the `filter.addrs` map, leading to a race condition.


#### Solution:
To resolve this issue, I implemented the following changes:

- Moved SyncMap to erigon-lib common library: This allows us to utilize
a thread-safe map across different packages that require synchronized
map access.
- Refactored logsFilter to use SyncMap: By replacing the standard map
with SyncMap, we ensured that all map operations are thread-safe, thus
preventing concurrent read and write errors.
- Added documentation for SyncMap usage: Detailed documentation was
provided to guide the usage of SyncMap and related refactored
components, ensuring clarity and proper utilization.

Co-authored-by: Bret <787344+bretep@users.noreply.github.com>
  • Loading branch information
taratorio and bretep committed Sep 5, 2024
1 parent 2769237 commit 5ba8303
Show file tree
Hide file tree
Showing 5 changed files with 276 additions and 142 deletions.
79 changes: 79 additions & 0 deletions erigon-lib/common/concurrent/concurrent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package concurrent

import "sync"

// NewSyncMap initializes and returns a new instance of SyncMap.
func NewSyncMap[K comparable, T any]() *SyncMap[K, T] {
return &SyncMap[K, T]{
m: make(map[K]T),
}
}

// SyncMap is a generic map that uses a read-write mutex to ensure thread-safe access.
type SyncMap[K comparable, T any] struct {
m map[K]T
mu sync.RWMutex
}

// Get retrieves the value associated with the given key.
func (m *SyncMap[K, T]) Get(k K) (res T, ok bool) {
m.mu.RLock()
defer m.mu.RUnlock()
res, ok = m.m[k]
return res, ok
}

// Put sets the value for the given key, returning the previous value if present.
func (m *SyncMap[K, T]) Put(k K, v T) (T, bool) {
m.mu.Lock()
defer m.mu.Unlock()
old, ok := m.m[k]
m.m[k] = v
return old, ok
}

// Do performs a custom operation on the value associated with the given key.
func (m *SyncMap[K, T]) Do(k K, fn func(T, bool) (T, bool)) (after T, ok bool) {
m.mu.Lock()
defer m.mu.Unlock()
val, ok := m.m[k]
nv, save := fn(val, ok)
if save {
m.m[k] = nv
} else {
delete(m.m, k)
}
return nv, ok
}

// DoAndStore performs a custom operation on the value associated with the given key and stores the result.
func (m *SyncMap[K, T]) DoAndStore(k K, fn func(t T, ok bool) T) (after T, ok bool) {
return m.Do(k, func(t T, b bool) (T, bool) {
res := fn(t, b)
return res, true
})
}

// Range calls a function for each key-value pair in the map.
func (m *SyncMap[K, T]) Range(fn func(k K, v T) error) error {
m.mu.RLock()
defer m.mu.RUnlock()
for k, v := range m.m {
if err := fn(k, v); err != nil {
return err
}
}
return nil
}

// Delete removes the value associated with the given key, if present.
func (m *SyncMap[K, T]) Delete(k K) (t T, deleted bool) {
m.mu.Lock()
defer m.mu.Unlock()
val, ok := m.m[k]
if !ok {
return t, false
}
delete(m.m, k)
return val, true
}
Loading

0 comments on commit 5ba8303

Please sign in to comment.