From 0d69a696250b4e85fb16ec83eae315105bc57124 Mon Sep 17 00:00:00 2001 From: yihuang Date: Sat, 26 Feb 2022 03:08:30 +0800 Subject: [PATCH] fix: websocket client duplicated messages (#955) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Problem: websocket client get duplicated messages Closes: #954 Solution: - localize the subscription management within current connection * changelog * fix linter * fix test building Co-authored-by: Federico Kunze Küllmer <31522760+fedekunze@users.noreply.github.com> --- CHANGELOG.md | 9 +- .../namespaces/eth/filters/filter_system.go | 34 +- rpc/ethereum/pubsub/pubsub.go | 39 +- rpc/ethereum/pubsub/pubsub_test.go | 6 +- rpc/websockets.go | 351 ++++++------------ 5 files changed, 172 insertions(+), 267 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9aea2047fd..5dc0dda693 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,11 +38,16 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## Unreleased + ### Improvements -* (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to cosmos-sdk logger. -* (rpc) [tharsis#953](https://github.com/tharsis/ethermint/pull/953) Add `eth_signTypedData` api support. +* (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to Cosmos SDK logger. +### Bug Fixes + +* (rpc) [#955](https://github.com/tharsis/ethermint/pull/955) Fix websocket server push duplicated messages to subscriber. +* (rpc) [tharsis#953](https://github.com/tharsis/ethermint/pull/953) Add `eth_signTypedData` api support. +* (log) [#948](https://github.com/tharsis/ethermint/pull/948) redirect go-ethereum's logs to cosmos-sdk logger. ## [v0.10.0-beta1] - 2022-02-15 diff --git a/rpc/ethereum/namespaces/eth/filters/filter_system.go b/rpc/ethereum/namespaces/eth/filters/filter_system.go index 067d783e0b..02685ca80e 100644 --- a/rpc/ethereum/namespaces/eth/filters/filter_system.go +++ b/rpc/ethereum/namespaces/eth/filters/filter_system.go @@ -90,62 +90,62 @@ func (es *EventSystem) WithContext(ctx context.Context) { // subscribe performs a new event subscription to a given Tendermint event. // The subscription creates a unidirectional receive event channel to receive the ResultEvent. -func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, context.CancelFunc, error) { +func (es *EventSystem) subscribe(sub *Subscription) (*Subscription, pubsub.UnsubscribeFunc, error) { var ( err error cancelFn context.CancelFunc ) - es.ctx, cancelFn = context.WithCancel(context.Background()) + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() existingSubs := es.eventBus.Topics() for _, topic := range existingSubs { if topic == sub.event { - eventCh, err := es.eventBus.Subscribe(sub.event) + eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event) if err != nil { err := errors.Wrapf(err, "failed to subscribe to topic: %s", sub.event) - return nil, cancelFn, err + return nil, nil, err } sub.eventCh = eventCh - return sub, cancelFn, nil + return sub, unsubFn, nil } } switch sub.typ { case filters.LogsSubscription: - err = es.tmWSClient.Subscribe(es.ctx, sub.event) + err = es.tmWSClient.Subscribe(ctx, sub.event) case filters.BlocksSubscription: - err = es.tmWSClient.Subscribe(es.ctx, sub.event) + err = es.tmWSClient.Subscribe(ctx, sub.event) case filters.PendingTransactionsSubscription: - err = es.tmWSClient.Subscribe(es.ctx, sub.event) + err = es.tmWSClient.Subscribe(ctx, sub.event) default: err = fmt.Errorf("invalid filter subscription type %d", sub.typ) } if err != nil { sub.err <- err - return nil, cancelFn, err + return nil, nil, err } // wrap events in a go routine to prevent blocking es.install <- sub <-sub.installed - eventCh, err := es.eventBus.Subscribe(sub.event) + eventCh, unsubFn, err := es.eventBus.Subscribe(sub.event) if err != nil { - err := errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event) - return sub, cancelFn, err + return nil, nil, errors.Wrapf(err, "failed to subscribe to topic after installed: %s", sub.event) } sub.eventCh = eventCh - return sub, cancelFn, nil + return sub, unsubFn, nil } // SubscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. Default value for the from and to // block is "latest". If the fromBlock > toBlock an error is returned. -func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { +func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) { var from, to rpc.BlockNumber if crit.FromBlock == nil { from = rpc.LatestBlockNumber @@ -173,7 +173,7 @@ func (es *EventSystem) SubscribeLogs(crit filters.FilterCriteria) (*Subscription // subscribeLogs creates a subscription that will write all logs matching the // given criteria to the given logs channel. -func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, context.CancelFunc, error) { +func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription, pubsub.UnsubscribeFunc, error) { sub := &Subscription{ id: rpc.NewID(), typ: filters.LogsSubscription, @@ -188,7 +188,7 @@ func (es *EventSystem) subscribeLogs(crit filters.FilterCriteria) (*Subscription } // SubscribeNewHeads subscribes to new block headers events. -func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, error) { +func (es EventSystem) SubscribeNewHeads() (*Subscription, pubsub.UnsubscribeFunc, error) { sub := &Subscription{ id: rpc.NewID(), typ: filters.BlocksSubscription, @@ -202,7 +202,7 @@ func (es EventSystem) SubscribeNewHeads() (*Subscription, context.CancelFunc, er } // SubscribePendingTxs subscribes to new pending transactions events from the mempool. -func (es EventSystem) SubscribePendingTxs() (*Subscription, context.CancelFunc, error) { +func (es EventSystem) SubscribePendingTxs() (*Subscription, pubsub.UnsubscribeFunc, error) { sub := &Subscription{ id: rpc.NewID(), typ: filters.PendingTransactionsSubscription, diff --git a/rpc/ethereum/pubsub/pubsub.go b/rpc/ethereum/pubsub/pubsub.go index 8ecf5ca75b..4ac7f34cb5 100644 --- a/rpc/ethereum/pubsub/pubsub.go +++ b/rpc/ethereum/pubsub/pubsub.go @@ -2,35 +2,43 @@ package pubsub import ( "sync" + "sync/atomic" "github.com/pkg/errors" coretypes "github.com/tendermint/tendermint/rpc/core/types" ) +type UnsubscribeFunc func() + type EventBus interface { AddTopic(name string, src <-chan coretypes.ResultEvent) error RemoveTopic(name string) - Subscribe(name string) (<-chan coretypes.ResultEvent, error) + Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) Topics() []string } type memEventBus struct { - topics map[string]<-chan coretypes.ResultEvent - topicsMux *sync.RWMutex - subscribers map[string][]chan<- coretypes.ResultEvent - subscribersMux *sync.RWMutex + topics map[string]<-chan coretypes.ResultEvent + topicsMux *sync.RWMutex + subscribers map[string]map[uint64]chan<- coretypes.ResultEvent + subscribersMux *sync.RWMutex + currentUniqueID uint64 } func NewEventBus() EventBus { return &memEventBus{ topics: make(map[string]<-chan coretypes.ResultEvent), topicsMux: new(sync.RWMutex), - subscribers: make(map[string][]chan<- coretypes.ResultEvent), + subscribers: make(map[string]map[uint64]chan<- coretypes.ResultEvent), subscribersMux: new(sync.RWMutex), } } +func (m *memEventBus) GenUniqueID() uint64 { + return atomic.AddUint64(&m.currentUniqueID, 1) +} + func (m *memEventBus) Topics() (topics []string) { m.topicsMux.RLock() defer m.topicsMux.RUnlock() @@ -67,21 +75,32 @@ func (m *memEventBus) RemoveTopic(name string) { m.topicsMux.Unlock() } -func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, error) { +func (m *memEventBus) Subscribe(name string) (<-chan coretypes.ResultEvent, UnsubscribeFunc, error) { m.topicsMux.RLock() _, ok := m.topics[name] m.topicsMux.RUnlock() if !ok { - return nil, errors.Errorf("topic not found: %s", name) + return nil, nil, errors.Errorf("topic not found: %s", name) } ch := make(chan coretypes.ResultEvent) m.subscribersMux.Lock() defer m.subscribersMux.Unlock() - m.subscribers[name] = append(m.subscribers[name], ch) - return ch, nil + id := m.GenUniqueID() + if _, ok := m.subscribers[name]; !ok { + m.subscribers[name] = make(map[uint64]chan<- coretypes.ResultEvent) + } + m.subscribers[name][id] = ch + + unsubscribe := func() { + m.subscribersMux.Lock() + defer m.subscribersMux.Unlock() + delete(m.subscribers[name], id) + } + + return ch, unsubscribe, nil } func (m *memEventBus) publishTopic(name string, src <-chan coretypes.ResultEvent) { diff --git a/rpc/ethereum/pubsub/pubsub_test.go b/rpc/ethereum/pubsub/pubsub_test.go index fbaa0ef291..9f5fe8d2e3 100644 --- a/rpc/ethereum/pubsub/pubsub_test.go +++ b/rpc/ethereum/pubsub/pubsub_test.go @@ -37,13 +37,13 @@ func TestSubscribe(t *testing.T) { q.AddTopic("lol", lolSrc) - kekSubC, err := q.Subscribe("kek") + kekSubC, _, err := q.Subscribe("kek") require.NoError(t, err) - lolSubC, err := q.Subscribe("lol") + lolSubC, _, err := q.Subscribe("lol") require.NoError(t, err) - lol2SubC, err := q.Subscribe("lol") + lol2SubC, _, err := q.Subscribe("lol") require.NoError(t, err) wg := new(sync.WaitGroup) diff --git a/rpc/websockets.go b/rpc/websockets.go index 76b5c489c7..c23f59446a 100644 --- a/rpc/websockets.go +++ b/rpc/websockets.go @@ -23,11 +23,11 @@ import ( "github.com/ethereum/go-ethereum/rpc" "github.com/tendermint/tendermint/libs/log" - coretypes "github.com/tendermint/tendermint/rpc/core/types" rpcclient "github.com/tendermint/tendermint/rpc/jsonrpc/client" tmtypes "github.com/tendermint/tendermint/types" rpcfilters "github.com/tharsis/ethermint/rpc/ethereum/namespaces/eth/filters" + "github.com/tharsis/ethermint/rpc/ethereum/pubsub" "github.com/tharsis/ethermint/rpc/ethereum/types" "github.com/tharsis/ethermint/server/config" evmtypes "github.com/tharsis/ethermint/x/evm/types" @@ -168,6 +168,15 @@ func (w *wsConn) ReadMessage() (messageType int, p []byte, err error) { } func (s *websocketsServer) readLoop(wsConn *wsConn) { + // subscriptions of current connection + subscriptions := make(map[rpc.ID]pubsub.UnsubscribeFunc) + defer func() { + // cancel all subscriptions when connection closed + for _, unsubFn := range subscriptions { + unsubFn() + } + }() + for { _, mb, err := wsConn.ReadMessage() if err != nil { @@ -195,57 +204,64 @@ func (s *websocketsServer) readLoop(wsConn *wsConn) { } connID := msg["id"].(float64) - if method == "eth_subscribe" { + switch method { + case "eth_subscribe": params := msg["params"].([]interface{}) if len(params) == 0 { s.sendErrResponse(wsConn, "invalid parameters") continue } - id, err := s.api.subscribe(wsConn, params) + subID := rpc.NewID() + unsubFn, err := s.api.subscribe(wsConn, subID, params) if err != nil { s.sendErrResponse(wsConn, err.Error()) continue } + subscriptions[subID] = unsubFn res := &SubscriptionResponseJSON{ Jsonrpc: "2.0", ID: connID, - Result: id, + Result: subID, } - err = wsConn.WriteJSON(res) - if err != nil { + if err := wsConn.WriteJSON(res); err != nil { + break + } + case "eth_unsubscribe": + params, ok := msg["params"].([]interface{}) + if !ok { + s.sendErrResponse(wsConn, "invalid parameters") continue } - - continue - } else if method == "eth_unsubscribe" { - ids, ok := msg["params"].([]interface{}) - if _, idok := ids[0].(string); !ok || !idok { + id, ok := params[0].(string) + if !ok { s.sendErrResponse(wsConn, "invalid parameters") continue } - ok = s.api.unsubscribe(rpc.ID(ids[0].(string))) + subID := rpc.ID(id) + unsubFn, ok := subscriptions[subID] + if ok { + delete(subscriptions, subID) + unsubFn() + } res := &SubscriptionResponseJSON{ Jsonrpc: "2.0", ID: connID, Result: ok, } - err = wsConn.WriteJSON(res) + if err := wsConn.WriteJSON(res); err != nil { + break + } + default: + // otherwise, call the usual rpc server to respond + err = s.tcpGetAndSendResponse(wsConn, mb) if err != nil { - continue + s.sendErrResponse(wsConn, err.Error()) } - - continue - } - - // otherwise, call the usual rpc server to respond - err = s.tcpGetAndSendResponse(wsConn, mb) - if err != nil { - s.sendErrResponse(wsConn, err.Error()) } } } @@ -281,18 +297,9 @@ func (s *websocketsServer) tcpGetAndSendResponse(wsConn *wsConn, mb []byte) erro return wsConn.WriteJSON(wsSend) } -type wsSubscription struct { - sub *rpcfilters.Subscription - unsubscribed chan struct{} // closed when unsubscribing - wsConn *wsConn - query string -} - // pubSubAPI is the eth_ prefixed set of APIs in the Web3 JSON-RPC spec type pubSubAPI struct { events *rpcfilters.EventSystem - filtersMu *sync.RWMutex - filters map[rpc.ID]*wsSubscription logger log.Logger clientCtx client.Context } @@ -302,80 +309,51 @@ func newPubSubAPI(clientCtx client.Context, logger log.Logger, tmWSClient *rpccl logger = logger.With("module", "websocket-client") return &pubSubAPI{ events: rpcfilters.NewEventSystem(logger, tmWSClient), - filtersMu: new(sync.RWMutex), - filters: make(map[rpc.ID]*wsSubscription), logger: logger, clientCtx: clientCtx, } } -func (api *pubSubAPI) subscribe(wsConn *wsConn, params []interface{}) (rpc.ID, error) { +func (api *pubSubAPI) subscribe(wsConn *wsConn, subID rpc.ID, params []interface{}) (pubsub.UnsubscribeFunc, error) { method, ok := params[0].(string) if !ok { - return "0", errors.New("invalid parameters") + return nil, errors.New("invalid parameters") } switch method { case "newHeads": // TODO: handle extra params - return api.subscribeNewHeads(wsConn) + return api.subscribeNewHeads(wsConn, subID) case "logs": if len(params) > 1 { - return api.subscribeLogs(wsConn, params[1]) + return api.subscribeLogs(wsConn, subID, params[1]) } - return api.subscribeLogs(wsConn, nil) + return api.subscribeLogs(wsConn, subID, nil) case "newPendingTransactions": - return api.subscribePendingTransactions(wsConn) + return api.subscribePendingTransactions(wsConn, subID) case "syncing": - return api.subscribeSyncing(wsConn) + return api.subscribeSyncing(wsConn, subID) default: - return "0", errors.Errorf("unsupported method %s", method) - } -} - -func (api *pubSubAPI) unsubscribe(id rpc.ID) bool { - api.filtersMu.Lock() - defer api.filtersMu.Unlock() - - wsSub, ok := api.filters[id] - if !ok { - return false + return nil, errors.Errorf("unsupported method %s", method) } - - wsSub.sub.Unsubscribe(api.events) - close(api.filters[id].unsubscribed) - delete(api.filters, id) - return true } -func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn) (rpc.ID, error) { - query := "subscribeNewHeads" - subID := rpc.NewID() - - sub, _, err := api.events.SubscribeNewHeads() +func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) { + sub, unsubFn, err := api.events.SubscribeNewHeads() if err != nil { - return "", errors.Wrap(err, "error creating block filter") + return nil, errors.Wrap(err, "error creating block filter") } // TODO: use events baseFee := big.NewInt(params.InitialBaseFee) - unsubscribed := make(chan struct{}) - api.filtersMu.Lock() - api.filters[subID] = &wsSubscription{ - sub: sub, - wsConn: wsConn, - unsubscribed: unsubscribed, - query: query, - } - api.filtersMu.Unlock() - - go func(headersCh <-chan coretypes.ResultEvent, errCh <-chan error) { + go func() { + headersCh := sub.Event() + errCh := sub.Err() for { select { case event, ok := <-headersCh: if !ok { - api.unsubscribe(subID) return } @@ -387,59 +365,36 @@ func (api *pubSubAPI) subscribeNewHeads(wsConn *wsConn) (rpc.ID, error) { header := types.EthHeaderFromTendermint(data.Header, ethtypes.Bloom{}, baseFee) - api.filtersMu.RLock() - for subID, wsSub := range api.filters { - subID := subID - wsSub := wsSub - if wsSub.query != query { - continue - } - // write to ws conn - res := &SubscriptionNotification{ - Jsonrpc: "2.0", - Method: "eth_subscription", - Params: &SubscriptionResult{ - Subscription: subID, - Result: header, - }, - } - - err = wsSub.wsConn.WriteJSON(res) - if err != nil { - api.logger.Error("error writing header, will drop peer", "error", err.Error()) - - try(func() { - api.filtersMu.RUnlock() - api.filtersMu.Lock() - defer func() { - api.filtersMu.Unlock() - api.filtersMu.RLock() - }() + // write to ws conn + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: subID, + Result: header, + }, + } - if err != websocket.ErrCloseSent { - _ = wsSub.wsConn.Close() - } + err = wsConn.WriteJSON(res) + if err != nil { + api.logger.Error("error writing header, will drop peer", "error", err.Error()) - delete(api.filters, subID) - close(wsSub.unsubscribed) - }, api.logger, "closing websocket peer sub") - } + try(func() { + if err != websocket.ErrCloseSent { + _ = wsConn.Close() + } + }, api.logger, "closing websocket peer sub") } - api.filtersMu.RUnlock() case err, ok := <-errCh: if !ok { - api.unsubscribe(subID) return } api.logger.Debug("dropping NewHeads WebSocket subscription", "subscription-id", subID, "error", err.Error()) - api.unsubscribe(subID) - case <-unsubscribed: - return } } - }(sub.Event(), sub.Err()) + }() - return subID, nil + return unsubFn, nil } func try(fn func(), l log.Logger, desc string) { @@ -459,7 +414,7 @@ func try(fn func(), l log.Logger, desc string) { fn() } -func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, error) { +func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, subID rpc.ID, extra interface{}) (pubsub.UnsubscribeFunc, error) { crit := filters.FilterCriteria{} if extra != nil { @@ -467,7 +422,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, if !ok { err := errors.New("invalid criteria") api.logger.Debug("invalid criteria", "type", fmt.Sprintf("%T", extra)) - return "", err + return nil, err } if params["address"] != nil { @@ -476,7 +431,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, if !ok && !sok { err := errors.New("invalid addresses; must be address or array of addresses") api.logger.Debug("invalid addresses", "type", fmt.Sprintf("%T", params["address"])) - return "", err + return nil, err } if ok { @@ -490,7 +445,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, if !ok { err := errors.New("invalid address") api.logger.Debug("invalid address", "type", fmt.Sprintf("%T", addr)) - return "", err + return nil, err } crit.Addresses = append(crit.Addresses, common.HexToAddress(address)) @@ -503,7 +458,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, if !ok { err := errors.Errorf("invalid topics: %s", topics) api.logger.Error("invalid topics", "type", fmt.Sprintf("%T", topics)) - return "", err + return nil, err } crit.Topics = make([][]common.Hash, len(topics)) @@ -528,7 +483,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, // in case we don't have list, but a single topic value if topic, ok := subtopics.(string); ok { if err := addCritTopic(topicIdx, topic); err != nil { - return "", err + return nil, err } continue @@ -539,7 +494,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, if !ok { err := errors.New("invalid subtopics") api.logger.Error("invalid subtopic", "type", fmt.Sprintf("%T", subtopics)) - return "", err + return nil, err } subtopicsCollect := make([]common.Hash, len(subtopicsList)) @@ -548,7 +503,7 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, if !ok { err := errors.Errorf("invalid subtopic: %s", subtopic) api.logger.Error("invalid subtopic", "type", fmt.Sprintf("%T", subtopic)) - return "", err + return nil, err } subtopicsCollect[idx] = common.HexToHash(tstr) @@ -559,37 +514,19 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, } } - critBz, err := json.Marshal(crit) - if err != nil { - api.logger.Error("failed to JSON marshal criteria", "error", err.Error()) - return rpc.ID(""), err - } - - query := "subscribeLogs" + string(critBz) - subID := rpc.NewID() - - sub, _, err := api.events.SubscribeLogs(crit) + sub, unsubFn, err := api.events.SubscribeLogs(crit) if err != nil { api.logger.Error("failed to subscribe logs", "error", err.Error()) - return rpc.ID(""), err - } - - unsubscribed := make(chan struct{}) - api.filtersMu.Lock() - api.filters[subID] = &wsSubscription{ - sub: sub, - wsConn: wsConn, - unsubscribed: unsubscribed, - query: query, + return nil, err } - api.filtersMu.Unlock() - go func(ch <-chan coretypes.ResultEvent, errCh <-chan error, subID rpc.ID) { + go func() { + ch := sub.Event() + errCh := sub.Err() for { select { case event, ok := <-ch: if !ok { - api.unsubscribe(subID) return } @@ -610,14 +547,6 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, continue } - api.filtersMu.RLock() - wsSub, ok := api.filters[subID] - if !ok { - api.logger.Debug("subID not in filters", subID) - return - } - api.filtersMu.RUnlock() - for _, ethLog := range logs { res := &SubscriptionNotification{ Jsonrpc: "2.0", @@ -628,57 +557,36 @@ func (api *pubSubAPI) subscribeLogs(wsConn *wsConn, extra interface{}) (rpc.ID, }, } - err = wsSub.wsConn.WriteJSON(res) + err = wsConn.WriteJSON(res) if err != nil { try(func() { - api.filtersMu.Lock() - defer api.filtersMu.Unlock() - if err != websocket.ErrCloseSent { - _ = wsSub.wsConn.Close() + _ = wsConn.Close() } - - delete(api.filters, subID) - close(wsSub.unsubscribed) }, api.logger, "closing websocket peer sub") } } case err, ok := <-errCh: if !ok { - api.unsubscribe(subID) return } api.logger.Debug("dropping Logs WebSocket subscription", "subscription-id", subID, "error", err.Error()) - api.unsubscribe(subID) - case <-unsubscribed: - return } } - }(sub.Event(), sub.Err(), subID) + }() - return subID, nil + return unsubFn, nil } -func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn) (rpc.ID, error) { - query := "subscribePendingTransactions" - subID := rpc.NewID() - - sub, _, err := api.events.SubscribePendingTxs() +func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) { + sub, unsubFn, err := api.events.SubscribePendingTxs() if err != nil { - return "", errors.Wrap(err, "error creating block filter: %s") - } - - unsubscribed := make(chan struct{}) - api.filtersMu.Lock() - api.filters[subID] = &wsSubscription{ - sub: sub, - wsConn: wsConn, - unsubscribed: unsubscribed, - query: query, + return nil, errors.Wrap(err, "error creating block filter: %s") } - api.filtersMu.Unlock() - go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) { + go func() { + txsCh := sub.Event() + errCh := sub.Err() for { select { case ev := <-txsCh: @@ -689,67 +597,40 @@ func (api *pubSubAPI) subscribePendingTransactions(wsConn *wsConn) (rpc.ID, erro continue } - api.filtersMu.RLock() for _, ethTx := range ethTxs { - for subID, wsSub := range api.filters { - subID := subID - wsSub := wsSub - if wsSub.query != query { - continue - } - // write to ws conn - res := &SubscriptionNotification{ - Jsonrpc: "2.0", - Method: "eth_subscription", - Params: &SubscriptionResult{ - Subscription: subID, - Result: ethTx.Hash, - }, - } + // write to ws conn + res := &SubscriptionNotification{ + Jsonrpc: "2.0", + Method: "eth_subscription", + Params: &SubscriptionResult{ + Subscription: subID, + Result: ethTx.Hash, + }, + } - err = wsSub.wsConn.WriteJSON(res) - if err != nil { - api.logger.Debug("error writing header, will drop peer", "error", err.Error()) - - try(func() { - // Release the initial read lock in .RUnlock() before - // invoking .Lock() to avoid the deadlock in - // https://github.com/tharsis/ethermint/issues/821#issuecomment-1033959984 - // and as documented at https://pkg.go.dev/sync#RWMutex - api.filtersMu.RUnlock() - api.filtersMu.Lock() - defer func() { - api.filtersMu.Unlock() - api.filtersMu.RLock() - }() - - if err != websocket.ErrCloseSent { - _ = wsSub.wsConn.Close() - } - - delete(api.filters, subID) - close(wsSub.unsubscribed) - }, api.logger, "closing websocket peer sub") - } + err = wsConn.WriteJSON(res) + if err != nil { + api.logger.Debug("error writing header, will drop peer", "error", err.Error()) + + try(func() { + if err != websocket.ErrCloseSent { + _ = wsConn.Close() + } + }, api.logger, "closing websocket peer sub") } } - api.filtersMu.RUnlock() case err, ok := <-errCh: if !ok { - api.unsubscribe(subID) return } api.logger.Debug("dropping PendingTransactions WebSocket subscription", subID, "error", err.Error()) - api.unsubscribe(subID) - case <-unsubscribed: - return } } - }(sub.Event(), sub.Err()) + }() - return subID, nil + return unsubFn, nil } -func (api *pubSubAPI) subscribeSyncing(wsConn *wsConn) (rpc.ID, error) { - return "", nil +func (api *pubSubAPI) subscribeSyncing(wsConn *wsConn, subID rpc.ID) (pubsub.UnsubscribeFunc, error) { + return nil, errors.New("syncing subscription is not implemented") }