Skip to content

Commit

Permalink
Problem: websocket client get duplicated messages
Browse files Browse the repository at this point in the history
Closes: evmos#954
Solution:
- localize the subscription management within current connection
  • Loading branch information
yihuang committed Feb 24, 2022
1 parent 0f5b1aa commit ea06a30
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 262 deletions.
34 changes: 17 additions & 17 deletions rpc/ethereum/namespaces/eth/filters/filter_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,62 +90,62 @@ func (es *EventSystem) WithContext(ctx context.Context) {

// subscribe performs a new event subscription to a given Tendermint event.
// The subscription creates a unidirectional receive event channel to receive the ResultEvent.
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, pubsub.UnsubscribeFunc, error) {
var (
err error
cancelFn context.CancelFunc
)

es.ctx, cancelFn = context.WithCancel(context.Background())
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()

existingSubs := es.eventBus.Topics()
for _, topic := range existingSubs {
if topic == sub.event {
eventCh, err := es.eventBus.Subscribe(sub.event)
eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil {
err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event)
return nil, cancelFn, err
return nil, nil, err
}

sub.eventCh = eventCh
return sub, cancelFn, nil
return sub, unsubFn, nil
}
}

switch sub.typ {
case filters.LogsSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
case filters.BlocksSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
case filters.PendingTransactionsSubscription:
err = es.tmWSClient.Subscribe(es.ctx, sub.event)
err = es.tmWSClient.Subscribe(ctx, sub.event)
default:
err = fmt.Errorf("invalid filter subscription type %d", sub.typ)
}

if err != nil {
sub.err <- err
return nil, cancelFn, err
return nil, nil, err
}

// wrap events in a go routine to prevent blocking
es.install <- sub
<-sub.installed

eventCh, err := es.eventBus.Subscribe(sub.event)
eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event)
if err != nil {
err := errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event)
return sub, cancelFn, err
return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event)
}

sub.eventCh = eventCh
return sub, cancelFn, nil
return sub, unsubFn, nil
}

// SubscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel. Default value for the from and to
// block is "latest". If the fromBlock > toBlock an error is returned.
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
var from, to rpc.BlockNumber
if crit.FromBlock == nil {
from = rpc.LatestBlockNumber
Expand Down Expand Up @@ -173,7 +173,7 @@ func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription

// subscribeLogs creates a subscription that will write all logs matching the
// given criteria to the given logs channel.
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) {
func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.LogsSubscription,
Expand All @@ -188,7 +188,7 @@ func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription
}

// SubscribeNewHeads subscribes to new block headers events.
func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) {
func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.BlocksSubscription,
Expand All @@ -202,7 +202,7 @@ func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, er
}

// SubscribePendingTxs subscribes to new pending transactions events from the mempool.
func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) {
func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) {
sub := &Subscription{
id: rpc.NewID(),
typ: filters.PendingTransactionsSubscription,
Expand Down
41 changes: 30 additions & 11 deletions rpc/ethereum/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,43 @@ package pubsub

import (
"sync"
"sync/atomic"

"github.com/pkg/errors"

coretypes "github.com/tendermint/tendermint/rpc/core/types"
)

type UnsubscribeFunc func()

type EventBus interface {
AddTopic(name string, src <-chan coretypes.ResultEvent) error
RemoveTopic(name string)
Subscribe(name string) (<-chan coretypes.ResultEvent, error)
Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error)
Topics() []string
}

type memEventBus struct {
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string][]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
topics map[string]<-chan coretypes.ResultEvent
topicsMux *sync.RWMutex
subscribers map[string]map[uint64]chan<- coretypes.ResultEvent
subscribersMux *sync.RWMutex
currentUniqueID uint64
}

func NewEventBus() EventBus {
return &memEventBus{
topics: make(map[string]<-chan coretypes.ResultEvent),
topicsMux: new(sync.RWMutex),
subscribers: make(map[string][]chan<- coretypes.ResultEvent),
subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent),
subscribersMux: new(sync.RWMutex),
}
}

func (m *memEventBus) GenUniqueID() uint64 {
return atomic.AddUint64(&m.currentUniqueID, 1)
}

func (m *memEventBus) Topics() (topics []string) {
m.topicsMux.RLock()
defer m.topicsMux.RUnlock()
Expand Down Expand Up @@ -67,21 +75,32 @@ func (m *memEventBus) RemoveTopic(name string) {
m.topicsMux.Unlock()
}

func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, error) {
func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) {
m.topicsMux.RLock()
_, ok := m.topics[name]
m.topicsMux.RUnlock()

if !ok {
return nil, errors.Errorf("topic not found: %s", name)
return nil, nil, errors.Errorf("topic not found: %s", name)
}

ch := make(chan coretypes.ResultEvent)
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
m.subscribers[name] = append(m.subscribers[name], ch)

return ch, nil
id := m.GenUniqueID()
if _, ok := m.subscribers[name]; !ok {
m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent)
}
m.subscribers[name][id] = ch

unsubscribe := func() {
m.subscribersMux.Lock()
defer m.subscribersMux.Unlock()
delete(m.subscribers[name], id)
}

return ch, unsubscribe, nil
}

func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) {
Expand Down Expand Up @@ -115,7 +134,7 @@ func (m *memEventBus) publishAllSubscribers(name string, msg coretypes.ResultEve
subsribers := m.subscribers[name]
m.subscribersMux.RUnlock()

for _, sub := range subsribers {
for id, sub := range subsribers {
select {
case sub <- msg:
default:
Expand Down
Loading

0 comments on commit ea06a30

Please sign in to comment.