Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

discovery+graph: track job set dependencies in vb #9241

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 7 additions & 34 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,45 +675,20 @@ func (b *Builder) handleNetworkUpdate(vb *ValidationBarrier,
defer b.wg.Done()
defer vb.CompleteJob()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commit title nit: this is not the router, it's the graph.Builder


// If this message has an existing dependency, then we'll wait until
// that has been fully validated before we proceed.
err := vb.WaitForDependants(update.msg)
if err != nil {
switch {
case IsError(err, ErrVBarrierShuttingDown):
update.err <- err

case IsError(err, ErrParentValidationFailed):
update.err <- NewErrf(ErrIgnored, err.Error()) //nolint

default:
log.Warnf("unexpected error during validation "+
"barrier shutdown: %v", err)
update.err <- err
}

return
}

// Process the routing update to determine if this is either a new
// update from our PoV or an update to a prior vertex/edge we
// previously accepted.
err = b.processUpdate(update.msg, update.op...)
err := b.processUpdate(update.msg, update.op...)
update.err <- err

// If this message had any dependencies, then we can now signal them to
// continue.
allowDependents := err == nil || IsError(err, ErrIgnored, ErrOutdated)
vb.SignalDependants(update.msg, allowDependents)
notError := err == nil || IsError(err, ErrIgnored, ErrOutdated)

// If the error is not nil here, there's no need to send topology
// change.
if err != nil {
// We now decide to log an error or not. If allowDependents is
// false, it means there is an error and the error is neither
// ErrIgnored or ErrOutdated. In this case, we'll log an error.
// Otherwise, we'll add debug log only.
if allowDependents {
// Log as a debug message if this is not an error we need to be
// concerned about.
if notError {
log.Debugf("process network updates got: %v", err)
} else {
log.Errorf("process network updates got: %v", err)
Expand Down Expand Up @@ -789,10 +764,8 @@ func (b *Builder) networkHandler() {
// result we'll modify the channel graph accordingly depending
// on the exact type of the message.
case update := <-b.networkUpdates:
// We'll set up any dependants, and wait until a free
// slot for this job opens up, this allows us to not
// have thousands of goroutines active.
validationBarrier.InitJobDependencies(update.msg)
// We only need to consume a job slot.
validationBarrier.FetchJobSlot()

b.wg.Add(1)
go b.handleNetworkUpdate(validationBarrier, update)
Expand Down
64 changes: 12 additions & 52 deletions graph/validation_barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"sync"

"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/channeldb/models"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)
Expand Down Expand Up @@ -80,6 +78,18 @@ func NewValidationBarrier(numActiveReqs int,
return v
}

// FetchJobSlot fetches a job slot. This is only used in the graph/builder.go
// code since it needs to use the sempahore and no other ValidationBarrier
// functionality.
func (v *ValidationBarrier) FetchJobSlot() {
// We'll wait for either a new slot to become open, or for the quit
Comment on lines +93 to +94
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the builder code is called by the gossiper code which itself also uses a semaphore - why isnt that inheritance enough?

// channel to be closed.
select {
case <-v.validationSemaphore:
case <-v.quit:
}
}

// InitJobDependencies will wait for a new job slot to become open, and then
// sets up any dependent signals/trigger for the new job
func (v *ValidationBarrier) InitJobDependencies(job interface{}) {
Expand Down Expand Up @@ -126,33 +136,14 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) {
v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals
}
case *models.ChannelEdgeInfo:

shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
if _, ok := v.chanAnnFinSignal[shortID]; !ok {
signals := &validationSignals{
allow: make(chan struct{}),
deny: make(chan struct{}),
}

v.chanAnnFinSignal[shortID] = signals
v.chanEdgeDependencies[shortID] = signals

v.nodeAnnDependencies[route.Vertex(msg.NodeKey1Bytes)] = signals
v.nodeAnnDependencies[route.Vertex(msg.NodeKey2Bytes)] = signals
}

// These other types don't have any dependants, so no further
// initialization needs to be done beyond just occupying a job slot.
case *models.ChannelEdgePolicy:
return
case *lnwire.ChannelUpdate1:
return
case *lnwire.NodeAnnouncement:
// TODO(roasbeef): node ann needs to wait on existing channel updates
return
case *channeldb.LightningNode:
return
case *lnwire.AnnounceSignatures1:
// TODO(roasbeef): need to wait on chan ann?
return
Expand Down Expand Up @@ -188,20 +179,6 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
switch msg := job.(type) {
// Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the
// completion of any active ChannelAnnouncement jobs related to them.
case *models.ChannelEdgePolicy:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
signals, ok = v.chanEdgeDependencies[shortID]

jobDesc = fmt.Sprintf("job=lnwire.ChannelEdgePolicy, scid=%v",
msg.ChannelID)

case *channeldb.LightningNode:
vertex := route.Vertex(msg.PubKeyBytes)
signals, ok = v.nodeAnnDependencies[vertex]

jobDesc = fmt.Sprintf("job=channeldb.LightningNode, pub=%s",
vertex)

case *lnwire.ChannelUpdate1:
signals, ok = v.chanEdgeDependencies[msg.ShortChannelID]

Expand All @@ -218,7 +195,6 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error {
// return directly.
case *lnwire.AnnounceSignatures1:
// TODO(roasbeef): need to wait on chan ann?
case *models.ChannelEdgeInfo:
case *lnwire.ChannelAnnouncement1:
}

Expand Down Expand Up @@ -264,17 +240,6 @@ func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) {
// If we've just finished executing a ChannelAnnouncement, then we'll
// close out the signal, and remove the signal from the map of active
// ones. This will allow/deny any dependent jobs to continue execution.
case *models.ChannelEdgeInfo:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
finSignals, ok := v.chanAnnFinSignal[shortID]
if ok {
if allow {
close(finSignals.allow)
} else {
close(finSignals.deny)
}
delete(v.chanAnnFinSignal, shortID)
}
case *lnwire.ChannelAnnouncement1:
finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID]
if ok {
Expand All @@ -291,15 +256,10 @@ func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) {
// For all other job types, we'll delete the tracking entries from the
// map, as if we reach this point, then all dependants have already
// finished executing and we can proceed.
case *channeldb.LightningNode:
delete(v.nodeAnnDependencies, route.Vertex(msg.PubKeyBytes))
case *lnwire.NodeAnnouncement:
delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID))
case *lnwire.ChannelUpdate1:
delete(v.chanEdgeDependencies, msg.ShortChannelID)
case *models.ChannelEdgePolicy:
shortID := lnwire.NewShortChanIDFromInt(msg.ChannelID)
delete(v.chanEdgeDependencies, shortID)

case *lnwire.AnnounceSignatures1:
return
Expand Down