diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 284cc42212..74df9d31a2 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1470,11 +1470,14 @@ func (d *AuthenticatedGossiper) networkHandler() { // We'll set up any dependent, and wait until a free // slot for this job opens up, this allow us to not // have thousands of goroutines active. - validationBarrier.InitJobDependencies(announcement.msg) + annJobID := validationBarrier.InitJobDependencies( + announcement.msg, + ) d.wg.Add(1) go d.handleNetworkMessages( - announcement, &announcements, validationBarrier, + announcement, &announcements, + validationBarrier, annJobID, ) // The trickle timer has ticked, which indicates we should @@ -1525,7 +1528,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // // NOTE: must be run as a goroutine. func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, - deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) { + deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier, + jobID graph.JobID) { defer d.wg.Done() defer vb.CompleteJob() @@ -1536,7 +1540,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message has an existing dependency, then we'll wait until // that has been fully validated before we proceed. - err := vb.WaitForDependants(nMsg.msg) + err := vb.WaitForParents(jobID, nMsg.msg) if err != nil { log.Debugf("Validating network message %s got err: %v", nMsg.msg.MsgType(), err) @@ -1566,7 +1570,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message had any dependencies, then we can now signal them to // continue. - vb.SignalDependants(nMsg.msg, allow) + vb.SignalDependents(nMsg.msg, jobID) // If the announcement was accepted, then add the emitted announcements // to our announce batch to be broadcast once the trickle timer ticks diff --git a/graph/validation_barrier.go b/graph/validation_barrier.go index 3cbe950ee3..7efdc268a6 100644 --- a/graph/validation_barrier.go +++ b/graph/validation_barrier.go @@ -3,29 +3,33 @@ package graph import ( "fmt" "sync" + "sync/atomic" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) -// validationSignals contains two signals which allows the ValidationBarrier to -// communicate back to the caller whether a dependent should be processed or not -// based on whether its parent was successfully validated. Only one of these -// signals is to be used at a time. -type validationSignals struct { - // allow is the signal used to allow a dependent to be processed. - allow chan struct{} +// JobID identifies an active job in the validation barrier. It is large so +// that we don't need to worry about overflows. +type JobID uint64 - // deny is the signal used to prevent a dependent from being processed. - deny chan struct{} +// jobInfo stores job dependency info for a set of dependent gossip messages. +type jobInfo struct { + // activeParentJobIDs is the set of active parent job ids. + activeParentJobIDs fn.Set[JobID] + + // activeDependentJobs is the set of active dependent job ids. + activeDependentJobs fn.Set[JobID] } -// ValidationBarrier is a barrier used to ensure proper validation order while -// concurrently validating new announcements for channel edges, and the -// attributes of channel edges. It uses this set of maps (protected by this -// mutex) to track validation dependencies. For a given channel our -// dependencies look like this: chanAnn <- chanUp <- nodeAnn. That is we must -// validate the item on the left of the arrow before that on the right. +// ValidationBarrier is a barrier used to enforce a strict validation order +// while concurrently validating other updates for channel edges. It uses a set +// of maps to track validation dependencies. This is needed in practice because +// gossip messages for a given channel may arive in order, but then due to +// scheduling in different goroutines, may be validated in the wrong order. +// With the ValidationBarrier, the dependent update will wait until the parent +// update completes. type ValidationBarrier struct { // validationSemaphore is a channel of structs which is used as a // semaphore. Initially we'll fill this with a buffered channel of the @@ -33,23 +37,25 @@ type ValidationBarrier struct { // from this channel, then restore the value upon completion. validationSemaphore chan struct{} - // chanAnnFinSignal is map that keep track of all the pending - // ChannelAnnouncement like validation job going on. Once the job has - // been completed, the channel will be closed unblocking any - // dependants. - chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals + // jobIDMap stores the set of job ids for each channel. + // NOTE: This MUST be used with the mutex. + // NOTE: We don't need to worry about collisions between + // lnire.ShortChannelID and route.Vertex because they are of different + // length and entries therefore cannot hash to the same keys. + jobInfoMap map[any]*jobInfo + + // jobDependencies is a mapping from a child's JobID to the set of + // parent JobID that it depends on. + // NOTE: This MUST be used with the mutex. + jobDependencies map[JobID]fn.Set[JobID] - // chanEdgeDependencies tracks any channel edge updates which should - // wait until the completion of the ChannelAnnouncement before - // proceeding. This is a dependency, as we can't validate the update - // before we validate the announcement which creates the channel - // itself. - chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals + // childJobChans stores the notification channel that each child job + // listens on for parent job completions. + // NOTE: This MUST be used with the mutex. + childJobChans map[JobID]chan struct{} - // nodeAnnDependencies tracks any pending NodeAnnouncement validation - // jobs which should wait until the completion of the - // ChannelAnnouncement before proceeding. - nodeAnnDependencies map[route.Vertex]*validationSignals + // idCtr is an atomic integer that is used to assign JobIDs. + idCtr atomic.Uint64 quit chan struct{} sync.Mutex @@ -62,10 +68,10 @@ func NewValidationBarrier(numActiveReqs int, quitChan chan struct{}) *ValidationBarrier { v := &ValidationBarrier{ - chanAnnFinSignal: make(map[lnwire.ShortChannelID]*validationSignals), - chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals), - nodeAnnDependencies: make(map[route.Vertex]*validationSignals), - quit: quitChan, + jobInfoMap: make(map[any]*jobInfo), + jobDependencies: make(map[JobID]fn.Set[JobID]), + childJobChans: make(map[JobID]chan struct{}), + quit: quitChan, } // We'll first initialize a set of semaphores to limit our concurrency @@ -92,7 +98,7 @@ func (v *ValidationBarrier) FetchJobSlot() { // InitJobDependencies will wait for a new job slot to become open, and then // sets up any dependent signals/trigger for the new job -func (v *ValidationBarrier) InitJobDependencies(job interface{}) { +func (v *ValidationBarrier) InitJobDependencies(job interface{}) JobID { // We'll wait for either a new slot to become open, or for the quit // channel to be closed. select { @@ -106,47 +112,100 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { // Once a slot is open, we'll examine the message of the job, to see if // there need to be any dependent barriers set up. switch msg := job.(type) { - - // If this is a channel announcement, then we'll need to set up den - // tenancies, as we'll need to verify this before we verify any - // ChannelUpdates for the same channel, or NodeAnnouncements of nodes - // that are involved in this channel. This goes for both the wire - // type,s and also the types that we use within the database. case *lnwire.ChannelAnnouncement1: + id := JobID(v.idCtr.Add(1)) - // We ensure that we only create a new announcement signal iff, - // one doesn't already exist, as there may be duplicate - // announcements. We'll close this signal once the - // ChannelAnnouncement has been validated. This will result in - // all the dependent jobs being unlocked so they can finish - // execution themselves. - if _, ok := v.chanAnnFinSignal[msg.ShortChannelID]; !ok { - // We'll create the channel that we close after we - // validate this announcement. All dependants will - // point to this same channel, so they'll be unblocked - // at the same time. - signals := &validationSignals{ - allow: make(chan struct{}), - deny: make(chan struct{}), - } - - v.chanAnnFinSignal[msg.ShortChannelID] = signals - v.chanEdgeDependencies[msg.ShortChannelID] = signals + v.updateOrCreateJobInfo(msg.ShortChannelID, id) + v.updateOrCreateJobInfo(route.Vertex(msg.NodeID1), id) + v.updateOrCreateJobInfo(route.Vertex(msg.NodeID2), id) - v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals - } + return id - // These other types don't have any dependants, so no further - // initialization needs to be done beyond just occupying a job slot. + // Populate the dependency mappings for the below child jobs. case *lnwire.ChannelUpdate1: - return + childJobID := JobID(v.idCtr.Add(1)) + v.populateDependencies(childJobID, msg.ShortChannelID) + + return childJobID case *lnwire.NodeAnnouncement: - // TODO(roasbeef): node ann needs to wait on existing channel updates - return + childJobID := JobID(v.idCtr.Add(1)) + v.populateDependencies(childJobID, route.Vertex(msg.NodeID)) + + return childJobID case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? - return + // - We can do the above by calling populateDependencies. For + // now, while we evaluate potential side effects, don't do + // anything with childJobID and just return it. + childJobID := JobID(v.idCtr.Add(1)) + return childJobID + + default: + // + // return err + return JobID(0) + } +} + +// updateOrCreateJobInfo modifies the set of activeParentJobs for this annID +// and updates jobInfoMap. This must only be called from InitJobDependencies. +// NOTE: MUST be called with the mutex held. +func (v *ValidationBarrier) updateOrCreateJobInfo(annID any, annJobID JobID) { + info, ok := v.jobInfoMap[annID] + if ok { + // If an entry already exists for annID, then a job related to + // it is being validated. Add to the set of parent job ids. + // This addition will only affect _later_, _child_ jobs for the + // annID. + info.activeParentJobIDs.Add(annJobID) + } else { + // No entry exists for annID, meaning that we should create + // one. + parentJobSet := fn.NewSet(annJobID) + + info := &jobInfo{ + activeParentJobIDs: parentJobSet, + activeDependentJobs: fn.NewSet[JobID](), + } + v.jobInfoMap[annID] = info + } +} + +// populateDependencies populates the job dependency mappings (i.e. which jobs +// should complete after another) for the (childJobID, annID) tuple. This must +// only be called from InitJobDependencies. +// NOTE: MUST be called with the mutex held. +func (v ValidationBarrier) populateDependencies(childJobID JobID, + annID any) { + + // If there is no entry in the jobInfoMap, we don't have to wait on any + // parent jobs to finish. + info, ok := v.jobInfoMap[annID] + if ok { + // We want to see a snapshot of active parent jobs for this + // annID that are already registered in activeParentJobIDs. The + // child job identified by childJobID can only run after these + // parent jobs have run. After grabbing the snapshot, we then + // want to persist a slice of these jobs. + + // Create the notification chan that parent jobs will send (or + // close) on when they complete. + jobChan := make(chan struct{}) + + // Add to set of activeDependentJobs for this annID. + info.activeDependentJobs.Add(childJobID) + + // Store in childJobChans. The parent jobs will fetch this chan + // to notify on. The child job will later fetch this chan to + // listen on when WaitForParents is called. + v.childJobChans[childJobID] = jobChan + + // Copy over the parent job IDs at this moment for this annID. + // This job must be processed AFTER these parent IDs. + parentJobs := info.activeParentJobIDs.Union(fn.NewSet[JobID]()) + + // Populate the jobDependencies mapping. + v.jobDependencies[childJobID] = parentJobs } } @@ -161,16 +220,21 @@ func (v *ValidationBarrier) CompleteJob() { } } -// WaitForDependants will block until any jobs that this job dependants on have -// finished executing. This allows us a graceful way to schedule goroutines -// based on any pending uncompleted dependent jobs. If this job doesn't have an -// active dependent, then this function will return immediately. -func (v *ValidationBarrier) WaitForDependants(job interface{}) error { +// WaitForParents will block until all parent job dependencies have went +// through the validation pipeline. This allows us a graceful way to run jobs +// in goroutines and still have strict ordering guarantees. If this job doesn't +// have any validation dependencies, then this function will return +// immediately. +func (v *ValidationBarrier) WaitForParents(childJobID JobID, + job interface{}) error { var ( - signals *validationSignals ok bool jobDesc string + + parentJobIDs fn.Set[JobID] + annID any + jobChan chan struct{} ) // Acquire a lock to read ValidationBarrier. @@ -180,16 +244,22 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the // completion of any active ChannelAnnouncement jobs related to them. case *lnwire.ChannelUpdate1: - signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] + annID = msg.ShortChannelID + + // TODO: If ok is false, we have serious issues. + parentJobIDs, ok = v.jobDependencies[childJobID] jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v", msg.ShortChannelID.ToUint64()) case *lnwire.NodeAnnouncement: - vertex := route.Vertex(msg.NodeID) - signals, ok = v.nodeAnnDependencies[vertex] + annID = route.Vertex(msg.NodeID) + + // TODO: If ok is false, we have serious issues. + parentJobIDs, ok = v.jobDependencies[childJobID] + jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s", - vertex) + route.Vertex(msg.NodeID)) // Other types of jobs can be executed immediately, so we'll just // return directly. @@ -210,58 +280,156 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { log.Debugf("Waiting for dependent on %s", jobDesc) - // If we do have an active job, then we'll wait until either the signal - // is closed, or the set of jobs exits. - select { - case <-v.quit: - return NewErrf(ErrVBarrierShuttingDown, - "validation barrier shutting down") - - case <-signals.deny: - log.Debugf("Signal deny for %s", jobDesc) - return NewErrf(ErrParentValidationFailed, - "parent validation failed") + v.Lock() + jobChan, ok = v.childJobChans[childJobID] + if !ok { + v.Unlock() - case <-signals.allow: - log.Tracef("Signal allow for %s", jobDesc) + // The entry may not exist because this job does not depend on + // any parent jobs. return nil } + v.Unlock() + + for { + select { + case <-v.quit: + return NewErrf(ErrVBarrierShuttingDown, + "validation barrier shutting down") + + case <-jobChan: + // Every time this is sent on or if it's closed, a + // parent job has finished. The parent jobs have to + // also potentially close the channel because if all + // the parent jobs finish and call SignalDependents + // before the goroutine running WaitForParents has a + // chance to grab the notification chan from + // childJobChans, then the running goroutine will wait + // here for a notification forever. By having the last + // parent job close the notificiation chan, we avoid + // this issue. + + // Check and see if we have any parent jobs left. If we + // don't, we can finish up. + v.Lock() + info, found := v.jobInfoMap[annID] + if !found { + v.Unlock() + + // No parent job info found, proceed with + // validation. + return nil + } + + x := parentJobIDs.Intersect(info.activeParentJobIDs) + v.Unlock() + if x.Empty() { + // The parent jobs have all completed. We can + // proceed with validation. + return nil + } + + // If we've reached this point, we are still waiting on + // a parent job to complete. + } + } } -// SignalDependants will allow/deny any jobs that are dependent on this job that -// they can continue execution. If the job doesn't have any dependants, then -// this function sill exit immediately. -func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { +// SignalDependents signals to any child jobs that this parent job has +// finished. +func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) { + v.Lock() defer v.Unlock() switch msg := job.(type) { - - // If we've just finished executing a ChannelAnnouncement, then we'll - // close out the signal, and remove the signal from the map of active - // ones. This will allow/deny any dependent jobs to continue execution. case *lnwire.ChannelAnnouncement1: - finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID] - if ok { - if allow { - close(finSignals.allow) - } else { - close(finSignals.deny) - } - delete(v.chanAnnFinSignal, msg.ShortChannelID) - } + // Signal to the child jobs that parent validation has + // finished. We have to call removeParentJob for each annID + // that this ChannelAnnouncement can be associated with. + v.removeParentJob(msg.ShortChannelID, id) + v.removeParentJob(route.Vertex(msg.NodeID1), id) + v.removeParentJob(route.Vertex(msg.NodeID2), id) - delete(v.chanEdgeDependencies, msg.ShortChannelID) - - // For all other job types, we'll delete the tracking entries from the - // map, as if we reach this point, then all dependants have already - // finished executing and we can proceed. case *lnwire.NodeAnnouncement: - delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID)) + // Remove child job info. + v.removeChildJob(route.Vertex(msg.NodeID), id) case *lnwire.ChannelUpdate1: - delete(v.chanEdgeDependencies, msg.ShortChannelID) + // Remove child job info. + v.removeChildJob(msg.ShortChannelID, id) + return case *lnwire.AnnounceSignatures1: + // No dependency mappings are stored for AnnounceSignatures1, + // so do nothing. + return + } +} + +// removeParentJob removes parentJobID from the set of active parent jobs and +// notifies to annID's child jobs that it has finished validating. This must be +// called from SignalDependents. +// NOTE: MUST be called with a mutex. +func (v *ValidationBarrier) removeParentJob(annID any, parentJobID JobID) { + + // Remove the parentJobID from activeParentJobIDs. + jobInfo, found := v.jobInfoMap[annID] + if !found { + // TODO: Something seriously wrong has happened. return } + + jobInfo.activeParentJobIDs.Remove(parentJobID) + + lastJob := jobInfo.activeParentJobIDs.Empty() + + // Notify all dependent jobs that a parent job has completed. + for child := range jobInfo.activeDependentJobs { + notifyChan, ok := v.childJobChans[child] + if !ok { + // TODO: something seriously wrong has happened. + return + } + + // We don't want to block when sending out the signal. + select { + case notifyChan <- struct{}{}: + default: + } + + // If this is the last parent job for this annID, also close + // the channel. This is needed because it's possible that the + // parent job cleans up the job mappings before the goroutine + // handling the child job has a chance to call + // WaitForParents and catch the signal sent above. We are + // allowed to close because no other parent job will be able to + // send along the channel (or close) as we're removing the + // entry from the jobInfoMap below. + if lastJob { + close(notifyChan) + } + } + + // Remove from jobInfoMap if last job. + if lastJob { + delete(v.jobInfoMap, annID) + } +} + +// removeChildJob removes childJobID from the set of dependent jobs for annID +// and cleans up its job dependency mappings. This MUST be called from +// SignalDependents. +// NOTE: MUST be called with the mutex held. +func (v *ValidationBarrier) removeChildJob(annID any, childJobID JobID) { + // Check jobInfoMap and remove this job from activeDependentJobs. + info, ok := v.jobInfoMap[annID] + if ok { + info.activeDependentJobs.Remove(childJobID) + } + + // Remove the notification chan from childJobChans. + delete(v.childJobChans, childJobID) + + // Remove this job's dependency mapping. + delete(v.jobDependencies, childJobID) }