From 3ea749399facf9a61aa4f153e13c040455b9c933 Mon Sep 17 00:00:00 2001
From: Marten Seemann
Date: Fri, 26 Nov 2021 17:14:43 +0400
Subject: [PATCH] only remove entries from peerstore after a grace period of
 one minute

---
 p2p/host/basic/basic_host.go | 49 ++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 40 insertions(+), 9 deletions(-)

diff --git a/p2p/host/basic/basic_host.go b/p2p/host/basic/basic_host.go
index bdd92608d3..28fa0c38d8 100644
--- a/p2p/host/basic/basic_host.go
+++ b/p2p/host/basic/basic_host.go
@@ -73,7 +73,9 @@ type BasicHost struct {
 	// keep track of resources we need to wait on before shutting down
 	refCount sync.WaitGroup
 
-	network network.Network
+	networkCtxCancel context.CancelFunc // the context is canceled once the network has shut down
+	network          network.Network
+
 	mux *msmux.MultistreamMuxer
 	ids identify.IDService
 	hps *holepunch.Service
@@ -353,8 +355,10 @@ func (h *BasicHost) updateLocalIpAddr() {
 // Start starts watchForAddrChanges tasks in the host
 func (h *BasicHost) Start() {
 	h.refCount.Add(2)
+	ctx, cancel := context.WithCancel(context.Background())
+	h.networkCtxCancel = cancel
 	go h.watchForAddrChanges()
-	go h.gcPeerstore()
+	go h.gcPeerstore(ctx)
 }
 
 // newStreamHandler is the remote-opened stream handler for network.Network
@@ -522,7 +526,10 @@ func (h *BasicHost) watchForAddrChanges() {
 	}
 }
 
-func (h *BasicHost) gcPeerstore() {
+// gcPeerstore removes disconnected peers from the peer store one minute after they have disconnected.
+// The ctx controls the shutdown of this function.
+// Don't use the host's context here. The context here must not be closed before the network has shut down.
+func (h *BasicHost) gcPeerstore(ctx context.Context) {
 	defer h.refCount.Done()
 	sub, err := h.EventBus().Subscribe(&event.EvtPeerConnectednessChanged{})
 	if err != nil {
@@ -530,15 +537,36 @@ func (h *BasicHost) gcPeerstore() {
 		return
 	}
 	defer sub.Close()
+
+	m := make(map[peer.ID]time.Time)
+	ticker := time.NewTicker(20 * time.Second)
+	const gracePeriod = time.Minute // the time we keep old entries around
 	for {
-		// Note that this might shut down before the swarm has closed all connections.
 		select {
-		case <-h.ctx.Done():
+		case <-ctx.Done():
+			for p := range m {
+				h.Peerstore().RemovePeer(p)
+			}
 			return
 		case e := <-sub.Out():
 			ev := e.(event.EvtPeerConnectednessChanged)
-			if ev.Connectedness == network.NotConnected {
-				h.Peerstore().RemovePeer(ev.Peer)
+			p := ev.Peer
+			switch ev.Connectedness {
+			case network.NotConnected:
+				if _, ok := m[p]; !ok {
+					m[p] = time.Now()
+				}
+			case network.Connected:
+				// If we reconnect to the peer before we've cleared the information,
+				// keep it.
+				delete(m, p)
+			}
+		case now := <-ticker.C:
+			for p, disconnectTime := range m {
+				if disconnectTime.Add(gracePeriod).Before(now) {
+					h.Peerstore().RemovePeer(p)
+					delete(m, p)
+				}
 			}
 		}
 	}
@@ -1059,12 +1087,15 @@ func (h *BasicHost) Close() error {
 		_ = h.emitters.evtLocalAddrsUpdated.Close()
 		h.Network().Close()
 		_ = h.emitters.evtPeerConnectednessChanged.Close()
+		if h.networkCtxCancel != nil {
+			h.networkCtxCancel()
+		}
+
+		h.refCount.Wait()
 
 		if h.Peerstore() != nil {
 			h.Peerstore().Close()
 		}
-
-		h.refCount.Wait()
 	})
 
 	return nil