Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/ws timeout filter not removed #147

Merged
merged 4 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 31 additions & 36 deletions jsonrpc/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var defaultTimeout = 1 * time.Minute
const (
// The index in heap which is indicating the element is not in the heap
NoIndexInHeap = -1
// _checkDuration is for filter timeout check
_checkDuration = time.Second
)

// filter is an interface that BlockFilter and LogFilter implement
Expand Down Expand Up @@ -295,47 +297,38 @@ func (f *FilterManager) Run() {
}
}()

var timeoutCh <-chan time.Time
// Do not use 'for range + create long time after chan' any more,
// which would bring out some unpredictable result, especially when
// re-assgining the chan, the elder one would not be recycled by
// the GC as we expected.
// Use 'timer + reset' instead.
var checkTimer = time.NewTimer(_checkDuration)
defer checkTimer.Stop()

OUT_LOOP:
for {
// check for the next filter to be removed
filterBase := f.nextTimeoutFilter()
// remove expired filter first
if filterBase != nil && filterBase.expiresAt.Before(time.Now()) {
f.logger.Info("filter timeout", "id", filterBase.id, "expiresAt", filterBase.expiresAt)
f.Uninstall(filterBase.id)

// set timer to remove filter
if filterBase != nil {
// It is awful, which would simple drop the not expired timer to the GC,
// which might take several minutes to recycle it.
// And even worse, the ws filter don't have an expire timestamp, which
// means we might make an imediate fired timer.
timeoutCh = time.After(time.Until(filterBase.expiresAt))
continue
}

// reset timer for next check
checkTimer.Reset(_checkDuration)

select {
case evnt := <-watchCh:
case ev := <-watchCh:
// new blockchain event
if err := f.dispatchEvent(evnt); err != nil {
if err := f.dispatchEvent(ev); err != nil {
f.logger.Error("failed to dispatch event", "err", err)
}

case <-timeoutCh:
// timeout for filter
// might be nil
if filterBase == nil {
f.logger.Warn("timeout filterBase is nil")

continue OUT_LOOP
}

f.logger.Info("filterBase timeout", "id", filterBase.id, "expiresAt", filterBase.expiresAt)

if !f.Uninstall(filterBase.id) {
f.logger.Error("failed to uninstall filter", "id", filterBase.id)
}

case <-checkTimer.C:
// no need to do anything, checkout the timeout filter in the next loop
case <-f.updateCh:
// filters change, reset the loop to start the timeout timer

case <-f.closeCh:
// stop the filter manager
return
Expand Down Expand Up @@ -553,12 +546,8 @@ func (f *FilterManager) GetFilterChanges(id string) (string, error) {

// getFilterAndChanges returns the updates of the filter with given ID in string
func (f *FilterManager) getFilterAndChanges(id string) (filter, string, error) {
f.RLock()
defer f.RUnlock()

filter, ok := f.filters[id]

if !ok {
filter := f.getFilterByID(id)
if filter == nil {
return nil, "", ErrFilterDoesNotExists
}

Expand Down Expand Up @@ -589,13 +578,19 @@ func (f *FilterManager) Uninstall(id string) bool {
func (f *FilterManager) removeFilterByID(id string) bool {
filter, ok := f.filters[id]
if !ok {
return false
// not exits, should not retry
f.logger.Debug("filter not in list", "id", id)

return true
}

delete(f.filters, id)

if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed {
f.logger.Debug("filter found in timeout heap", "id", id)
f.emitSignalToUpdateCh()
} else {
f.logger.Debug("filter already removed from timeout heap", "id", id)
}

return true
Expand Down Expand Up @@ -831,7 +826,7 @@ func (t *timeHeapImpl) Pop() interface{} {
n := len(old)
item := old[n-1]
old[n-1] = nil
item.heapIndex = -1
item.heapIndex = NoIndexInHeap // pop out and set it to not in heap
*t = old[0 : n-1]

return item
Expand Down
1 change: 1 addition & 0 deletions txpool/event_subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func shuffleTxPoolEvents(

randomEventType := func(supported bool) proto.EventType {
for {
//nolint:gosec
randNum, _ := rand.Int(rand.Reader, big.NewInt(int64(len(supportedTypes))))

randType := allEvents[randNum.Int64()]
Expand Down