Skip to content

Commit

Permalink
Update tests with proper comparison, Fix SyncMap, improve readability
Browse files Browse the repository at this point in the history
  • Loading branch information
bretep committed Jun 10, 2024
1 parent d2f3182 commit 7d550c1
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 23 deletions.
2 changes: 2 additions & 0 deletions erigon-lib/common/concurrent/concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ func (m *SyncMap[K, T]) Do(k K, fn func(T, bool) (T, bool)) (after T, ok bool) {
nv, save := fn(val, ok)
if save {
m.m[k] = nv
} else {
delete(m.m, k)
}
return nv, ok
}
Expand Down
29 changes: 20 additions & 9 deletions turbo/rpchelper/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,32 +401,43 @@ func (ff *Filters) UnsubscribePendingTxs(id PendingTxsSubID) bool {

// SubscribeLogs subscribes to logs using the specified filter criteria and returns a channel to receive the logs
// and a subscription ID to manage the subscription.
func (ff *Filters) SubscribeLogs(size int, crit filters.FilterCriteria) (<-chan *types.Log, LogsSubID) {
func (ff *Filters) SubscribeLogs(size int, criteria filters.FilterCriteria) (<-chan *types.Log, LogsSubID) {
sub := newChanSub[*types.Log](size)
id, f := ff.logsSubs.insertLogsFilter(sub)

// Initialize address and topic maps
f.addrs = concurrent.NewSyncMap[libcommon.Address, int]()
if len(crit.Addresses) == 0 {
f.topics = concurrent.NewSyncMap[libcommon.Hash, int]()

// Handle addresses
if len(criteria.Addresses) == 0 {
// If no addresses are specified, it means all addresses should be included
f.allAddrs = 1
} else {
for _, addr := range crit.Addresses {
for _, addr := range criteria.Addresses {
f.addrs.Put(addr, 1)
}
}
f.topics = concurrent.NewSyncMap[libcommon.Hash, int]()
if len(crit.Topics) == 0 {

// Handle topics
if len(criteria.Topics) == 0 {
// If no topics are specified, it means all topics should be included
f.allTopics = 1
} else {
for _, topics := range crit.Topics {
for _, topics := range criteria.Topics {
for _, topic := range topics {
f.topics.Put(topic, 1)
}
}
}
f.topicsOriginal = crit.Topics

// Store original topics for reference
f.topicsOriginal = criteria.Topics

// Add the filter to the list of log filters
ff.logsSubs.addLogsFilters(f)

// if any filter in the aggregate needs all addresses or all topics then the global log subscription needs to
// allow all addresses or topics through
// Create a filter request based on the aggregated filters
lfr := ff.logsSubs.createFilterRequest()
addresses, topics := ff.logsSubs.getAggMaps()
for addr := range addresses {
Expand Down
20 changes: 10 additions & 10 deletions turbo/rpchelper/filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) {
if lastFilterRequest.AllTopics == false {
t.Error("2: expected all topics to be true")
}
if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 {
if len(lastFilterRequest.Addresses) != 1 || gointerfaces.ConvertH160toAddress(lastFilterRequest.Addresses[0]) != gointerfaces.ConvertH160toAddress(address1H160) {
t.Error("2: expected the address to match the last request")
}

Expand All @@ -288,10 +288,10 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) {
if lastFilterRequest.AllTopics == false {
t.Error("3: expected all topics to be true")
}
if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 {
if len(lastFilterRequest.Addresses) != 1 || gointerfaces.ConvertH160toAddress(lastFilterRequest.Addresses[0]) != gointerfaces.ConvertH160toAddress(address1H160) {
t.Error("3: expected the address to match the previous request")
}
if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 {
if len(lastFilterRequest.Topics) != 1 || gointerfaces.ConvertH256ToHash(lastFilterRequest.Topics[0]) != gointerfaces.ConvertH256ToHash(topic1H256) {
t.Error("3: expected the topics to match the last request")
}

Expand All @@ -307,10 +307,10 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) {
if lastFilterRequest.AllTopics == false {
t.Error("4: expected all topics to be true")
}
if len(lastFilterRequest.Addresses) != 1 && lastFilterRequest.Addresses[0] != address1H160 {
if len(lastFilterRequest.Addresses) != 1 || gointerfaces.ConvertH160toAddress(lastFilterRequest.Addresses[0]) != gointerfaces.ConvertH160toAddress(address1H160) {
t.Error("4: expected an address to be present")
}
if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 {
if len(lastFilterRequest.Topics) != 1 || gointerfaces.ConvertH256ToHash(lastFilterRequest.Topics[0]) != gointerfaces.ConvertH256ToHash(topic1H256) {
t.Error("4: expected a topic to be present")
}

Expand All @@ -327,23 +327,23 @@ func TestFilters_SubscribeLogsGeneratesCorrectLogFilterRequest(t *testing.T) {
if len(lastFilterRequest.Addresses) != 0 {
t.Error("5: expected addresses to be empty")
}
if len(lastFilterRequest.Topics) != 1 && lastFilterRequest.Topics[0] != topic1H256 {
if len(lastFilterRequest.Topics) != 1 || gointerfaces.ConvertH256ToHash(lastFilterRequest.Topics[0]) != gointerfaces.ConvertH256ToHash(topic1H256) {
t.Error("5: expected a topic to be present")
}

// unsubscribing the last filter should leave us with false for the all addresses and all topics
// and nothing in the address or topics lists
f.UnsubscribeLogs(id3)
if lastFilterRequest.AllAddresses == true {
t.Error("5: expected all addresses to be false")
t.Error("6: expected all addresses to be false")
}
if lastFilterRequest.AllTopics == true {
t.Error("5: expected all topics to be false")
t.Error("6: expected all topics to be false")
}
if len(lastFilterRequest.Addresses) != 0 {
t.Error("5: expected addresses to be empty")
t.Error("6: expected addresses to be empty")
}
if len(lastFilterRequest.Topics) != 0 {
t.Error("5: expected topics to be empty")
t.Error("6: expected topics to be empty")
}
}
9 changes: 5 additions & 4 deletions turbo/rpchelper/logsfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ func (a *LogsFilterAggregator) insertLogsFilter(sender Sub[*types2.Log]) (LogsSu
// removeLogsFilter removes a log filter identified by filterId from the LogsFilterAggregator.
// It closes the filter and subtracts its addresses and topics from the aggregated filter.
func (a *LogsFilterAggregator) removeLogsFilter(filterId LogsSubID) bool {
a.logsFilterLock.Lock()
defer a.logsFilterLock.Unlock()

filter, ok := a.logsFilters.Get(filterId)
if !ok {
return false
Expand Down Expand Up @@ -101,14 +104,12 @@ func (a *LogsFilterAggregator) createFilterRequest() *remote.LogsFilterRequest {
// It decrements the counters for each address and topic in the aggregated filter by the corresponding counts in the
// provided LogsFilter. If the count for any address or topic reaches zero, it is removed from the aggregated filter.
func (a *LogsFilterAggregator) subtractLogFilters(f *LogsFilter) {
a.logsFilterLock.Lock()
defer a.logsFilterLock.Unlock()
a.aggLogsFilter.allAddrs -= f.allAddrs
f.addrs.Range(func(addr libcommon.Address, count int) error {
a.aggLogsFilter.addrs.Do(addr, func(value int, exists bool) (int, bool) {
if exists {
newValue := value - count
if newValue == 0 {
if newValue <= 0 {
return 0, false
}
return newValue, true
Expand All @@ -122,7 +123,7 @@ func (a *LogsFilterAggregator) subtractLogFilters(f *LogsFilter) {
a.aggLogsFilter.topics.Do(topic, func(value int, exists bool) (int, bool) {
if exists {
newValue := value - count
if newValue == 0 {
if newValue <= 0 {
return 0, false
}
return newValue, true
Expand Down

0 comments on commit 7d550c1

Please sign in to comment.