diff --git a/jsonrpc/filter_manager.go b/jsonrpc/filter_manager.go index e50767f2a7..72b01fa4cb 100644 --- a/jsonrpc/filter_manager.go +++ b/jsonrpc/filter_manager.go @@ -34,6 +34,8 @@ var defaultTimeout = 1 * time.Minute const ( // The index in heap which is indicating the element is not in the heap NoIndexInHeap = -1 + // _checkDuration is for filter timeout check + _checkDuration = time.Second ) // filter is an interface that BlockFilter and LogFilter implement @@ -295,47 +297,38 @@ func (f *FilterManager) Run() { } }() - var timeoutCh <-chan time.Time + // Do not use 'for range + create long time after chan' any more, + // which would bring out some unpredictable result, especially when + // re-assgining the chan, the elder one would not be recycled by + // the GC as we expected. + // Use 'timer + reset' instead. + var checkTimer = time.NewTimer(_checkDuration) + defer checkTimer.Stop() -OUT_LOOP: for { // check for the next filter to be removed filterBase := f.nextTimeoutFilter() + // remove expired filter first + if filterBase != nil && filterBase.expiresAt.Before(time.Now()) { + f.logger.Info("filter timeout", "id", filterBase.id, "expiresAt", filterBase.expiresAt) + f.Uninstall(filterBase.id) - // set timer to remove filter - if filterBase != nil { - // It is awful, which would simple drop the not expired timer to the GC, - // which might take several minutes to recycle it. - // And even worse, the ws filter don't have an expire timestamp, which - // means we might make an imediate fired timer. - timeoutCh = time.After(time.Until(filterBase.expiresAt)) + continue } + // reset timer for next check + checkTimer.Reset(_checkDuration) + select { - case evnt := <-watchCh: + case ev := <-watchCh: // new blockchain event - if err := f.dispatchEvent(evnt); err != nil { + if err := f.dispatchEvent(ev); err != nil { f.logger.Error("failed to dispatch event", "err", err) } - - case <-timeoutCh: - // timeout for filter - // might be nil - if filterBase == nil { - f.logger.Warn("timeout filterBase is nil") - - continue OUT_LOOP - } - - f.logger.Info("filterBase timeout", "id", filterBase.id, "expiresAt", filterBase.expiresAt) - - if !f.Uninstall(filterBase.id) { - f.logger.Error("failed to uninstall filter", "id", filterBase.id) - } - + case <-checkTimer.C: + // no need to do anything, checkout the timeout filter in the next loop case <-f.updateCh: // filters change, reset the loop to start the timeout timer - case <-f.closeCh: // stop the filter manager return @@ -553,12 +546,8 @@ func (f *FilterManager) GetFilterChanges(id string) (string, error) { // getFilterAndChanges returns the updates of the filter with given ID in string func (f *FilterManager) getFilterAndChanges(id string) (filter, string, error) { - f.RLock() - defer f.RUnlock() - - filter, ok := f.filters[id] - - if !ok { + filter := f.getFilterByID(id) + if filter == nil { return nil, "", ErrFilterDoesNotExists } @@ -589,13 +578,19 @@ func (f *FilterManager) Uninstall(id string) bool { func (f *FilterManager) removeFilterByID(id string) bool { filter, ok := f.filters[id] if !ok { - return false + // not exits, should not retry + f.logger.Debug("filter not in list", "id", id) + + return true } delete(f.filters, id) if removed := f.timeouts.removeFilter(filter.getFilterBase()); removed { + f.logger.Debug("filter found in timeout heap", "id", id) f.emitSignalToUpdateCh() + } else { + f.logger.Debug("filter already removed from timeout heap", "id", id) } return true @@ -831,7 +826,7 @@ func (t *timeHeapImpl) Pop() interface{} { n := len(old) item := old[n-1] old[n-1] = nil - item.heapIndex = -1 + item.heapIndex = NoIndexInHeap // pop out and set it to not in heap *t = old[0 : n-1] return item diff --git a/txpool/event_subscription_test.go b/txpool/event_subscription_test.go index 5b2ad675f2..52510af4b2 100644 --- a/txpool/event_subscription_test.go +++ b/txpool/event_subscription_test.go @@ -42,6 +42,7 @@ func shuffleTxPoolEvents( randomEventType := func(supported bool) proto.EventType { for { + //nolint:gosec randNum, _ := rand.Int(rand.Reader, big.NewInt(int64(len(supportedTypes)))) randType := allEvents[randNum.Int64()]