From 1bdc5585248c9c77b82473ee2d05a4cd6e25db19 Mon Sep 17 00:00:00 2001 From: Hannah Howard Date: Thu, 17 Dec 2020 10:21:23 -0800 Subject: [PATCH] fix(responsemanager): fix network error propagation (#133) fix various issues causing network errors not to propagate in many cases --- messagequeue/messagequeue.go | 8 ++++++++ notifications/data_subscriber.go | 7 ++++++- peermanager/peermanager.go | 12 +++++++++++- .../peerresponsemanager/peerresponsesender.go | 16 ++++++++++++++++ responsemanager/queryexecutor.go | 12 +++++------- 5 files changed, 46 insertions(+), 9 deletions(-) diff --git a/messagequeue/messagequeue.go b/messagequeue/messagequeue.go index 555b91b0..264c6c78 100644 --- a/messagequeue/messagequeue.go +++ b/messagequeue/messagequeue.go @@ -118,6 +118,14 @@ func (mq *MessageQueue) runQueue() { case <-mq.outgoingWork: mq.sendMessage() case <-mq.done: + select { + case <-mq.outgoingWork: + message, topic := mq.extractOutgoingMessage() + if message != nil && !message.Empty() { + mq.eventPublisher.Publish(topic, Event{Name: Error, Err: fmt.Errorf("message queue shutdown")}) + } + default: + } if mq.sender != nil { mq.sender.Close() } diff --git a/notifications/data_subscriber.go b/notifications/data_subscriber.go index d8772142..18db2936 100644 --- a/notifications/data_subscriber.go +++ b/notifications/data_subscriber.go @@ -1,6 +1,8 @@ package notifications -import "sync" +import ( + "sync" +) type TopicDataSubscriber struct { idMapLk sync.RWMutex @@ -48,4 +50,7 @@ func (m *TopicDataSubscriber) OnClose(topic Topic) { for _, data := range m.getData(topic) { m.Subscriber.OnClose(data) } + m.idMapLk.Lock() + delete(m.data, topic) + m.idMapLk.Unlock() } diff --git a/peermanager/peermanager.go b/peermanager/peermanager.go index 1098431f..4a736276 100644 --- a/peermanager/peermanager.go +++ b/peermanager/peermanager.go @@ -83,8 +83,18 @@ func (pm *PeerManager) Disconnected(p peer.ID) { // GetProcess returns the process for the given peer func (pm *PeerManager) 
GetProcess( p peer.ID) PeerProcess { + // Usually this is just a read + pm.peerProcessesLk.RLock() + pqi, ok := pm.peerProcesses[p] + if ok { + pm.peerProcessesLk.RUnlock() + return pqi.process + } + pm.peerProcessesLk.RUnlock() + // but sometimes it involves a create (we still need to do get or create because it's possible + // another writer grabbed the Lock first and made the process) pm.peerProcessesLk.Lock() - pqi := pm.getOrCreate(p) + pqi = pm.getOrCreate(p) pm.peerProcessesLk.Unlock() return pqi.process } diff --git a/responsemanager/peerresponsemanager/peerresponsesender.go b/responsemanager/peerresponsemanager/peerresponsesender.go index 79a71ce0..2f2ead6f 100644 --- a/responsemanager/peerresponsemanager/peerresponsesender.go +++ b/responsemanager/peerresponsemanager/peerresponsesender.go @@ -78,6 +78,7 @@ type peerResponseSender struct { subscriber *notifications.TopicDataSubscriber allocatorSubscriber *notifications.TopicDataSubscriber publisher notifications.Publisher + messagesSending sync.WaitGroup } // PeerResponseSender handles batching, deduping, and sending responses for @@ -442,6 +443,7 @@ func (prs *peerResponseSender) signalWork() { func (prs *peerResponseSender) run() { defer func() { + prs.messagesSending.Wait() prs.publisher.Shutdown() prs.allocator.ReleasePeerMemory(prs.p) }() @@ -449,6 +451,17 @@ func (prs *peerResponseSender) run() { for { select { case <-prs.ctx.Done(): + select { + case <-prs.outgoingWork: + prs.responseBuildersLk.Lock() + builders := prs.responseBuilders + prs.responseBuilders = nil + prs.responseBuildersLk.Unlock() + for _, builder := range builders { + prs.publisher.Publish(builder.Topic(), Event{Name: Error, Err: fmt.Errorf("queue shutdown")}) + } + default: + } return case <-prs.outgoingWork: prs.sendResponseMessages() @@ -475,6 +488,7 @@ func (prs *peerResponseSender) sendResponseMessages() { log.Errorf("Unable to assemble GraphSync response: %s", err.Error()) } + prs.messagesSending.Add(1) 
prs.peerHandler.SendResponse(prs.p, responses, blks, notifications.Notifee{ Data: builder.Topic(), Subscriber: prs.subscriber, @@ -514,8 +528,10 @@ func (s *subscriber) OnNext(topic notifications.Topic, event notifications.Event switch msgEvent.Name { case messagequeue.Sent: s.prs.publisher.Publish(builderTopic, Event{Name: Sent}) + s.prs.messagesSending.Done() case messagequeue.Error: s.prs.publisher.Publish(builderTopic, Event{Name: Error, Err: fmt.Errorf("error sending message: %w", msgEvent.Err)}) + s.prs.messagesSending.Done() case messagequeue.Queued: select { case s.prs.queuedMessages <- builderTopic: diff --git a/responsemanager/queryexecutor.go b/responsemanager/queryexecutor.go index f9d7f0ce..0f8b1b23 100644 --- a/responsemanager/queryexecutor.go +++ b/responsemanager/queryexecutor.go @@ -114,11 +114,10 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context, p peer.ID, request gsmsg.GraphSyncRequest, signals signals, sub *notifications.TopicDataSubscriber) (ipld.Loader, ipldutil.Traverser, bool, error) { result := qe.requestHooks.ProcessRequestHooks(p, request) - peerResponseSender := qe.peerManager.SenderForPeer(p) var transactionError error var isPaused bool failNotifee := notifications.Notifee{Data: graphsync.RequestFailedUnknown, Subscriber: sub} - err := peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error { + err := qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error { for _, extension := range result.Extensions { transaction.SendExtensionData(extension) } @@ -138,10 +137,10 @@ func (qe *queryExecutor) prepareQuery(ctx context.Context, if transactionError != nil { return nil, nil, false, transactionError } - if err := qe.processDedupByKey(request, peerResponseSender, failNotifee); err != nil { + if err := qe.processDedupByKey(request, qe.peerManager.SenderForPeer(p), failNotifee); err != nil { 
return nil, nil, false, err } - if err := qe.processDoNoSendCids(request, peerResponseSender, failNotifee); err != nil { + if err := qe.processDoNoSendCids(request, qe.peerManager.SenderForPeer(p), failNotifee); err != nil { return nil, nil, false, err } rootLink := cidlink.Link{Cid: request.Root()} @@ -201,10 +200,9 @@ func (qe *queryExecutor) executeQuery( signals signals, sub *notifications.TopicDataSubscriber) (graphsync.ResponseStatusCode, error) { updateChan := make(chan []gsmsg.GraphSyncRequest) - peerResponseSender := qe.peerManager.SenderForPeer(p) err := runtraversal.RunTraversal(loader, traverser, func(link ipld.Link, data []byte) error { var err error - _ = peerResponseSender.Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error { + _ = qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(transaction peerresponsemanager.PeerResponseTransactionSender) error { err = qe.checkForUpdates(p, request, signals, updateChan, transaction) if _, ok := err.(hooks.ErrPaused); !ok && err != nil { return nil @@ -228,7 +226,7 @@ func (qe *queryExecutor) executeQuery( return err }) var code graphsync.ResponseStatusCode - _ = peerResponseSender.Transaction(request.ID(), func(peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error { + _ = qe.peerManager.SenderForPeer(p).Transaction(request.ID(), func(peerResponseSender peerresponsemanager.PeerResponseTransactionSender) error { if err != nil { _, isPaused := err.(hooks.ErrPaused) if isPaused {