From 7e210bcc259b2f7bf57598260aaee072a0f7ced8 Mon Sep 17 00:00:00 2001 From: ucwong Date: Fri, 5 Jan 2024 12:49:31 +0000 Subject: [PATCH] eth/downloader, eth/filters: use defer to call Unsubscribe (#28762) --- eth/downloader/api.go | 3 +-- eth/filters/api.go | 10 ++++------ 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/eth/downloader/api.go b/eth/downloader/api.go index b3f7113bcde9..606c6d4e7ec1 100644 --- a/eth/downloader/api.go +++ b/eth/downloader/api.go @@ -101,16 +101,15 @@ func (api *DownloaderAPI) Syncing(ctx context.Context) (*rpc.Subscription, error go func() { statuses := make(chan interface{}) sub := api.SubscribeSyncStatus(statuses) + defer sub.Unsubscribe() for { select { case status := <-statuses: notifier.Notify(rpcSub.ID, status) case <-rpcSub.Err(): - sub.Unsubscribe() return case <-notifier.Closed(): - sub.Unsubscribe() return } } diff --git a/eth/filters/api.go b/eth/filters/api.go index a4eaa9cec805..5dc59d01cd71 100644 --- a/eth/filters/api.go +++ b/eth/filters/api.go @@ -159,6 +159,8 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) go func() { txs := make(chan []*types.Transaction, 128) pendingTxSub := api.events.SubscribePendingTxs(txs) + defer pendingTxSub.Unsubscribe() + chainConfig := api.sys.backend.ChainConfig() for { @@ -176,10 +178,8 @@ func (api *FilterAPI) NewPendingTransactions(ctx context.Context, fullTx *bool) } } case <-rpcSub.Err(): - pendingTxSub.Unsubscribe() return case <-notifier.Closed(): - pendingTxSub.Unsubscribe() return } } @@ -233,16 +233,15 @@ func (api *FilterAPI) NewHeads(ctx context.Context) (*rpc.Subscription, error) { go func() { headers := make(chan *types.Header) headersSub := api.events.SubscribeNewHeads(headers) + defer headersSub.Unsubscribe() for { select { case h := <-headers: notifier.Notify(rpcSub.ID, h) case <-rpcSub.Err(): - headersSub.Unsubscribe() return case <-notifier.Closed(): - headersSub.Unsubscribe() return } } @@ -267,6 +266,7 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc if err != nil { return nil, err } + defer logsSub.Unsubscribe() go func() { for { @@ -277,10 +277,8 @@ func (api *FilterAPI) Logs(ctx context.Context, crit FilterCriteria) (*rpc.Subsc notifier.Notify(rpcSub.ID, &log) } case <-rpcSub.Err(): // client send an unsubscribe request - logsSub.Unsubscribe() return case <-notifier.Closed(): // connection dropped - logsSub.Unsubscribe() return } }