From c89b616e7dcc9bda056240bf89c3eb61798ab87d Mon Sep 17 00:00:00 2001 From: Elle Mouton Date: Thu, 20 Feb 2025 10:40:15 -0300 Subject: [PATCH] graph: refactor Builder network message handling The exposed AddNode, AddEdge and UpdateEdge methods of the Builder are currently synchronous since even though they pass messages to the network handler which spins off the handling in a goroutine, the public methods still wait for a response from the handling before returning. The only part that is actually done asynchronously is the topology notifications. We previously tried to simplify things in [this commit](https://github.com/lightningnetwork/lnd/pull/9476/commits/d757b3bcfc77b0fbb543ef5db800e6c6b058930f) but we soon realised that there was a reason for sending the messages to the central/synchronous network handler first: it was to ensure consistency for topology clients: ie, the ordering between when there is a new topology client or if it is cancelled needs to be consistent and handled synchronously with new network updates. So for example, if a new update comes in right after a topology client cancels its subscription, then it should _not_ be notified. Similariy for new subscriptions. So this commit was reverted soon after. We can, however, still simplify things as is done in this commit by noting that _only topology subscriptions and notifications_ need to be handled separately. The actual network updates do not need to. So that is what is done here. This refactor will make moving the topology subscription logic to a new subsystem later on much easier. --- graph/builder.go | 141 +++++++++++++++++------------------------------ 1 file changed, 51 insertions(+), 90 deletions(-) diff --git a/graph/builder.go b/graph/builder.go index c379fe93f5..3e11155535 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -123,10 +123,10 @@ type Builder struct { // of our currently known best chain are sent over. staleBlocks <-chan *chainview.FilteredBlock - // networkUpdates is a channel that carries new topology updates + // topologyUpdates is a channel that carries new topology updates // messages from outside the Builder to be processed by the // networkHandler. - networkUpdates chan *routingMsg + topologyUpdates chan any // topologyClients maps a client's unique notification ID to a // topologyClient client that contains its notification dispatch @@ -164,7 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil) func NewBuilder(cfg *Config) (*Builder, error) { return &Builder{ cfg: cfg, - networkUpdates: make(chan *routingMsg), + topologyUpdates: make(chan any), topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{}, ntfnClientUpdates: make(chan *topologyClientUpdate), channelEdgeMtx: multimutex.NewMutex[uint64](), @@ -656,59 +656,26 @@ func (b *Builder) pruneZombieChans() error { return nil } -// handleNetworkUpdate is responsible for processing the update message and -// notifies topology changes, if any. +// handleTopologyUpdate is responsible for sending any topology changes +// notifications to registered clients. // // NOTE: must be run inside goroutine. -func (b *Builder) handleNetworkUpdate(update *routingMsg) { +func (b *Builder) handleTopologyUpdate(update any) { defer b.wg.Done() - // Process the routing update to determine if this is either a new - // update from our PoV or an update to a prior vertex/edge we - // previously accepted. - var err error - switch msg := update.msg.(type) { - case *models.LightningNode: - err = b.addNode(msg, update.op...) - - case *models.ChannelEdgeInfo: - err = b.addEdge(msg, update.op...) - - case *models.ChannelEdgePolicy: - err = b.updateEdge(msg, update.op...) - - default: - err = errors.Errorf("wrong routing update message type") - } - update.err <- err - - // If the error is not nil here, there's no need to send topology - // change. - if err != nil { - // Log as a debug message if this is not an error we need to be - // concerned about. - if IsError(err, ErrIgnored, ErrOutdated) { - log.Debugf("process network updates got: %v", err) - } else { - log.Errorf("process network updates got: %v", err) - } - - return - } - - // Otherwise, we'll send off a new notification for the newly accepted - // update, if any. topChange := &TopologyChange{} - err = addToTopologyChange(b.cfg.Graph, topChange, update.msg) + err := addToTopologyChange(b.cfg.Graph, topChange, update) if err != nil { log.Errorf("unable to update topology change notification: %v", err) return } - if !topChange.isEmpty() { - b.notifyTopologyChange(topChange) + if topChange.isEmpty() { + return } + + b.notifyTopologyChange(topChange) } // networkHandler is the primary goroutine for the Builder. The roles of @@ -734,12 +701,11 @@ func (b *Builder) networkHandler() { } select { - // A new fully validated network update has just arrived. As a - // result we'll modify the channel graph accordingly depending - // on the exact type of the message. - case update := <-b.networkUpdates: + // A new fully validated topology update has just arrived. + // We'll notify any registered clients. + case update := <-b.topologyUpdates: b.wg.Add(1) - go b.handleNetworkUpdate(update) + go b.handleTopologyUpdate(update) // TODO(roasbeef): remove all unconnected vertexes // after N blocks pass with no corresponding @@ -1033,14 +999,6 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error { return nil } -// routingMsg couples a routing related routing topology update to the -// error channel. -type routingMsg struct { - msg interface{} - op []batch.SchedulerOption - err chan error -} - // ApplyChannelUpdate validates a channel update and if valid, applies it to the // database. It returns a bool indicating whether the updates were successful. func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { @@ -1102,23 +1060,20 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool { func (b *Builder) AddNode(node *models.LightningNode, op ...batch.SchedulerOption) error { - rMsg := &routingMsg{ - msg: node, - op: op, - err: make(chan error, 1), + err := b.addNode(node, op...) + if err != nil { + logNetworkMsgProcessError(err) + + return err } select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case b.topologyUpdates <- node: case <-b.quit: return ErrGraphBuilderShuttingDown } + + return nil } // addNode does some basic checks on the given LightningNode against what we @@ -1155,23 +1110,20 @@ func (b *Builder) addNode(node *models.LightningNode, func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo, op ...batch.SchedulerOption) error { - rMsg := &routingMsg{ - msg: edge, - op: op, - err: make(chan error, 1), + err := b.addEdge(edge, op...) + if err != nil { + logNetworkMsgProcessError(err) + + return err } select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case b.topologyUpdates <- edge: case <-b.quit: return ErrGraphBuilderShuttingDown } + + return nil } // addEdge does some validation on the new channel edge against what we @@ -1265,23 +1217,20 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo, func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy, op ...batch.SchedulerOption) error { - rMsg := &routingMsg{ - msg: update, - op: op, - err: make(chan error, 1), + err := b.updateEdge(update, op...) + if err != nil { + logNetworkMsgProcessError(err) + + return err } select { - case b.networkUpdates <- rMsg: - select { - case err := <-rMsg.err: - return err - case <-b.quit: - return ErrGraphBuilderShuttingDown - } + case b.topologyUpdates <- update: case <-b.quit: return ErrGraphBuilderShuttingDown } + + return nil } // updateEdge validates the new edge policy against what we currently have @@ -1375,6 +1324,18 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy, return nil } +// logNetworkMsgProcessError logs the error received from processing a network +// message. It logs as a debug message if the error is not critical. +func logNetworkMsgProcessError(err error) { + if IsError(err, ErrIgnored, ErrOutdated) { + log.Debugf("process network updates got: %v", err) + + return + } + + log.Errorf("process network updates got: %v", err) +} + // CurrentBlockHeight returns the block height from POV of the router subsystem. // // NOTE: This method is part of the ChannelGraphSource interface.