Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

graph: refactor Builder network message handling #9534

Merged
merged 1 commit into from
Feb 21, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 51 additions & 90 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ type Builder struct {
// of our currently known best chain are sent over.
staleBlocks <-chan *chainview.FilteredBlock

// networkUpdates is a channel that carries new topology updates
// topologyUpdates is a channel that carries new topology updates
// messages from outside the Builder to be processed by the
// networkHandler.
networkUpdates chan *routingMsg
topologyUpdates chan any

// topologyClients maps a client's unique notification ID to a
// topologyClient client that contains its notification dispatch
Expand Down Expand Up @@ -164,7 +164,7 @@ var _ ChannelGraphSource = (*Builder)(nil)
func NewBuilder(cfg *Config) (*Builder, error) {
return &Builder{
cfg: cfg,
networkUpdates: make(chan *routingMsg),
topologyUpdates: make(chan any),
topologyClients: &lnutils.SyncMap[uint64, *topologyClient]{},
ntfnClientUpdates: make(chan *topologyClientUpdate),
channelEdgeMtx: multimutex.NewMutex[uint64](),
Expand Down Expand Up @@ -656,59 +656,26 @@ func (b *Builder) pruneZombieChans() error {
return nil
}

// handleNetworkUpdate is responsible for processing the update message and
// notifies topology changes, if any.
// handleTopologyUpdate is responsible for sending any topology changes
// notifications to registered clients.
//
// NOTE: must be run inside goroutine.
func (b *Builder) handleNetworkUpdate(update *routingMsg) {
func (b *Builder) handleTopologyUpdate(update any) {
defer b.wg.Done()

// Process the routing update to determine if this is either a new
// update from our PoV or an update to a prior vertex/edge we
// previously accepted.
var err error
switch msg := update.msg.(type) {
case *models.LightningNode:
err = b.addNode(msg, update.op...)

case *models.ChannelEdgeInfo:
err = b.addEdge(msg, update.op...)

case *models.ChannelEdgePolicy:
err = b.updateEdge(msg, update.op...)

default:
err = errors.Errorf("wrong routing update message type")
}
update.err <- err

// If the error is not nil here, there's no need to send topology
// change.
if err != nil {
// Log as a debug message if this is not an error we need to be
// concerned about.
if IsError(err, ErrIgnored, ErrOutdated) {
log.Debugf("process network updates got: %v", err)
} else {
log.Errorf("process network updates got: %v", err)
}

return
}

// Otherwise, we'll send off a new notification for the newly accepted
// update, if any.
topChange := &TopologyChange{}
err = addToTopologyChange(b.cfg.Graph, topChange, update.msg)
err := addToTopologyChange(b.cfg.Graph, topChange, update)
if err != nil {
log.Errorf("unable to update topology change notification: %v",
err)
return
}

if !topChange.isEmpty() {
b.notifyTopologyChange(topChange)
if topChange.isEmpty() {
return
}

b.notifyTopologyChange(topChange)
}

// networkHandler is the primary goroutine for the Builder. The roles of
Expand All @@ -734,12 +701,11 @@ func (b *Builder) networkHandler() {
}

select {
// A new fully validated network update has just arrived. As a
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-b.networkUpdates:
// A new fully validated topology update has just arrived.
// We'll notify any registered clients.
case update := <-b.topologyUpdates:
b.wg.Add(1)
go b.handleNetworkUpdate(update)
go b.handleTopologyUpdate(update)

// TODO(roasbeef): remove all unconnected vertexes
// after N blocks pass with no corresponding
Expand Down Expand Up @@ -1033,14 +999,6 @@ func (b *Builder) MarkZombieEdge(chanID uint64) error {
return nil
}

// routingMsg couples a routing related routing topology update to the
// error channel.
type routingMsg struct {
msg interface{}
op []batch.SchedulerOption
err chan error
}

// ApplyChannelUpdate validates a channel update and if valid, applies it to the
// database. It returns a bool indicating whether the updates were successful.
func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
Expand Down Expand Up @@ -1102,23 +1060,20 @@ func (b *Builder) ApplyChannelUpdate(msg *lnwire.ChannelUpdate1) bool {
func (b *Builder) AddNode(node *models.LightningNode,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: node,
op: op,
err: make(chan error, 1),
err := b.addNode(node, op...)
if err != nil {
logNetworkMsgProcessError(err)

return err
}

select {
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case b.topologyUpdates <- node:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

// addNode does some basic checks on the given LightningNode against what we
Expand Down Expand Up @@ -1155,23 +1110,20 @@ func (b *Builder) addNode(node *models.LightningNode,
func (b *Builder) AddEdge(edge *models.ChannelEdgeInfo,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: edge,
op: op,
err: make(chan error, 1),
err := b.addEdge(edge, op...)
if err != nil {
logNetworkMsgProcessError(err)

return err
}

select {
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case b.topologyUpdates <- edge:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

// addEdge does some validation on the new channel edge against what we
Expand Down Expand Up @@ -1265,23 +1217,20 @@ func (b *Builder) addEdge(edge *models.ChannelEdgeInfo,
func (b *Builder) UpdateEdge(update *models.ChannelEdgePolicy,
op ...batch.SchedulerOption) error {

rMsg := &routingMsg{
msg: update,
op: op,
err: make(chan error, 1),
err := b.updateEdge(update, op...)
if err != nil {
logNetworkMsgProcessError(err)

return err
}

select {
case b.networkUpdates <- rMsg:
select {
case err := <-rMsg.err:
return err
case <-b.quit:
return ErrGraphBuilderShuttingDown
}
case b.topologyUpdates <- update:
case <-b.quit:
return ErrGraphBuilderShuttingDown
}

return nil
}

// updateEdge validates the new edge policy against what we currently have
Expand Down Expand Up @@ -1375,6 +1324,18 @@ func (b *Builder) updateEdge(policy *models.ChannelEdgePolicy,
return nil
}

// logNetworkMsgProcessError logs the error received from processing a network
// message. It logs as a debug message if the error is not critical.
func logNetworkMsgProcessError(err error) {
if IsError(err, ErrIgnored, ErrOutdated) {
log.Debugf("process network updates got: %v", err)

return
}

log.Errorf("process network updates got: %v", err)
}

// CurrentBlockHeight returns the block height from POV of the router subsystem.
//
// NOTE: This method is part of the ChannelGraphSource interface.
Expand Down
Loading