From 26d1a41926215238c2297af5680268bbf0ef6610 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Sun, 20 Nov 2022 07:59:16 +0800 Subject: [PATCH 1/8] discovery+routing: add more logs to reveal channel update flow --- discovery/gossiper.go | 70 ++++++++++++++++++++++++++++++++----------- routing/router.go | 28 ++++++++++++++--- 2 files changed, 76 insertions(+), 22 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index bd2d3b605a..f3c497025e 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1242,6 +1242,9 @@ func (d *AuthenticatedGossiper) networkHandler() { announcement.msg, ) if err != nil { + log.Debugf("Validating network message %s got err: %v", + announcement.msg.MsgType(), err) + if !routing.IsError( err, routing.ErrVBarrierShuttingDown, @@ -1824,10 +1827,6 @@ func (d *AuthenticatedGossiper) isPremature(chanID lnwire.ShortChannelID, func (d *AuthenticatedGossiper) processNetworkAnnouncement( nMsg *networkMsg) ([]networkMsg, bool) { - log.Debugf("Processing network message: peer=%v, source=%x, msg=%s, "+ - "is_remote=%v", nMsg.peer, nMsg.source.SerializeCompressed(), - nMsg.msg.MsgType(), nMsg.isRemote) - // If this is a remote update, we set the scheduler option to lazily // add it to the graph. var schedulerOp []batch.SchedulerOption @@ -1947,7 +1946,7 @@ func (d *AuthenticatedGossiper) isMsgStale(msg lnwire.Message) bool { } if err != nil { log.Debugf("Unable to retrieve channel=%v from graph: "+ - "%v", err) + "%v", chanInfo.ChannelID, err) return false } @@ -2145,6 +2144,9 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, timestamp := time.Unix(int64(nodeAnn.Timestamp), 0) + log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+ + "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID) + // We'll quickly ask the router if it already has a newer update for // this node so we can skip validating signatures if not required. if d.cfg.Router.IsStaleNode(nodeAnn.NodeID, timestamp) { @@ -2199,6 +2201,10 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, nMsg.err <- nil // TODO(roasbeef): get rid of the above + + log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+ + "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID) + return announcements, true } @@ -2207,6 +2213,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, ann *lnwire.ChannelAnnouncement, ops []batch.SchedulerOption) ([]networkMsg, bool) { + log.Debugf("Processing ChannelAnnouncement: peer=%v, short_chan_id=%v", + nMsg.peer, ann.ShortChannelID.ToUint64()) + // We'll ignore any channel announcements that target any chain other // than the set of chains we know of. if !bytes.Equal(ann.ChainHash[:], d.cfg.ChainHash[:]) { @@ -2327,6 +2336,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, } } + log.Debugf("Adding edge for short_chan_id: %v", + ann.ShortChannelID.ToUint64()) + // We will add the edge to the channel router. If the nodes present in // this channel are not present in the database, a partial node will be // added to represent each node while we wait for a node announcement. @@ -2338,6 +2350,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg, d.channelMtx.Lock(ann.ShortChannelID.ToUint64()) err := d.cfg.Router.AddEdge(edge, ops...) 
 	if err != nil {
+		log.Debugf("Router rejected edge for short_chan_id(%v): %v",
+			ann.ShortChannelID.ToUint64(), err)
+
 		defer d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
 
 		// If the edge was rejected due to already being known, then it
@@ -2359,6 +2374,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
 				return nil, false
 			}
 
+			log.Debugf("Extracted %v announcements from rejected "+
+				"msgs", len(anns))
+
 			// If while processing this rejected edge, we realized
 			// there's a set of announcements we could extract,
 			// then we'll return those directly.
@@ -2367,11 +2385,8 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
 				return anns, true
 			}
 
-			// Otherwise, this is just a regular rejected edge.
-			log.Debugf("Router rejected channel edge: %v", err)
 		} else {
-			log.Debugf("Router rejected channel edge: %v", err)
-
+			// Otherwise, this is just a regular rejected edge.
 			key := newRejectCacheKey(
 				ann.ShortChannelID.ToUint64(),
 				sourceToPub(nMsg.source),
@@ -2386,6 +2401,9 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
 	// If err is nil, release the lock immediately.
 	d.channelMtx.Unlock(ann.ShortChannelID.ToUint64())
 
+	log.Debugf("Finished adding edge for short_chan_id: %v",
+		ann.ShortChannelID.ToUint64())
+
 	// If we earlier received any ChannelUpdates for this channel, we can
 	// now process them, as the channel is added to the graph.
 	shortChanID := ann.ShortChannelID.ToUint64()
@@ -2456,6 +2474,10 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
 	}
 
 	nMsg.err <- nil
+
+	log.Debugf("Processed ChannelAnnouncement: peer=%v, short_chan_id=%v",
+		nMsg.peer, ann.ShortChannelID.ToUint64())
+
 	return announcements, true
 }
 
@@ -2464,6 +2486,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
 	upd *lnwire.ChannelUpdate,
 	ops []batch.SchedulerOption) ([]networkMsg, bool) {
 
+	log.Debugf("Processing ChannelUpdate: peer=%v, short_chan_id=%v",
+		nMsg.peer, upd.ShortChannelID.ToUint64())
+
 	// We'll ignore any channel updates that target any chain other than
 	// the set of chains we know of.
 	if !bytes.Equal(upd.ChainHash[:], d.cfg.ChainHash[:]) {
@@ -2523,10 +2548,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
 		graphScid, timestamp, upd.ChannelFlags,
 	) {
 
-		log.Debugf("Ignored stale edge policy: peer=%v, source=%x, "+
-			"msg=%s, is_remote=%v", nMsg.peer,
-			nMsg.source.SerializeCompressed(), nMsg.msg.MsgType(),
-			nMsg.isRemote,
+		log.Debugf("Ignored stale edge policy for short_chan_id(%v): "+
+			"peer=%v, source=%x, msg=%s, is_remote=%v", shortChanID,
+			nMsg.peer, nMsg.source.SerializeCompressed(),
+			nMsg.msg.MsgType(), nMsg.isRemote,
 		)
 
 		nMsg.err <- nil
@@ -2649,6 +2674,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
 		edgeToUpdate = e2
 	}
 
+	log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
+		"edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
+		edgeToUpdate != nil)
+
 	// Validate the channel announcement with the expected public key and
 	// channel capacity. In the case of an invalid channel update, we'll
 	// return an error to the caller and exit early.
@@ -2743,7 +2772,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
 			routing.ErrVBarrierShuttingDown,
 		) {
 
-			log.Debug(err)
+			log.Debugf("Update edge for short_chan_id(%v) got: %v",
+				shortChanID, err)
 		} else {
 			// Since we know the stored SCID in the graph, we'll
 			// cache that SCID.
@@ -2753,7 +2783,8 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, ) _, _ = d.recentRejects.Put(key, &cachedReject{}) - log.Error(err) + log.Errorf("Update edge for short_chan_id(%v) got: %v", + shortChanID, err) } nMsg.err <- err @@ -2801,8 +2832,7 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, ) log.Debugf("The message %v has no AuthProof, sending the "+ - "update to remote peer %x", upd.MsgType(), - remotePubKey) + "update to remote peer %x", upd.MsgType(), remotePubKey) // Now we'll attempt to send the channel update message // reliably to the remote peer in the background, so that we @@ -2832,6 +2862,10 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, } nMsg.err <- nil + + log.Debugf("Processed ChannelUpdate: peer=%v, short_chan_id=%v, "+ + "timestamp=%v", nMsg.peer, upd.ShortChannelID.ToUint64(), + timestamp) return announcements, true } @@ -2848,7 +2882,7 @@ func (d *AuthenticatedGossiper) handleAnnSig(nMsg *networkMsg, prefix = "remote" } - log.Infof("Received new %v channel announcement for %v", prefix, + log.Infof("Received new %v announcement signature for %v", prefix, ann.ShortChannelID) // By the specification, channel announcement proofs should be sent diff --git a/routing/router.go b/routing/router.go index 01d4b50222..8d61a25f25 100644 --- a/routing/router.go +++ b/routing/router.go @@ -1099,6 +1099,8 @@ func (r *ChannelRouter) networkHandler() { update.msg, allowDependents, ) if err != nil { + log.Debugf("process network updates "+ + "got: %v", err) return } @@ -1439,6 +1441,9 @@ func (r *ChannelRouter) processUpdate(msg interface{}, r.stats.incNumNodeUpdates() case *channeldb.ChannelEdgeInfo: + log.Debugf("Received ChannelEdgeInfo for channel %v", + msg.ChannelID) + // Prior to processing the announcement we first check if we // already know of this channel, if so, then we can exit early. _, _, exists, isZombie, err := r.cfg.Graph.HasChannelEdge( @@ -1584,7 +1589,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}, return errors.Errorf("unable to add edge: %v", err) } - log.Tracef("New channel discovered! Link "+ + log.Debugf("New channel discovered! Link "+ "connects %x and %x with ChannelPoint(%v): "+ "chan_id=%v, capacity=%v", msg.NodeKey1Bytes, msg.NodeKey2Bytes, @@ -1610,6 +1615,9 @@ func (r *ChannelRouter) processUpdate(msg interface{}, } case *channeldb.ChannelEdgePolicy: + log.Debugf("Received ChannelEdgePolicy for channel %v", + msg.ChannelID) + // We make sure to hold the mutex for this channel ID, // such that no other goroutine is concurrently doing // database accesses for the same channel ID. @@ -1685,7 +1693,7 @@ func (r *ChannelRouter) processUpdate(msg interface{}, return err } - log.Tracef("New channel update applied: %v", + log.Debugf("New channel update applied: %v", newLogClosure(func() string { return spew.Sdump(msg) })) r.stats.incNumChannelUpdates() @@ -2608,10 +2616,18 @@ func (r *ChannelRouter) AddProof(chanID lnwire.ShortChannelID, // target node with a more recent timestamp. // // NOTE: This method is part of the ChannelGraphSource interface. -func (r *ChannelRouter) IsStaleNode(node route.Vertex, timestamp time.Time) bool { +func (r *ChannelRouter) IsStaleNode(node route.Vertex, + timestamp time.Time) bool { + // If our attempt to assert that the node announcement is fresh fails, // then we know that this is actually a stale announcement. 
-	return r.assertNodeAnnFreshness(node, timestamp) != nil
+	err := r.assertNodeAnnFreshness(node, timestamp)
+	if err != nil {
+		log.Debugf("Checking stale node %x got %v", node, err)
+		return true
+	}
+
+	return false
 }
 
 // IsPublicNode determines whether the given vertex is seen as a public node in
@@ -2641,6 +2657,7 @@ func (r *ChannelRouter) IsStaleEdgePolicy(chanID lnwire.ShortChannelID,
 	edge1Timestamp, edge2Timestamp, exists, isZombie, err :=
 		r.cfg.Graph.HasChannelEdge(chanID.ToUint64())
 	if err != nil {
+		log.Debugf("Check stale edge policy got error: %v", err)
 		return false
 
 	}
@@ -2788,6 +2805,7 @@ func (r *ChannelRouter) BuildRoute(amt *lnwire.MilliSatoshi,
 		// Exit if there are no channels.
 		unifiedPolicy, ok := u.policies[fromNode]
 		if !ok {
+			log.Errorf("Cannot find policy for node %v", fromNode)
 			return nil, ErrNoChannel{
 				fromNode: fromNode,
 				position: i,
@@ -2806,6 +2824,8 @@ func (r *ChannelRouter) BuildRoute(amt *lnwire.MilliSatoshi,
 		// to forward.
 		policy := unifiedPolicy.getPolicy(runningAmt, bandwidthHints)
 		if policy == nil {
+			log.Errorf("Cannot find policy with amt=%v for node %v",
+				runningAmt, fromNode)
 			return nil, ErrNoChannel{
 				fromNode: fromNode,
 				position: i,

From 29c24588317dd592d5ad641443097a8045c9c633 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Wed, 16 Nov 2022 04:01:38 +0800
Subject: [PATCH 2/8] routing: add method `handleNetworkUpdate` to process the
 network update

This commit refactors `networkHandler` to use the new method
`handleNetworkUpdate`. Because the `select` runs inside a for loop,
which is equivalent to firing goroutines inside a range loop, it's
possible for a previously fired goroutine to reference a variable that
now belongs to the current iteration. This is fixed by passing the
network update to the goroutine as a parameter. A minimal sketch of
this pitfall is appended at the end of this series.
---
 routing/router.go | 143 +++++++++++++++++++++++-----------------------
 1 file changed, 73 insertions(+), 70 deletions(-)

diff --git a/routing/router.go b/routing/router.go
index 8d61a25f25..0588d51226 100644
--- a/routing/router.go
+++ b/routing/router.go
@@ -991,6 +991,78 @@ func (r *ChannelRouter) pruneZombieChans() error {
 	return nil
 }
 
+// handleNetworkUpdate is responsible for processing the update message and
+// notifying the topology change, if any.
+//
+// NOTE: must be run as a goroutine.
+func (r *ChannelRouter) handleNetworkUpdate(vb *ValidationBarrier,
+	update *routingMsg) {
+
+	defer r.wg.Done()
+	defer vb.CompleteJob()
+
+	// If this message has an existing dependency, then we'll wait until
+	// that has been fully validated before we proceed.
+	err := vb.WaitForDependants(update.msg)
+	if err != nil {
+		switch {
+		case IsError(err, ErrVBarrierShuttingDown):
+			update.err <- err
+
+		case IsError(err, ErrParentValidationFailed):
+			update.err <- newErrf(ErrIgnored, err.Error())
+
+		default:
+			log.Warnf("unexpected error during validation "+
+				"barrier shutdown: %v", err)
+			update.err <- err
+		}
+
+		return
+	}
+
+	// Process the routing update to determine if this is either a new
+	// update from our PoV or an update to a prior vertex/edge we
+	// previously accepted.
+	err = r.processUpdate(update.msg, update.op...)
+	update.err <- err
+
+	// If this message had any dependencies, then we can now signal them to
+	// continue.
+	allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
+	vb.SignalDependants(update.msg, allowDependents)
+
+	// If the error is not nil here, there's no need to send topology
+	// change.
+	if err != nil {
+		// Decide whether to log an error: if allowDependents is
+		// false, the error is neither ErrIgnored nor ErrOutdated,
+		// so we log it as an error. Otherwise, we only add a debug
+		// log.
+		if allowDependents {
+			log.Debugf("process network updates got: %v", err)
+		} else {
+			log.Errorf("process network updates got: %v", err)
+		}
+
+		return
+	}
+
+	// Otherwise, we'll send off a new notification for the newly accepted
+	// update, if any.
+	topChange := &TopologyChange{}
+	err = addToTopologyChange(r.cfg.Graph, topChange, update.msg)
+	if err != nil {
+		log.Errorf("unable to update topology change notification: %v",
+			err)
+		return
+	}
+
+	if !topChange.isEmpty() {
+		r.notifyTopologyChange(topChange)
+	}
+}
+
 // networkHandler is the primary goroutine for the ChannelRouter. The roles of
 // this goroutine include answering queries related to the state of the
 // network, pruning the graph on new block notification, applying network
@@ -1050,76 +1122,7 @@ func (r *ChannelRouter) networkHandler() {
 			validationBarrier.InitJobDependencies(update.msg)
 
 			r.wg.Add(1)
-			go func() {
-				defer r.wg.Done()
-				defer validationBarrier.CompleteJob()
-
-				// If this message has an existing dependency,
-				// then we'll wait until that has been fully
-				// validated before we proceed.
-				err := validationBarrier.WaitForDependants(
-					update.msg,
-				)
-				if err != nil {
-					switch {
-					case IsError(
-						err, ErrVBarrierShuttingDown,
-					):
-						update.err <- err
-
-					case IsError(
-						err, ErrParentValidationFailed,
-					):
-						update.err <- newErrf(
-							ErrIgnored, err.Error(),
-						)
-
-					default:
-						log.Warnf("unexpected error "+
-							"during validation "+
-							"barrier shutdown: %v",
-							err)
-						update.err <- err
-					}
-
-					return
-				}
-
-				// Process the routing update to determine if
-				// this is either a new update from our PoV or
-				// an update to a prior vertex/edge we
-				// previously accepted.
-				err = r.processUpdate(update.msg, update.op...)
-				update.err <- err
-
-				// If this message had any dependencies, then
-				// we can now signal them to continue.
-				allowDependents := err == nil ||
					IsError(err, ErrIgnored, ErrOutdated)
-				validationBarrier.SignalDependants(
-					update.msg, allowDependents,
-				)
-				if err != nil {
-					log.Debugf("process network updates "+
-						"got: %v", err)
-					return
-				}
-
-				// Send off a new notification for the newly
-				// accepted update.
- topChange := &TopologyChange{} - err = addToTopologyChange( - r.cfg.Graph, topChange, update.msg, - ) - if err != nil { - log.Errorf("unable to update topology "+ - "change notification: %v", err) - return - } - - if !topChange.isEmpty() { - r.notifyTopologyChange(topChange) - } - }() + go r.handleNetworkUpdate(validationBarrier, update) // TODO(roasbeef): remove all unconnected vertexes // after N blocks pass with no corresponding From b237dbfd742b1618e09d38a480577c76727ca05c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Tue, 22 Nov 2022 05:14:07 +0800 Subject: [PATCH 3/8] discovery: add method `handleNetworkMessages` to process messages --- discovery/gossiper.go | 129 ++++++++++++++++++++---------------------- 1 file changed, 62 insertions(+), 67 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index f3c497025e..5b9d025da6 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1231,73 +1231,10 @@ func (d *AuthenticatedGossiper) networkHandler() { validationBarrier.InitJobDependencies(announcement.msg) d.wg.Add(1) - go func() { - defer d.wg.Done() - defer validationBarrier.CompleteJob() - - // If this message has an existing dependency, - // then we'll wait until that has been fully - // validated before we proceed. - err := validationBarrier.WaitForDependants( - announcement.msg, - ) - if err != nil { - log.Debugf("Validating network message %s got err: %v", - announcement.msg.MsgType(), err) - - if !routing.IsError( - err, - routing.ErrVBarrierShuttingDown, - routing.ErrParentValidationFailed, - ) { - - log.Warnf("unexpected error "+ - "during validation "+ - "barrier shutdown: %v", - err) - } - announcement.err <- err - return - } - - // Process the network announcement to - // determine if this is either a new - // announcement from our PoV or an edges to a - // prior vertex/edge we previously proceeded. - emittedAnnouncements, allowDependents := d.processNetworkAnnouncement( - announcement, - ) - - log.Tracef("Processed network message %s, "+ - "returned len(announcements)=%v, "+ - "allowDependents=%v", - announcement.msg.MsgType(), - len(emittedAnnouncements), - allowDependents) - - // If this message had any dependencies, then - // we can now signal them to continue. - validationBarrier.SignalDependants( - announcement.msg, allowDependents, - ) - - // If the announcement was accepted, then add - // the emitted announcements to our announce - // batch to be broadcast once the trickle timer - // ticks gain. - if emittedAnnouncements != nil && shouldBroadcast { - // TODO(roasbeef): exclude peer that - // sent. - announcements.AddMsgs( - emittedAnnouncements..., - ) - } else if emittedAnnouncements != nil { - log.Trace("Skipping broadcast of " + - "announcements received " + - "during initial graph sync") - } - - }() + go d.handleNetworkMessages( + announcement, &announcements, + validationBarrier, shouldBroadcast, + ) // The trickle timer has ticked, which indicates we should // flush to the network the pending batch of new announcements @@ -1362,6 +1299,64 @@ func (d *AuthenticatedGossiper) networkHandler() { } } +// handleNetworkMessages is responsible for waiting for dependencies for a +// given network message and processing the message. Once processed, it will +// signal its dependants and add the new announcements to the announce batch. +// +// NOTE: must be run as a goroutine. 
+func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
+	deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier,
+	shouldBroadcast bool) {
+
+	defer d.wg.Done()
+	defer vb.CompleteJob()
+
+	// If this message has an existing dependency, then we'll wait until
+	// that has been fully validated before we proceed.
+	err := vb.WaitForDependants(nMsg.msg)
+	if err != nil {
+		log.Debugf("Validating network message %s got err: %v",
+			nMsg.msg.MsgType(), err)
+
+		if !routing.IsError(
+			err,
+			routing.ErrVBarrierShuttingDown,
+			routing.ErrParentValidationFailed,
+		) {
+
+			log.Warnf("unexpected error during validation "+
+				"barrier shutdown: %v", err)
+		}
+		nMsg.err <- err
+
+		return
+	}
+
+	// Process the network announcement to determine if this is either a
+	// new announcement from our PoV or an update to a prior vertex/edge
+	// we previously accepted.
+	newAnns, allow := d.processNetworkAnnouncement(nMsg)
+
+	log.Tracef("Processed network message %s, returned "+
+		"len(announcements)=%v, allowDependents=%v",
+		nMsg.msg.MsgType(), len(newAnns), allow)
+
+	// If this message had any dependencies, then we can now signal them to
+	// continue.
+	vb.SignalDependants(nMsg.msg, allow)
+
+	// If the announcement was accepted, then add the emitted announcements
+	// to our announce batch to be broadcast once the trickle timer ticks
+	// again.
+	if newAnns != nil && shouldBroadcast {
+		// TODO(roasbeef): exclude peer that sent.
+		deDuped.AddMsgs(newAnns...)
+	} else if newAnns != nil {
+		log.Trace("Skipping broadcast of announcements received " +
+			"during initial graph sync")
+	}
+}
+
 // TODO(roasbeef): d/c peers that send updates not on our chain
 
 // InitSyncState is called by outside sub-systems when a connection is

From 152a438fbe6cc4fc519e9374c3cdb99f2bd3bc10 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Sun, 20 Nov 2022 08:25:01 +0800
Subject: [PATCH 4/8] discovery: move `shouldBroadcast` inside goroutine

This commit moves the `shouldBroadcast` logic closer to the point where
we decide whether to broadcast the announcements. This is a pure code
refactor and should make no difference in announcing messages unless
`d.syncMgr.IsGraphSynced()` gives a different result inside the
goroutine.
---
 discovery/gossiper.go | 16 ++++++----------
 1 file changed, 6 insertions(+), 10 deletions(-)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 5b9d025da6..77d23bde18 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -1187,12 +1187,6 @@ func (d *AuthenticatedGossiper) networkHandler() {
 				announcement.msg.MsgType(),
 				announcement.isRemote)
 
-			// We should only broadcast this message forward if it
-			// originated from us or it wasn't received as part of
-			// our initial historical sync.
-			shouldBroadcast := !announcement.isRemote ||
-				d.syncMgr.IsGraphSynced()
-
 			switch announcement.msg.(type) {
 			// Channel announcement signatures are amongst the only
 			// messages that we'll process serially.
@@ -1232,8 +1226,7 @@ func (d *AuthenticatedGossiper) networkHandler() {
 
 			d.wg.Add(1)
 			go d.handleNetworkMessages(
-				announcement, &announcements,
-				validationBarrier, shouldBroadcast,
+				announcement, &announcements, validationBarrier,
 			)
 
 		// The trickle timer has ticked, which indicates we should
@@ -1305,12 +1298,15 @@ func (d *AuthenticatedGossiper) networkHandler() {
 //
 // NOTE: must be run as a goroutine.
 func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg,
-	deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier,
-	shouldBroadcast bool) {
+	deDuped *deDupedAnnouncements, vb *routing.ValidationBarrier) {
 
 	defer d.wg.Done()
 	defer vb.CompleteJob()
 
+	// We should only broadcast this message forward if it originated from
+	// us or it wasn't received as part of our initial historical sync.
+	shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()
+
 	// If this message has an existing dependency, then we'll wait until
 	// that has been fully validated before we proceed.
 	err := vb.WaitForDependants(nMsg.msg)

From 8dceb739ff62d8aa8e91be717c89068c16aa65e2 Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Sun, 20 Nov 2022 10:53:23 +0800
Subject: [PATCH 5/8] routing: refactor `WaitForDependants` to allow detailed
 logging
---
 routing/validation_barrier.go | 62 ++++++++++++++++++++++++-----------
 1 file changed, 42 insertions(+), 20 deletions(-)

diff --git a/routing/validation_barrier.go b/routing/validation_barrier.go
index 80866acd50..ed1ac71f0f 100644
--- a/routing/validation_barrier.go
+++ b/routing/validation_barrier.go
@@ -1,6 +1,7 @@
 package routing
 
 import (
+	"fmt"
 	"sync"
 
 	"github.com/lightningnetwork/lnd/channeldb"
@@ -177,56 +178,77 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
 	var (
 		signals *validationSignals
 		ok      bool
+		jobDesc string
 	)
 
+	// Acquire a lock to read ValidationBarrier.
 	v.Lock()
-	switch msg := job.(type) {
 
+	switch msg := job.(type) {
 	// Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
 	// completion of any active ChannelAnnouncement jobs related to them.
 	case *channeldb.ChannelEdgePolicy:
 		shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
 		signals, ok = v.chanEdgeDependencies[shortID]
+
+		jobDesc = fmt.Sprintf("job=channeldb.ChannelEdgePolicy, "+
+			"scid=%v", msg.ChannelID)
+
 	case *channeldb.LightningNode:
 		vertex := route.Vertex(msg.PubKeyBytes)
 		signals, ok = v.nodeAnnDependencies[vertex]
+
+		jobDesc = fmt.Sprintf("job=channeldb.LightningNode, pub=%x",
+			vertex)
+
 	case *lnwire.ChannelUpdate:
 		signals, ok = v.chanEdgeDependencies[msg.ShortChannelID]
+
+		jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v",
+			msg.ShortChannelID.ToUint64())
+
 	case *lnwire.NodeAnnouncement:
 		vertex := route.Vertex(msg.NodeID)
 		signals, ok = v.nodeAnnDependencies[vertex]
 
+		jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%x",
+			vertex)
+
 	// Other types of jobs can be executed immediately, so we'll just
 	// return directly.
 	case *lnwire.AnnounceSignatures:
 		// TODO(roasbeef): need to wait on chan ann?
-		v.Unlock()
-		return nil
 	case *channeldb.ChannelEdgeInfo:
-		v.Unlock()
-		return nil
 	case *lnwire.ChannelAnnouncement:
-		v.Unlock()
-		return nil
 	}
+
+	// Release the lock once the above read is finished.
 	v.Unlock()
 
+	// If it's not ok, it means either the job is not a dependent type, or
+	// it doesn't have a dependency signal. Either way, we can return
+	// early.
+	if !ok {
+		return nil
+	}
+
+	log.Debugf("Waiting for dependencies of %s", jobDesc)
+
 	// If we do have an active job, then we'll wait until either the signal
 	// is closed, or the set of jobs exits.
- if ok { - select { - case <-v.quit: - return newErrf(ErrVBarrierShuttingDown, - "validation barrier shutting down") - case <-signals.deny: - return newErrf(ErrParentValidationFailed, - "parent validation failed") - case <-signals.allow: - return nil - } - } + select { + case <-v.quit: + return newErrf(ErrVBarrierShuttingDown, + "validation barrier shutting down") + + case <-signals.deny: + log.Debugf("Signal deny for %s", jobDesc) + return newErrf(ErrParentValidationFailed, + "parent validation failed") - return nil + case <-signals.allow: + log.Tracef("Signal allow for %s", jobDesc) + return nil + } } // SignalDependants will allow/deny any jobs that are dependent on this job that From 716c685f10c6cfdb84abc74900773c7be2b96f4c Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Mon, 21 Nov 2022 01:18:23 +0800 Subject: [PATCH 6/8] discovery+peer: add logs to reveal shutdown flow Also adds a `TODO` for checking the err chan. --- discovery/gossiper.go | 1 + discovery/reliable_sender.go | 3 +++ discovery/sync_manager.go | 3 +++ discovery/syncer.go | 3 +++ peer/brontide.go | 2 ++ 5 files changed, 12 insertions(+) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 77d23bde18..c4dd5185ee 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -658,6 +658,7 @@ func (d *AuthenticatedGossiper) Stop() error { func (d *AuthenticatedGossiper) stop() { log.Info("Authenticated Gossiper is stopping") + defer log.Info("Authenticated Gossiper stopped") d.blockEpochs.Cancel() diff --git a/discovery/reliable_sender.go b/discovery/reliable_sender.go index b22a15ae1e..b4d32e73fd 100644 --- a/discovery/reliable_sender.go +++ b/discovery/reliable_sender.go @@ -84,6 +84,9 @@ func (s *reliableSender) Start() error { // Stop halts the reliable sender from sending messages to peers. func (s *reliableSender) Stop() { s.stop.Do(func() { + log.Debugf("reliableSender is stopping") + defer log.Debugf("reliableSender stopped") + close(s.quit) s.wg.Wait() }) diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index d834be6b2f..8123c37eb1 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -183,6 +183,9 @@ func (m *SyncManager) Start() { // Stop stops the SyncManager from performing its duties. func (m *SyncManager) Stop() { m.stop.Do(func() { + log.Debugf("SyncManager is stopping") + defer log.Debugf("SyncManager stopped") + close(m.quit) m.wg.Wait() diff --git a/discovery/syncer.go b/discovery/syncer.go index 11424a61ea..f5c1bace53 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -450,6 +450,9 @@ func (g *GossipSyncer) Start() { // exited. func (g *GossipSyncer) Stop() { g.stopped.Do(func() { + log.Debugf("Stopping GossipSyncer(%x)", g.cfg.peerPub[:]) + defer log.Debugf("GossipSyncer(%x) stopped", g.cfg.peerPub[:]) + close(g.quit) g.wg.Wait() }) diff --git a/peer/brontide.go b/peer/brontide.go index 018f81245d..7ac65aaf84 100644 --- a/peer/brontide.go +++ b/peer/brontide.go @@ -1401,6 +1401,8 @@ func newChanMsgStream(p *Brontide, cid lnwire.ChannelID) *msgStream { // channel announcements. func newDiscMsgStream(p *Brontide) *msgStream { apply := func(msg lnwire.Message) { + // TODO(yy): `ProcessRemoteAnnouncement` returns an error chan + // and we need to process it. 
 		p.cfg.AuthGossiper.ProcessRemoteAnnouncement(msg, p)
 	}
 

From 443095a9073b5da055a8e3fcd774c9ac52c9fb8c Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Mon, 21 Nov 2022 09:40:03 +0800
Subject: [PATCH 7/8] discovery: signal allow for ignored channel
 announcements

This commit makes `handleChanAnnouncement` always return `true` for
messages that are processed but ignored by the router, even when the
extracted announcements are empty. Previously we'd return `false` when
the announcements were empty, which could cause `ChannelUpdate`s to be
ignored since the validation barrier would signal deny for those jobs.

This can easily be triggered with the following setup,
1. Alice connects to Bob and opens a channel.
2. Alice connects to Carol, and Bob connects to Carol.
3. Once the channel is open, Alice and Bob both announce it to Carol.

At some point, Carol's node would hold the following messages,
- Alice's ChannelAnnouncement
- Alice's ChannelUpdates, for both directions
- Bob's ChannelAnnouncement
- Bob's ChannelUpdates, for both directions

A bug could then occur if,
- Alice's ChannelAnnouncement is processed by the router, hence added to
  the db, but hasn't been reported back to the gossiper yet, so the
  validation barrier hasn't signaled allow.
- Bob's ChannelAnnouncement is processed by the router and returns
  `ErrIgnored`, as the edge info is already in the db. Once that's
  reported back to the gossiper, the validation barrier signals deny to
  all the dependent ChannelUpdate jobs.
- Depending on how fast Alice's ChannelAnnouncement is processed, we may
  get zero to four denies for the above ChannelUpdates, causing a
  channel edge policy to never be updated.

A minimal sketch of the barrier's allow/deny signaling is appended at
the end of this series.
---
 discovery/gossiper.go | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index c4dd5185ee..8bb5a52145 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -2372,11 +2372,12 @@ func (d *AuthenticatedGossiper) handleChanAnnouncement(nMsg *networkMsg,
 			// If while processing this rejected edge, we realized
 			// there's a set of announcements we could extract,
 			// then we'll return those directly.
-			if len(anns) != 0 {
-				nMsg.err <- nil
-				return anns, true
-			}
+			//
+			// NOTE: since this is an ErrIgnored, we can return
+			// true here to signal "allow" to its dependants.
+			nMsg.err <- nil
 
+			return anns, true
 		} else {
 			// Otherwise, this is just a regular rejected edge.
 			key := newRejectCacheKey(

From 12061746487aa2adad532d1cb3e1b17dc127dd3f Mon Sep 17 00:00:00 2001
From: yyforyongyu
Date: Mon, 21 Nov 2022 13:17:25 +0800
Subject: [PATCH 8/8] docs: add release notes for gossip fix
---
 docs/release-notes/release-notes-0.16.0.md | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/docs/release-notes/release-notes-0.16.0.md b/docs/release-notes/release-notes-0.16.0.md
index b01399301f..03d6ebee56 100644
--- a/docs/release-notes/release-notes-0.16.0.md
+++ b/docs/release-notes/release-notes-0.16.0.md
@@ -215,6 +215,9 @@ certain large transactions](https://github.com/lightningnetwork/lnd/pull/7100).
   `getblockhash`, and `getbestblock`. These commands provide access to chain
   block data.
 
+* [Fixed a bug](https://github.com/lightningnetwork/lnd/pull/7186) that might
+  lead to channel updates being missed, leaving the channel graph incomplete.
+
 ## Code Health
 
 * [test: use `T.TempDir` to create temporary test
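
The refactor in PATCH 2/8 guards against the classic Go (pre-1.22)
loop-variable capture pitfall. Below is a minimal, self-contained sketch
of it; this is illustrative plain Go, not lnd code, and the slice
contents are made up:

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        updates := []string{"node_ann", "chan_ann", "chan_update"}

        var wg sync.WaitGroup

        // Buggy pattern (before Go 1.22): every goroutine closes over
        // the same loop variable `u`, so by the time a goroutine runs,
        // `u` may already hold a later iteration's value.
        for _, u := range updates {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // May print the same update more than once.
                fmt.Println("processing (buggy):", u)
            }()
        }
        wg.Wait()

        // Fixed pattern, mirroring handleNetworkUpdate: the update is
        // passed to the goroutine as a parameter, so each goroutine
        // owns its own copy.
        for _, u := range updates {
            wg.Add(1)
            go func(u string) {
                defer wg.Done()
                fmt.Println("processing (fixed):", u)
            }(u)
        }
        wg.Wait()
    }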
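
PATCH 7/8 relies on the validation barrier's allow/deny signaling to gate
dependent jobs. The stand-in below assumes only the channel-close
semantics described in that commit message; `validationSignals`, `signal`
and `wait` are simplified substitutes for the real `ValidationBarrier`
API, not its actual types:

    package main

    import (
        "errors"
        "fmt"
    )

    // validationSignals gates dependent jobs: the parent closes exactly
    // one of the two channels once it has been validated.
    type validationSignals struct {
        allow chan struct{}
        deny  chan struct{}
    }

    func newSignals() *validationSignals {
        return &validationSignals{
            allow: make(chan struct{}),
            deny:  make(chan struct{}),
        }
    }

    // signal mimics SignalDependants: allowDependents decides which
    // channel is closed, and hence whether waiting jobs may proceed.
    func (s *validationSignals) signal(allowDependents bool) {
        if allowDependents {
            close(s.allow)
            return
        }
        close(s.deny)
    }

    // wait mimics WaitForDependants for a dependent job, e.g. a
    // ChannelUpdate waiting on its ChannelAnnouncement.
    func (s *validationSignals) wait() error {
        select {
        case <-s.allow:
            return nil
        case <-s.deny:
            return errors.New("parent validation failed")
        }
    }

    func main() {
        sig := newSignals()

        // A pending ChannelUpdate job blocks on its parent's signal.
        done := make(chan error)
        go func() { done <- sig.wait() }()

        // Pre-fix behavior: an ErrIgnored ChannelAnnouncement with no
        // extracted announcements signaled with allowDependents=false,
        // denying the update. The fix signals true instead.
        sig.signal(true)

        fmt.Println("ChannelUpdate job result:", <-done)
    }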