multi: implement new query based channel graph synchronization #1106
Conversation
lnwire/query_short_chan_ids.go
Outdated
)

// ErrUnknownShortChanIDEncoding is a parametrized error that indicates that we
// came across an unkonw short channel ID encoding, and therefore were unable
nit: s/unkonw/unknown
Fixed.
discovery/syncer.go
Outdated
}

// synchronizeChanIDs is called by the channelGraphSyncer when we need to query
// the remote peer for its known set of channel ID"s within a particular block
nit: IDs
Fixed.
discovery/syncer_test.go
Outdated
case msgs := <-msgChan:
	if len(msgs) != 3 {
		t.Fatalf("expected 2 messages instead got %v "+
expected 3
Fixed.
discovery/gossiper.go
Outdated
@@ -1596,7 +1754,7 @@ func (d *AuthenticatedGossiper) processNetworkAnnouncement(nMsg *networkMsg) []n
 	// We'll ignore any channel announcements that target any chain
 	// other than the set of chains we know of.
 	if !bytes.Equal(msg.ChainHash[:], d.cfg.ChainHash[:]) {
-		log.Errorf("Ignoring ChannelUpdate from "+
+		log.Error("Ignoring ChannelUpdate from "+
intended change?
Nope, reverted!
func (c *ChannelGraph) ChanUpdatesInHorizon(startTime, endTime time.Time) ([]ChannelEdge, error) {
	var edgesInHorizon []ChannelEdge

	err := c.db.View(func(tx *bolt.Tx) error {
Could you break this up into helper functions? IMO, ideally most functions should fit on a single screen or else it becomes difficult to hold the whole thing in one's head at the same time. Also unit testing becomes much easier.
I realize that edgesInHorizon is updated deep down, but I think that could be propagated up through the helper functions without much overhead.
I'm imagining that this entire anonymous function could be pulled out such that you end up with something like
err := c.db.View(func(tx *bolt.Tx) error {
	edgesInHorizon, pErr = processEdges(tx)
	return pErr
})
and then within processEdges (or whatever you want to call it) you'd extract the inner portion of the for loop into a processEdge function.
What do you think? If this is considered useful, there are a few other places in this PR where it applies.
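For illustration, a fuller hypothetical shape for that refactor (processEdges here is an invented name, not code from this PR):

```go
package channeldb

import (
	"time"

	"github.com/boltdb/bolt"
)

// processEdges is a hypothetical helper sketching the suggestion above:
// it owns the whole iteration, so the View closure in
// ChanUpdatesInHorizon collapses to a single call.
func (c *ChannelGraph) processEdges(tx *bolt.Tx, startTime,
	endTime time.Time) ([]ChannelEdge, error) {

	var edgesInHorizon []ChannelEdge

	// Walk the update index from startTime to endTime, delegating the
	// per-entry decoding to a processEdge helper and collecting the
	// results here rather than deep inside the loop.

	return edgesInHorizon, nil
}
```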
I'd say yes, if the processing functions were used in other locations in the package/file. Atm, they aren't. The general code style in the project is to favor a clear control flow over excessive function modularization. In this case, you can read a single function and grok the logic rather than following around several other newly introduced functions.
Not sure what screen you use for viewing code, but it fits on mine ;)
// We'll run through the set of chanIDs and collate only the
// set of channel that are unable to be found within our db.
var cidBytes [8]byte
This constant of "8" keeps showing up. Might want to create a constant for it to make it more clear what it is.
I started to add a constant everywhere, but IMO it started to get in the way of the control flow throughout the file. 8 is the size of a 64-bit unsigned integer. In each of the locations, the variable names and comments themselves are enough context to understand that 8 is the size of a uint64.
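For reference, the kind of constant under discussion might look like this (the name is hypothetical; the PR keeps the literal 8):

```go
package channeldb

import "encoding/binary"

// shortChanIDLen is the byte size of a serialized short channel ID: a
// single uint64. Hypothetical constant; the PR keeps the literal 8 with
// contextual comments instead.
const shortChanIDLen = 8

// encodeShortChanID serializes a channel ID into its fixed-size form.
func encodeShortChanID(cid uint64) [shortChanIDLen]byte {
	var cidBytes [shortChanIDLen]byte
	binary.BigEndian.PutUint64(cidBytes[:], cid)
	return cidBytes
}
```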
channeldb/graph.go
Outdated
if edgeBytes := edges.Get(edgeKey[:]); edgeBytes != nil {
	// In order to delete the old entry, we'll need to obtain the
	// *prior* update time in order to delete it.
	updateEnd := 33 + (8 * 3) + 2 + 1
I think you definitely want a constant for this value and a comment explaining what's going on. Maybe sub-constants too.
Went with adding additional detail to the comment here. Decided against a constant as it's only used in a single location in the file atm.
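For comparison, a sub-constant version might have looked like this; the field names are guesses at the serialized edge policy layout, not the PR's actual code:

```go
package channeldb

// Hypothetical decomposition of the 33 + (8 * 3) + 2 + 1 offset; the
// field names are illustrative guesses at the serialized edge policy
// layout.
const (
	pubKeySize = 33 // compressed public key
	uint64Size = 8  // three consecutive uint64 fields
	uint16Size = 2
	flagSize   = 1

	updateEnd = pubKeySize + 3*uint64Size + uint16Size + flagSize // 60
)
```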
// query and are waiting for the final response from the remote
// peer before we perform a diff to see with channels they know
// of that we don't.
case waitingQueryRangeReply:
Could you use helper functions to reduce the amount of code that is in these case statements? Basically I'm imagining that for each top-level case, there would be one helper function that would contain almost all of the code that is currently contained in the case.
This would
- make channelGraphSyncer considerably shorter (easier to fit into one's head)
- create a clear separation between high-level details and low-level details (which would be contained in the helper functions)
- make unit testing way easier, cleaner, and more granular
- decrease vertical line noise by decreasing line-wrapping for over-indentation
Note that the helper functions will probably need multiple return values to allow for the "return", "continue", and standard case endings.
All of "transition" logic is already located in distinct functions. The logic in each of the case statements simply calls out to those, and handles the redundant case in each state. As is, the entire file already have a comprehensive set of unit tests for both the transition functions, and also the control flow interaction between two syncer instances.
Generally, in the codebase, we favor a clear control flow over excessive modularization. Vertical space isn't as much of an issue following this philosophy.
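For illustration, a reduced, hypothetical sketch of the pattern being described, where each case stays thin and defers to a transition function:

```go
package discovery

// syncerState tracks where the syncer is in its state machine.
type syncerState uint32

const (
	queryNewChannels syncerState = iota
	chansSynced
)

type gossipSyncer struct {
	state syncerState
}

// synchronizeChanIDs stands in for the real transition function; it
// reports whether the full set of channel IDs has been queried.
func (g *gossipSyncer) synchronizeChanIDs() (bool, error) {
	return true, nil
}

// channelGraphSyncer's main loop: each case only handles flow control,
// and the actual work lives in the transition functions.
func (g *gossipSyncer) channelGraphSyncer() {
	for {
		switch g.state {
		case queryNewChannels:
			done, err := g.synchronizeChanIDs()
			if err != nil {
				return
			}
			if done {
				g.state = chansSynced
			}

		case chansSynced:
			// Steady state: nothing left to query.
			return
		}
	}
}
```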
discovery/syncer.go
Outdated
// We'll give a few hours room in our update
// horizon to ensure we don't miss any newer
// itesm.
items*
Nits mostly, otherwise utACK 👍
discovery/syncer.go
Outdated
// this interface to determine if we're already in sync, or need to request
// some new information from them.
type ChannelGraphTimeSeries interface {
	// HighestChanID should return is the channel ID of the channel we know
nit: is
Fixed.
discovery/syncer.go
Outdated
// HighestChanID should return is the channel ID of the channel we know
// of that's furthest in the target chain. This channel will have a
// block height that's close to the current tip of the main chain as we
// know it. We'll use this to start our QueryChannelRange dance with
🕺
discovery/syncer.go
Outdated
// remote peer's QueryChannelRange message.
FilterChannelRange(chain chainhash.Hash,
	startHeight, endHeight uint32) ([]lnwire.ShortChannelID, error)
// FetchChanAnns returns a full set of channel announcements as well as
missing newline
Fixed.
discovery/syncer.go
Outdated
}

// Start starts the gossipSyncer and any goroutines that it needs to carry out
// it duties.
nit: its
Fixed.
discovery/syncer.go
Outdated
log.Debugf("Starting gossipSyncer(%x)", g.peerPub[:])

g.wg.Add(1)

nit: extra newline
Fixed.
discovery/syncer.go
Outdated
"and %v blocks after", g.peerPub[:], startHeight, | ||
math.MaxUint32-startHeight) | ||
|
||
// Finally, we'll craft the channel range query, using out starting |
nit: s/out/our
Fixed.
discovery/gossiper.go
Outdated
@@ -480,6 +501,15 @@ type msgWithSenders struct {
 	senders map[routing.Vertex]struct{}
 }
 
+// mergeSyncerMap is used to merge the set of senders of a particular message
+// with peers that we have an active gossipSyncer with. We do this to ensure
+// that we don't broadcast messages to any peers that
unfinished comment
Fixed.
chan_series.go
Outdated
	return nil, err
}
for _, channel := range chansInHorizon {
	if channel.Info.AuthProof == nil {
add a comment to this check
Fixed.
@@ -202,6 +202,8 @@ type config struct {
 	Color string `long:"color" description:"The color of the node in hex format (i.e. '#3399FF'). Used to customize node appearance in intelligence services"`
 	MinChanSize int64 `long:"minchansize" description:"The smallest channel size (in satoshis) that we should accept. Incoming channels smaller than this will be rejected"`
 
+	NoChanUpdates bool `long:"nochanupdates" description:"If specified, lnd will not request real-time channel updates from connected peers. This option should be used by routing nodes to save bandwidth."`
Since there might be routing nodes that wish to send active payments, I think this should instead say "nodes not wishing to pay invoices" or similar.
Well they can still send payments, they may just incur an extra routing attempt if they have a stale channel update.
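For reference, the flag as it might appear in a config file (assuming lnd's standard ini-style lnd.conf):

```ini
; Hypothetical lnd.conf snippet; stale policies just cost an extra
; routing attempt, as noted above.
[Application Options]
nochanupdates=true
```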
49b9b1e to b86e621
Have not had a chance to test, but gave a pretty thorough review and seems solid. Like halseth, just a couple nits. Fantastic work! 🔥
discovery/syncer.go
Outdated
return "waitingQueryChanReply" | ||
|
||
case chansSynced: | ||
return "syncingChans" |
s/synchingChans/chansSynced/
chan_series.go
Outdated
}

// FilterChannelRange returns the set of channels that we created between the
// start height and the end height. We'll use this to to a remote peer's
to to -> to respond to
chan_series.go
Outdated
if err != nil {
	return nil, err
}
updates = append(updates, &lnwire.NodeAnnouncement{
this section looks similar to makeNodeAnn below, is there a reason this doesn't use it? maybe has to do with alias err handling?
channeldb/graph.go
Outdated
cursor := edgeIndex.Cursor()

// We'll now iterate through the database, and find each
// channel ID that redoes within the specified range.
redoes?
channeldb/graph.go
Outdated
// channel ID that redoes within the specified range.
var cid uint64
for k, _ := cursor.Seek(chanIDStart[:]); k != nil &&
	bytes.Compare(k, chanIDEnd[:]) <= 0; k, _ = cursor.Next() {
Is this scan intended to be inclusive or exclusive wrt. the end block? If inclusive, should we set max values for TxIndex and TxPosition in chanIDEnd? It seems the current behavior is exclusive, though the predicate could include the end block's coinbase txn (I think?).
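For context on what saturating those fields would mean: a short channel ID packs into a uint64 as block height, tx index, and output position, so an inclusive end bound would max out the lower fields. A sketch under that assumption (not the PR's actual fix):

```go
package lnwire

// toUint64 mirrors how a short channel ID packs into a uint64: 3 bytes
// of block height, 3 bytes of tx index, and 2 bytes of output position.
func toUint64(blockHeight, txIndex uint32, txPosition uint16) uint64 {
	return uint64(blockHeight)<<40 | uint64(txIndex)<<16 |
		uint64(txPosition)
}

// An inclusive end bound would saturate the lower fields, covering every
// possible channel in the end block:
//
//	chanIDEnd := toUint64(endHeight, 0xFFFFFF, 0xFFFF)
```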
discovery/gossiper.go
Outdated
	},
})
copy(syncer.peerPub[:], peer.SerializeCompressed())
d.peerSyncers[routing.NewVertex(peer)] = syncer
Do we need to check if we already have a syncer? This is spawned in a goroutine, so maybe not a bad idea
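A hypothetical guard making the registration idempotent (the mutex field name is illustrative):

```go
package discovery

import "github.com/lightningnetwork/lnd/routing"

// initSyncerOnce registers a syncer for the peer exactly once.
// Hypothetical sketch; syncerMtx is an illustrative field name.
func (d *AuthenticatedGossiper) initSyncerOnce(vertex routing.Vertex,
	newSyncer func() *gossipSyncer) {

	d.syncerMtx.Lock()
	defer d.syncerMtx.Unlock()

	// If a syncer already exists for this peer, keep it rather than
	// racing to replace it from another goroutine.
	if _, ok := d.peerSyncers[vertex]; ok {
		return
	}
	d.peerSyncers[vertex] = newSyncer()
}
```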
}

// Otherwise, it's the remote peer performing a
// query, which we'll attempt to deploy to.
deploy -> reply?
4275976 to a9981a0
In this commit, we add recognition of the data loss protected feature bit. We already implement the full feature set, but then never added the bit to our set of known features.
…peer In this commit, we introduce a new struct, the gossipSyncer. The role of this struct is to encapsulate the state machine required to implement the new gossip query range feature recently added to the spec. With this change, each peer that knows of this new feature will have a new goroutine that will be managed by the gossiper. Once created and started, the gossipSyncer will start to progress through each possible state, finally ending at the chansSynced stage. In this stage, it has synchronized state with the remote peer, and is simply awaiting any new messages from the gossiper to send directly to the peer. Each message will only be sent if the remote peer actually has a set update horizon, and the message isn't before or after that horizon. A set of unit tests has been added to ensure that two state machines properly terminate and synchronize channel state.
In this commit, we update the logic in the AuthenticatedGossiper to ensure that it can properly create, manage, and dispatch messages to any gossipSyncer instances created by the server. With this set of changes, the gossiper now has complete knowledge of the current set of peers we're connected to that support the new range queries. Upon initial connect, InitSyncState will be called by the server if the new peer understands the set of gossip queries. This will then create a new spot in the peerSyncers map for the new syncer. For each new gossip query message, we'll then attempt to dispatch the message directly to the gossip syncer. When the peer has disconnected, we then expect the server to call the PruneSyncState method which will allow us to free up the resources. Finally, when we go to broadcast messages, we'll send the messages directly to the peers that have gossipSyncer instances active, so they can properly be filtered out. For those that don't, we'll broadcast directly, ensuring we skip *all* peers that have an active gossip syncer.
…lGraphTimeSeries interface In this commit, we add a series of methods, and a new database index that we'll use to implement the new discovery.ChannelGraphTimeSeries interface. The primary change is that we now maintain two new indexes tracking the last update time for each node, and the last update time for each edge. These two indexes allow us to implement the NodeUpdatesInHorizon and ChanUpdatesInHorizon methods. The remaining methods added simply utilize the existing database indexes to allow us to respond to any peer gossip range queries. A set of new unit tests has been added to exercise the added logic.
In this commit, we add a new database migration required to update old databases to the version that tracks the update index for the nodes and edge policies. The migration is straightforward: we simply need to populate the new indexes for all the nodes, and then all the edges.
…es interface In this commit, we create a new concrete implementation for the new discovery.ChannelGraphTimeSeries interface. We also export the createChannelAnnouncement method to allow the chanSeries struct to re-use the existing code for creating wire messages from the database structs.
In this commit, we add a new command line option to allow nodes (ideally routing nodes) to disable receiving up-to-date channel updates altogether. This may be desired as it'll allow routing nodes to save on bandwidth as they don't need the channel updates to passively forward HTLCs. In the scenario that they _do_ want to update their routing policies, the first failed HTLC due to policy inconsistency will then allow the routing node to propagate the new update to potential nodes trying to route through it.
…ge update index In this commit, we ensure that all indexes for a particular channel have any relevant keys deleted once a channel is removed from the database. Before this commit, if we pruned a channel due to closing, then its entry in the channel update index would never be removed.
This new method allows outside callers to sample the current state of the gossipSyncer in a concurrent-safe manner. In order to achieve this, we now only modify the g.state variable atomically.
In this commit, we extend the AuthenticatedGossiper to take advantage of the new query features in the case that it gets a channel update w/o first receiving the full channel announcement. If this happens, we'll attempt to find a syncer that's fully synced, and request the channel announcement from it.
Just pushed out a new version rebased on top of the current master (no conflicts, yay!). Will do some local testing before merging through to master.
In this commit we fix an existing bug caused by a scheduling race condition. We'll now ensure that if we get a gossip message from a peer before we create an instance for it, then we create one on the spot so we can service the message. Before this commit, we would drop the first message, and therefore never sync up with the peer at all, causing them to miss channel announcements.
In this commit, we implement the new gossip query features recently added to the specification. With this implemented, once most peers are updated, we can skip the extremely wasteful initial routing table dump on initial connect. We also add a new command line flag that allows nodes to opt out of real time channel updates all together. This is desirable, as passive routing nodes don't really need to be receiving any of the updates, and can save bandwidth by not receiving them at all.
One follow up to this PR we might want to consider (for greater savings) is to ensure that we only engage in a single channel graph sync outstanding. This would allow us to fully sync up using a single peer, then diff our state against the next peer using the new information gained, repeating until we think we're synced. This would only apply for the first few (3 or so) peers that we connect to though.
NOTE: This PR introduces a new database migration to populate the new indexes added for existing node deployments.
lnwire
In this PR, we add recognition of the data loss protected feature bit. We already implement the full feature set, but then never added the bit to our set of known features. Setting this will allow all the eclair mobile nodes on the network to recover their settled channel balances in the case of partial data loss.

Additionally, we add both bits for the new gossip query features along with defining all the new message types. Note that at this point, we haven't yet added the zlib short channel ID compression. We'll add that in a follow up PR to not allow this PR to swell anymore.

discovery
We introduce a new struct, the gossipSyncer. The role of this struct is to encapsulate the state machine required to implement the new gossip query range feature recently added to the spec. With this change, each peer that knows of this new feature will have a new goroutine that will be managed by the gossiper.
Once created and started, the gossipSyncer will start to progress through each possible state, finally ending at the chansSynced stage. In this stage, it has synchronized state with the remote peer, and is simply awaiting any new messages from the gossiper to send directly to the peer. Each message will only be sent if the remote peer actually has a set update horizon, and the message isn't before or after that horizon. A set of unit tests has been added to ensure that two state machines properly terminate and synchronize channel state.
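A hedged sketch of that horizon check (the helper name and shape are illustrative, not the syncer's actual API):

```go
package discovery

import (
	"time"

	"github.com/lightningnetwork/lnd/lnwire"
)

// withinHorizon reports whether a message timestamp falls inside the
// remote peer's advertised update horizon. Sketch of the check
// described above; the helper name is illustrative.
func withinHorizon(horizon *lnwire.GossipTimestampRange, ts time.Time) bool {
	if horizon == nil {
		// The peer hasn't set an update horizon, so it doesn't
		// want any gossip forwarded yet.
		return false
	}

	start := time.Unix(int64(horizon.FirstTimestamp), 0)
	end := start.Add(time.Duration(horizon.TimestampRange) * time.Second)

	return !ts.Before(start) && !ts.After(end)
}
```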
The gossiper now has complete knowledge of the current set of peers we're connected to that support the new range queries. Upon initial connect, InitSyncState will be called by the server if the new peer understands the set of gossip queries. This will then create a new spot in the peerSyncers map for the new syncer. For each new gossip query message, we'll then attempt to dispatch the message directly to the gossip syncer. When the peer has disconnected, we then expect the server to call the PruneSyncState method which will allow us to free up the resources.
Finally, when we go to broadcast messages, we'll send the messages directly to the peers that have gossipSyncer instances active, so they can properly be filtered out. For those that don't we'll broadcast directly, ensuring we skip all peers that have an active gossip syncer.
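A rough sketch of that broadcast split (field and method names are illustrative, not the gossiper's exact API):

```go
package discovery

import (
	"github.com/lightningnetwork/lnd/lnwire"
	"github.com/lightningnetwork/lnd/routing"
)

// broadcastWithSyncers sketches the split described above: peers with an
// active syncer receive the message through the syncer's gossip filter,
// and are excluded from the plain broadcast.
func (d *AuthenticatedGossiper) broadcastWithSyncers(msg lnwire.Message) error {
	syncerPeers := make(map[routing.Vertex]struct{})

	for vertex, syncer := range d.peerSyncers {
		syncerPeers[vertex] = struct{}{}

		// The syncer only forwards msg if it falls within the
		// peer's advertised update horizon.
		syncer.FilterGossipMsgs(msg)
	}

	// Every other peer gets the message via the normal broadcast,
	// skipping all peers that have an active syncer.
	return d.cfg.Broadcast(syncerPeers, msg)
}
```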
channeldb
We add a series of methods, and a new database index that we'll use to implement the new discovery.ChannelGraphTimeSeries interface. The primary change is that we now maintain two new indexes tracking the last update time for each node, and the last update time for each edge. These two indexes allow us to implement the NodeUpdatesInHorizon and ChanUpdatesInHorizon methods. The remaining methods added simply utilize the existing database indexes to allow us to respond to any peer gossip range queries.

We also add a new database migration required to update old databases to the version that tracks the update index for the nodes and edge policies. The migration is straightforward: we simply need to populate the new indexes for all the nodes, and then all the edges.
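Conceptually, the new indexes are time-ordered: each entry keys on the update timestamp followed by the identifier, so a horizon query becomes a single cursor range scan. A hedged sketch of that key layout (not the exact on-disk format):

```go
package channeldb

import (
	"encoding/binary"
	"time"
)

// updateIndexKey sketches the time-ordered key layout described above:
// 8 bytes of big-endian unix timestamp followed by the 8-byte channel
// ID. Illustrative, not the exact on-disk format.
func updateIndexKey(lastUpdate time.Time, chanID uint64) [16]byte {
	var key [16]byte
	binary.BigEndian.PutUint64(key[:8], uint64(lastUpdate.Unix()))
	binary.BigEndian.PutUint64(key[8:], chanID)
	return key
}
```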
server
Finally, the server has been updated to signal the new feature bits that we understand, and also to decide if we should create a new gossip syncer, or just execute a full table dump upon initial connect.
Fixes #910