From 7acf3216296df7019d6275b4cc883a43b227e8c6 Mon Sep 17 00:00:00 2001 From: Eugene Siegel Date: Fri, 1 Nov 2024 15:13:24 -0400 Subject: [PATCH] discovery+graph: track job set dependencies in ValidationBarrier Prior to this commit, it was rare, but possible that proper validation order was not adhered to when using the ValidationBarrier. This commit does two things that fix this: - removes the concept of allow / deny. Having this in place was a minor optimization and removing it makes the solution simpler. - changes the job dependency tracking to track sets of parent jobs rather than individual parent jobs. --- discovery/gossiper.go | 14 +- graph/validation_barrier.go | 398 +++++++++++++++++++++++++----------- 2 files changed, 292 insertions(+), 120 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 284cc42212..74df9d31a2 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -1470,11 +1470,14 @@ func (d *AuthenticatedGossiper) networkHandler() { // We'll set up any dependent, and wait until a free // slot for this job opens up, this allow us to not // have thousands of goroutines active. - validationBarrier.InitJobDependencies(announcement.msg) + annJobID := validationBarrier.InitJobDependencies( + announcement.msg, + ) d.wg.Add(1) go d.handleNetworkMessages( - announcement, &announcements, validationBarrier, + announcement, &announcements, + validationBarrier, annJobID, ) // The trickle timer has ticked, which indicates we should @@ -1525,7 +1528,8 @@ func (d *AuthenticatedGossiper) networkHandler() { // // NOTE: must be run as a goroutine. 
func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, - deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier) { + deDuped *deDupedAnnouncements, vb *graph.ValidationBarrier, + jobID graph.JobID) { defer d.wg.Done() defer vb.CompleteJob() @@ -1536,7 +1540,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message has an existing dependency, then we'll wait until // that has been fully validated before we proceed. - err := vb.WaitForDependants(nMsg.msg) + err := vb.WaitForParents(jobID, nMsg.msg) if err != nil { log.Debugf("Validating network message %s got err: %v", nMsg.msg.MsgType(), err) @@ -1566,7 +1570,7 @@ func (d *AuthenticatedGossiper) handleNetworkMessages(nMsg *networkMsg, // If this message had any dependencies, then we can now signal them to // continue. - vb.SignalDependants(nMsg.msg, allow) + vb.SignalDependents(nMsg.msg, jobID) // If the announcement was accepted, then add the emitted announcements // to our announce batch to be broadcast once the trickle timer ticks diff --git a/graph/validation_barrier.go b/graph/validation_barrier.go index 3cbe950ee3..7efdc268a6 100644 --- a/graph/validation_barrier.go +++ b/graph/validation_barrier.go @@ -3,29 +3,33 @@ package graph import ( "fmt" "sync" + "sync/atomic" + "github.com/lightningnetwork/lnd/fn" "github.com/lightningnetwork/lnd/lnwire" "github.com/lightningnetwork/lnd/routing/route" ) -// validationSignals contains two signals which allows the ValidationBarrier to -// communicate back to the caller whether a dependent should be processed or not -// based on whether its parent was successfully validated. Only one of these -// signals is to be used at a time. -type validationSignals struct { - // allow is the signal used to allow a dependent to be processed. - allow chan struct{} +// JobID identifies an active job in the validation barrier. It is large so +// that we don't need to worry about overflows. 
+type JobID uint64 - // deny is the signal used to prevent a dependent from being processed. - deny chan struct{} +// jobInfo stores job dependency info for a set of dependent gossip messages. +type jobInfo struct { + // activeParentJobIDs is the set of active parent job ids. + activeParentJobIDs fn.Set[JobID] + + // activeDependentJobs is the set of active dependent job ids. + activeDependentJobs fn.Set[JobID] } -// ValidationBarrier is a barrier used to ensure proper validation order while -// concurrently validating new announcements for channel edges, and the -// attributes of channel edges. It uses this set of maps (protected by this -// mutex) to track validation dependencies. For a given channel our -// dependencies look like this: chanAnn <- chanUp <- nodeAnn. That is we must -// validate the item on the left of the arrow before that on the right. +// ValidationBarrier is a barrier used to enforce a strict validation order +// while concurrently validating other updates for channel edges. It uses a set +// of maps to track validation dependencies. This is needed in practice because +// gossip messages for a given channel may arrive in order, but then due to +// scheduling in different goroutines, may be validated in the wrong order. +// With the ValidationBarrier, the dependent update will wait until the parent +// update completes. type ValidationBarrier struct { // validationSemaphore is a channel of structs which is used as a // semaphore. Initially we'll fill this with a buffered channel of the @@ -33,23 +37,25 @@ type ValidationBarrier struct { // from this channel, then restore the value upon completion. validationSemaphore chan struct{} - // chanAnnFinSignal is map that keep track of all the pending - // ChannelAnnouncement like validation job going on. Once the job has - // been completed, the channel will be closed unblocking any - // dependants. 
- chanAnnFinSignal map[lnwire.ShortChannelID]*validationSignals + // jobInfoMap stores the set of job ids for each channel. + // NOTE: This MUST be used with the mutex. + // NOTE: We don't need to worry about collisions between + // lnwire.ShortChannelID and route.Vertex because they are of different + // length and entries therefore cannot hash to the same keys. + jobInfoMap map[any]*jobInfo + + // jobDependencies is a mapping from a child's JobID to the set of + // parent JobIDs that it depends on. + // NOTE: This MUST be used with the mutex. + jobDependencies map[JobID]fn.Set[JobID] - // chanEdgeDependencies tracks any channel edge updates which should - // wait until the completion of the ChannelAnnouncement before - // proceeding. This is a dependency, as we can't validate the update - // before we validate the announcement which creates the channel - // itself. - chanEdgeDependencies map[lnwire.ShortChannelID]*validationSignals + // childJobChans stores the notification channel that each child job + // listens on for parent job completions. + // NOTE: This MUST be used with the mutex. + childJobChans map[JobID]chan struct{} - // nodeAnnDependencies tracks any pending NodeAnnouncement validation - // jobs which should wait until the completion of the - // ChannelAnnouncement before proceeding. - nodeAnnDependencies map[route.Vertex]*validationSignals + // idCtr is an atomic integer that is used to assign JobIDs. 
+ idCtr atomic.Uint64 quit chan struct{} sync.Mutex @@ -62,10 +68,10 @@ func NewValidationBarrier(numActiveReqs int, quitChan chan struct{}) *ValidationBarrier { v := &ValidationBarrier{ - chanAnnFinSignal: make(map[lnwire.ShortChannelID]*validationSignals), - chanEdgeDependencies: make(map[lnwire.ShortChannelID]*validationSignals), - nodeAnnDependencies: make(map[route.Vertex]*validationSignals), - quit: quitChan, + jobInfoMap: make(map[any]*jobInfo), + jobDependencies: make(map[JobID]fn.Set[JobID]), + childJobChans: make(map[JobID]chan struct{}), + quit: quitChan, } // We'll first initialize a set of semaphores to limit our concurrency @@ -92,7 +98,7 @@ func (v *ValidationBarrier) FetchJobSlot() { // InitJobDependencies will wait for a new job slot to become open, and then // sets up any dependent signals/trigger for the new job -func (v *ValidationBarrier) InitJobDependencies(job interface{}) { +func (v *ValidationBarrier) InitJobDependencies(job interface{}) JobID { // We'll wait for either a new slot to become open, or for the quit // channel to be closed. select { @@ -106,47 +112,100 @@ func (v *ValidationBarrier) InitJobDependencies(job interface{}) { // Once a slot is open, we'll examine the message of the job, to see if // there need to be any dependent barriers set up. switch msg := job.(type) { - - // If this is a channel announcement, then we'll need to set up den - // tenancies, as we'll need to verify this before we verify any - // ChannelUpdates for the same channel, or NodeAnnouncements of nodes - // that are involved in this channel. This goes for both the wire - // type,s and also the types that we use within the database. case *lnwire.ChannelAnnouncement1: + id := JobID(v.idCtr.Add(1)) - // We ensure that we only create a new announcement signal iff, - // one doesn't already exist, as there may be duplicate - // announcements. We'll close this signal once the - // ChannelAnnouncement has been validated. 
This will result in - // all the dependent jobs being unlocked so they can finish - // execution themselves. - if _, ok := v.chanAnnFinSignal[msg.ShortChannelID]; !ok { - // We'll create the channel that we close after we - // validate this announcement. All dependants will - // point to this same channel, so they'll be unblocked - // at the same time. - signals := &validationSignals{ - allow: make(chan struct{}), - deny: make(chan struct{}), - } - - v.chanAnnFinSignal[msg.ShortChannelID] = signals - v.chanEdgeDependencies[msg.ShortChannelID] = signals + v.updateOrCreateJobInfo(msg.ShortChannelID, id) + v.updateOrCreateJobInfo(route.Vertex(msg.NodeID1), id) + v.updateOrCreateJobInfo(route.Vertex(msg.NodeID2), id) - v.nodeAnnDependencies[route.Vertex(msg.NodeID1)] = signals - v.nodeAnnDependencies[route.Vertex(msg.NodeID2)] = signals - } + return id - // These other types don't have any dependants, so no further - // initialization needs to be done beyond just occupying a job slot. + // Populate the dependency mappings for the below child jobs. case *lnwire.ChannelUpdate1: - return + childJobID := JobID(v.idCtr.Add(1)) + v.populateDependencies(childJobID, msg.ShortChannelID) + + return childJobID case *lnwire.NodeAnnouncement: - // TODO(roasbeef): node ann needs to wait on existing channel updates - return + childJobID := JobID(v.idCtr.Add(1)) + v.populateDependencies(childJobID, route.Vertex(msg.NodeID)) + + return childJobID case *lnwire.AnnounceSignatures1: // TODO(roasbeef): need to wait on chan ann? - return + // - We can do the above by calling populateDependencies. For + // now, while we evaluate potential side effects, don't do + // anything with childJobID and just return it. + childJobID := JobID(v.idCtr.Add(1)) + return childJobID + + default: + // + // return err + return JobID(0) + } +} + +// updateOrCreateJobInfo modifies the set of activeParentJobs for this annID +// and updates jobInfoMap. This must only be called from InitJobDependencies. 
+// NOTE: MUST be called with the mutex held. +func (v *ValidationBarrier) updateOrCreateJobInfo(annID any, annJobID JobID) { + info, ok := v.jobInfoMap[annID] + if ok { + // If an entry already exists for annID, then a job related to + // it is being validated. Add to the set of parent job ids. + // This addition will only affect _later_, _child_ jobs for the + // annID. + info.activeParentJobIDs.Add(annJobID) + } else { + // No entry exists for annID, meaning that we should create + // one. + parentJobSet := fn.NewSet(annJobID) + + info := &jobInfo{ + activeParentJobIDs: parentJobSet, + activeDependentJobs: fn.NewSet[JobID](), + } + v.jobInfoMap[annID] = info + } +} + +// populateDependencies populates the job dependency mappings (i.e. which jobs +// should complete after another) for the (childJobID, annID) tuple. This must +// only be called from InitJobDependencies. +// NOTE: MUST be called with the mutex held. +func (v ValidationBarrier) populateDependencies(childJobID JobID, + annID any) { + + // If there is no entry in the jobInfoMap, we don't have to wait on any + // parent jobs to finish. + info, ok := v.jobInfoMap[annID] + if ok { + // We want to see a snapshot of active parent jobs for this + // annID that are already registered in activeParentJobIDs. The + // child job identified by childJobID can only run after these + // parent jobs have run. After grabbing the snapshot, we then + // want to persist a slice of these jobs. + + // Create the notification chan that parent jobs will send (or + // close) on when they complete. + jobChan := make(chan struct{}) + + // Add to set of activeDependentJobs for this annID. + info.activeDependentJobs.Add(childJobID) + + // Store in childJobChans. The parent jobs will fetch this chan + // to notify on. The child job will later fetch this chan to + // listen on when WaitForParents is called. + v.childJobChans[childJobID] = jobChan + + // Copy over the parent job IDs at this moment for this annID. 
+ // This job must be processed AFTER these parent IDs. + parentJobs := info.activeParentJobIDs.Union(fn.NewSet[JobID]()) + + // Populate the jobDependencies mapping. + v.jobDependencies[childJobID] = parentJobs } } @@ -161,16 +220,21 @@ func (v *ValidationBarrier) CompleteJob() { } } -// WaitForDependants will block until any jobs that this job dependants on have -// finished executing. This allows us a graceful way to schedule goroutines -// based on any pending uncompleted dependent jobs. If this job doesn't have an -// active dependent, then this function will return immediately. -func (v *ValidationBarrier) WaitForDependants(job interface{}) error { +// WaitForParents will block until all parent job dependencies have gone +// through the validation pipeline. This allows us a graceful way to run jobs +// in goroutines and still have strict ordering guarantees. If this job doesn't +// have any validation dependencies, then this function will return +// immediately. +func (v *ValidationBarrier) WaitForParents(childJobID JobID, + job interface{}) error { var ( - signals *validationSignals ok bool jobDesc string + + parentJobIDs fn.Set[JobID] + annID any + jobChan chan struct{} ) // Acquire a lock to read ValidationBarrier. @@ -180,16 +244,22 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { // Any ChannelUpdate or NodeAnnouncement jobs will need to wait on the // completion of any active ChannelAnnouncement jobs related to them. case *lnwire.ChannelUpdate1: - signals, ok = v.chanEdgeDependencies[msg.ShortChannelID] + annID = msg.ShortChannelID + + // TODO: If ok is false, we have serious issues. + parentJobIDs, ok = v.jobDependencies[childJobID] jobDesc = fmt.Sprintf("job=lnwire.ChannelUpdate, scid=%v", msg.ShortChannelID.ToUint64()) case *lnwire.NodeAnnouncement: - vertex := route.Vertex(msg.NodeID) - signals, ok = v.nodeAnnDependencies[vertex] + annID = route.Vertex(msg.NodeID) + + // TODO: If ok is false, we have serious issues. 
+ parentJobIDs, ok = v.jobDependencies[childJobID] + jobDesc = fmt.Sprintf("job=lnwire.NodeAnnouncement, pub=%s", - vertex) + route.Vertex(msg.NodeID)) // Other types of jobs can be executed immediately, so we'll just // return directly. @@ -210,58 +280,156 @@ func (v *ValidationBarrier) WaitForDependants(job interface{}) error { log.Debugf("Waiting for dependent on %s", jobDesc) - // If we do have an active job, then we'll wait until either the signal - // is closed, or the set of jobs exits. - select { - case <-v.quit: - return NewErrf(ErrVBarrierShuttingDown, - "validation barrier shutting down") - - case <-signals.deny: - log.Debugf("Signal deny for %s", jobDesc) - return NewErrf(ErrParentValidationFailed, - "parent validation failed") + v.Lock() + jobChan, ok = v.childJobChans[childJobID] + if !ok { + v.Unlock() - case <-signals.allow: - log.Tracef("Signal allow for %s", jobDesc) + // The entry may not exist because this job does not depend on + // any parent jobs. return nil } + v.Unlock() + + for { + select { + case <-v.quit: + return NewErrf(ErrVBarrierShuttingDown, + "validation barrier shutting down") + + case <-jobChan: + // Every time this is sent on or if it's closed, a + // parent job has finished. The parent jobs have to + // also potentially close the channel because if all + // the parent jobs finish and call SignalDependents + // before the goroutine running WaitForParents has a + // chance to grab the notification chan from + // childJobChans, then the running goroutine will wait + // here for a notification forever. By having the last + // parent job close the notification chan, we avoid + // this issue. + + // Check and see if we have any parent jobs left. If we + // don't, we can finish up. + v.Lock() + info, found := v.jobInfoMap[annID] + if !found { + v.Unlock() + + // No parent job info found, proceed with + // validation. 
+ return nil + } + + x := parentJobIDs.Intersect(info.activeParentJobIDs) + v.Unlock() + if x.Empty() { + // The parent jobs have all completed. We can + // proceed with validation. + return nil + } + + // If we've reached this point, we are still waiting on + // a parent job to complete. + } + } } -// SignalDependants will allow/deny any jobs that are dependent on this job that -// they can continue execution. If the job doesn't have any dependants, then -// this function sill exit immediately. -func (v *ValidationBarrier) SignalDependants(job interface{}, allow bool) { +// SignalDependents signals to any child jobs that this parent job has +// finished. +func (v *ValidationBarrier) SignalDependents(job interface{}, id JobID) { + v.Lock() defer v.Unlock() switch msg := job.(type) { - - // If we've just finished executing a ChannelAnnouncement, then we'll - // close out the signal, and remove the signal from the map of active - // ones. This will allow/deny any dependent jobs to continue execution. case *lnwire.ChannelAnnouncement1: - finSignals, ok := v.chanAnnFinSignal[msg.ShortChannelID] - if ok { - if allow { - close(finSignals.allow) - } else { - close(finSignals.deny) - } - delete(v.chanAnnFinSignal, msg.ShortChannelID) - } + // Signal to the child jobs that parent validation has + // finished. We have to call removeParentJob for each annID + // that this ChannelAnnouncement can be associated with. + v.removeParentJob(msg.ShortChannelID, id) + v.removeParentJob(route.Vertex(msg.NodeID1), id) + v.removeParentJob(route.Vertex(msg.NodeID2), id) - delete(v.chanEdgeDependencies, msg.ShortChannelID) - - // For all other job types, we'll delete the tracking entries from the - // map, as if we reach this point, then all dependants have already - // finished executing and we can proceed. case *lnwire.NodeAnnouncement: - delete(v.nodeAnnDependencies, route.Vertex(msg.NodeID)) + // Remove child job info. 
+ v.removeChildJob(route.Vertex(msg.NodeID), id) case *lnwire.ChannelUpdate1: - delete(v.chanEdgeDependencies, msg.ShortChannelID) + // Remove child job info. + v.removeChildJob(msg.ShortChannelID, id) + return case *lnwire.AnnounceSignatures1: + // No dependency mappings are stored for AnnounceSignatures1, + // so do nothing. + return + } +} + +// removeParentJob removes parentJobID from the set of active parent jobs and +// notifies annID's child jobs that it has finished validating. This must be +// called from SignalDependents. +// NOTE: MUST be called with the mutex held. +func (v *ValidationBarrier) removeParentJob(annID any, parentJobID JobID) { + + // Remove the parentJobID from activeParentJobIDs. + jobInfo, found := v.jobInfoMap[annID] + if !found { + // TODO: Something seriously wrong has happened. return } + + jobInfo.activeParentJobIDs.Remove(parentJobID) + + lastJob := jobInfo.activeParentJobIDs.Empty() + + // Notify all dependent jobs that a parent job has completed. + for child := range jobInfo.activeDependentJobs { + notifyChan, ok := v.childJobChans[child] + if !ok { + // TODO: something seriously wrong has happened. + return + } + + // We don't want to block when sending out the signal. + select { + case notifyChan <- struct{}{}: + default: + } + + // If this is the last parent job for this annID, also close + // the channel. This is needed because it's possible that the + // parent job cleans up the job mappings before the goroutine + // handling the child job has a chance to call + // WaitForParents and catch the signal sent above. We are + // allowed to close because no other parent job will be able to + // send along the channel (or close) as we're removing the + // entry from the jobInfoMap below. + if lastJob { + close(notifyChan) + } + } + + // Remove from jobInfoMap if last job. 
+ if lastJob { + delete(v.jobInfoMap, annID) + } +} + +// removeChildJob removes childJobID from the set of dependent jobs for annID +// and cleans up its job dependency mappings. This MUST be called from +// SignalDependents. +// NOTE: MUST be called with the mutex held. +func (v *ValidationBarrier) removeChildJob(annID any, childJobID JobID) { + // Check jobInfoMap and remove this job from activeDependentJobs. + info, ok := v.jobInfoMap[annID] + if ok { + info.activeDependentJobs.Remove(childJobID) + } + + // Remove the notification chan from childJobChans. + delete(v.childJobChans, childJobID) + + // Remove this job's dependency mapping. + delete(v.jobDependencies, childJobID) }