feat(networking): add service slots to peer manager #1473
Conversation
Force-pushed from `aede3f2` to `1ae87f3`.
Please check my comments.
```nim
if pm.peerStore.connectedness(servicePeer.peerId) != Connected:
  # Attempt to dial peer. Note that service peers do not respect any backoff
  let conn = await pm.dialPeer(servicePeer.peerId, servicePeer.addrs, serviceProto)
  if conn.isNone:
```
Suggested change:

```diff
- if conn.isNone:
+ if conn.isNone():
```
```nim
# Log a summary of slot peers connected/notconnected
let connectedServicePeers = toSeq(pm.serviceSlots.pairs).filterIt(pm.peerStore.connectedness(it[1].peerId) == Connected)
if connectedServicePeers.len > 0:
  info "Connected service peers",
```
As I commented in another PR, this should be set to `debug` level. The abuse of the `info` log level is bloating the node logs with debug information.
```nim
# Ensure we are always connected to the slotted service peers
proc serviceConnectivityLoop*(pm: PeerManager) {.async.} =
  pm.serviceLoopUp = true
  info "Starting service connectivity loop"
```
As I commented in another PR, this should be set to `debug` level. The abuse of the `info` log level is bloating the node logs with debug information.
```diff
@@ -335,7 +359,9 @@ proc connectToNodes*(pm: PeerManager,

 # Ensures a healthy amount of connected relay peers
 proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
   while true:
     pm.relayLoopUp = true
     info "Starting relay connectivity loop"
```
As I commented in another PR, this should be set to `debug` level. The abuse of the `info` log level is bloating the node logs with debug information.
Agree re keeping `info` clearer. Also goes for e.g. `Adding peer to service slots` above. Ideally the metrics dump every minute should be enough to convey all the info necessary to monitor a node that's not being debugged. It should be possible to extend that with more important metrics as they get added. (I understand that many other p2p apps log at `info` level any maintenance pulses, but nwaku has too many such to not clutter the logs IMO.)
I don't hold a strong opinion, so I can move it to debug, but my rationale behind this: both `Starting relay connectivity loop` and `Starting service connectivity loop` are one-time events, and mimic "mounting relay protocol", "mounting libp2p ping protocol", "starting relay protocol", etc., which I think are quite relevant.
I also think `Adding peer to service slots` is quite relevant since it shows the service peers we are using, and the impact of this is quite high (e.g. in store). It is also a one-time event (taken from CLI flags).
Regarding logging connected peers every few seconds, sure, I can use the metrics dump every minute for that. IIRC the connected-peers metric is not there (just the delta, i.e. new connections since the last cycle)?
Done! Using `debug` now.
```nim
      servicePeers = connectedServicePeers.mapIt(it[1].addrs),
      respectiveProtocols = connectedServicePeers.mapIt(it[0])

    let notConnectedServicePeers = toSeq(pm.serviceSlots.pairs).filterIt(pm.peerStore.connectedness(it[1].peerId) != Connected)
```
The cognitive load of this line makes it hard to understand. Please create procs that reduce this complexity; this one-liner is hard to debug.
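A possible shape for that refactor, sketched with stub types (the real `PeerManager`, `PeerStore`, and `RemotePeerInfo` live in nwaku; the proc names here are hypothetical):

```nim
import std/[tables, sequtils]

type
  Connectedness = enum NotConnected, Connected   # stub of nwaku's enum
  RemotePeerInfo = object                        # stub type
    peerId: string
  PeerStoreStub = ref object
    conns: Table[string, Connectedness]
  PeerManagerStub = ref object
    peerStore: PeerStoreStub
    serviceSlots: Table[string, RemotePeerInfo]  # proto -> slotted peer

proc connectedness(ps: PeerStoreStub, id: string): Connectedness =
  ps.conns.getOrDefault(id, NotConnected)

# Named helpers replacing the one-liners; each returns (proto, peer) pairs.
proc connectedServicePeers(pm: PeerManagerStub): seq[(string, RemotePeerInfo)] =
  toSeq(pm.serviceSlots.pairs).filterIt(
    pm.peerStore.connectedness(it[1].peerId) == Connected)

proc notConnectedServicePeers(pm: PeerManagerStub): seq[(string, RemotePeerInfo)] =
  toSeq(pm.serviceSlots.pairs).filterIt(
    pm.peerStore.connectedness(it[1].peerId) != Connected)
```

The loop body then reads as two named calls instead of two chained `toSeq`/`filterIt` expressions, and each helper can be unit-tested in isolation.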
Magic numbers: `it[1]`. Where does the `[1]` come from? Where does the `[0]` at L414 come from?
Removed the loop, so this code does not exist anymore.
```nim
proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
  # Selects the best peer for a given protocol
  let peers = pm.peerStore.peers().filterIt(it.protos.contains(proto))
```
Extract a proc: `peerStore.getPeersByProto(proto: string)`.
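A minimal sketch of the suggested extraction, using stub types standing in for nwaku's real peer store (names other than `getPeersByProto` are assumptions):

```nim
import std/sequtils

type
  RemotePeerInfo = object     # stub: the real type lives in nwaku
    protos: seq[string]
  PeerStoreStub = ref object
    knownPeers: seq[RemotePeerInfo]

proc peers(ps: PeerStoreStub): seq[RemotePeerInfo] = ps.knownPeers

# The suggested extraction: name the filter so selectPeer can read as
#   let peers = pm.peerStore.getPeersByProto(proto)
proc getPeersByProto(ps: PeerStoreStub, proto: string): seq[RemotePeerInfo] =
  ps.peers().filterIt(it.protos.contains(proto))
```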
waku/v2/node/waku_node.nim (Outdated)
```nim
node.peerManager.serviceLoopUp = false
node.peerManager.relayLoopUp = false
```
☠️ ☠️ ☠️ ☠️ ☠️ ☠️ Add a `stop` method to the different event loops.
```nim
    await sleepAsync(ServicePeersInterval)

proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
```
The `proc` name is too generic. Rename this method to something like `selectPeerByProto`.
```nim
if pm.serviceSlots.len == 0:
  warn "No service peers configured, but service loop is running"
for serviceProto, servicePeer in pm.serviceSlots.pairs:
  if pm.peerStore.connectedness(servicePeer.peerId) != Connected:
```
Add an `isConnected(peerId)` method to `peerStore`.

```diff
- if pm.peerStore.connectedness(servicePeer.peerId) != Connected:
+ if not pm.peerStore.isConnected(servicePeer.peerId):
```
Added an `isConnected` function!
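The added helper likely amounts to something like this, sketched here with stub types (the real `PeerStore` and `Connectedness` come from nwaku/nim-libp2p):

```nim
import std/tables

type
  PeerId = string               # stub: the real PeerId comes from libp2p
  Connectedness = enum NotConnected, CanConnect, CannotConnect, Connected
  PeerStoreStub = ref object
    connBook: Table[PeerId, Connectedness]

proc connectedness(ps: PeerStoreStub, peerId: PeerId): Connectedness =
  ps.connBook.getOrDefault(peerId, NotConnected)

# The suggested helper: hides the enum comparison behind a named predicate.
proc isConnected(ps: PeerStoreStub, peerId: PeerId): bool =
  ps.connectedness(peerId) == Connected
```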
Thanks. In general LGTM! Ready to approve, but would like to get clarity on what the advantage of maintaining connectivity to service peers is. Please also see comments around tying loop lifecycles more formally to a clean `node.stop()` shutdown and reining in the use of `info`. :)
```nim
while true:
  pm.relayLoopUp = true
  info "Starting relay connectivity loop"
  while pm.relayLoopUp:
```
Why can't this be tied to `node.started` so that all loops' existence is cleared by the formal `node.stop()` method? Not sure we want to keep track via individual variables of each loop's scheduling, especially since some are started conditionally, others under all conditions, etc.
...realising this may not be accessible from here. Perhaps then adding a `stop()` method and a (private) `started` `bool` to the PeerManager which gets formally stopped when we do `node.stop()`? I think these loops can at least be tied to the lifecycle of the peer manager and should not need to be tracked separately.
Added `start()` and `stop()`!
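A sketch of the lifecycle tie-in, assuming a private `started` flag on the PeerManager as proposed above; the exact fields and proc bodies in the merged PR may differ (chronos is used for the async runtime):

```nim
import chronos

type PeerManagerStub = ref object   # stub: only the lifecycle bits
  started: bool

proc relayConnectivityLoop(pm: PeerManagerStub) {.async.} =
  while pm.started:                 # loop exits once stop() clears the flag
    # ... maintain relay connections ...
    await sleepAsync(1.seconds)

proc start(pm: PeerManagerStub) =
  pm.started = true
  asyncSpawn pm.relayConnectivityLoop()

proc stop(pm: PeerManagerStub) =
  pm.started = false                # called from node.stop()
```

This way no per-loop `*LoopUp` variables are needed; every loop keys off the single `started` flag owned by the peer manager.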
```diff
@@ -364,3 +390,53 @@ proc relayConnectivityLoop*(pm: PeerManager) {.async.} =
   await pm.connectToNodes(outsideBackoffPeers[0..<numPeersToConnect], WakuRelayCodec)

   await sleepAsync(ConnectivityLoopInterval)

 # Ensure we are always connected to the slotted service peers
 proc serviceConnectivityLoop*(pm: PeerManager) {.async.} =
```
What would be the main advantage of maintaining our connections to service peers? Most service protocols are very opportunistic and may work fine with ad-hoc connections (e.g. when making a store query, filter query, etc.). In fact, we assume that client nodes making use of services may often have connectivity restrictions, so will for periods of time be unable to reach the service nodes. It is up to the application then to attempt connecting to service nodes (store, filter) to retrieve messages it may have missed during bad connectivity.
Certainly agree that we need to keep slots for service peers, though.
Main reasons are to i) shorten the time, since the connection is already made, and ii) detect slot peer problems ASAP, and not wait until we need them. But I see your point.
Will remove it, but perhaps we will still have similar behaviour? I mean, with keepAlive and one store request we will stay connected, until of course the keepAlive fails or the connection is dropped.
Removed the `serviceConnectivityLoop`.
Force-pushed from `c5c70f3` to `c93875e`.
Changes were addressed, can we get this in? Approval required.
LGTM!
Related #1461

Background:

Summary of changes:
- Adds `serviceSlots`, which contains a set of "slotted" or "preferred" peers for the services; in other words, any protocol that is not `WakuRelayProtocol`:
  - `WakuStoreCodec`
  - `WakuFilterCodec`
  - `WakuLightPushCodec`
  - `WakuPeerExchangeCodec`
- The peer manager constantly checks that we are connected to these "slot" peers (if any), and if not it attempts a connection every `x` interval. This can speed things up if at some point we need something from those peers, since we are already connected.
- With `selectPeer()` we get a different peer depending on the protocol that is requested, with the following priorities: peers set with the CLI flags (`store`, etc.) always take precedence.
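The described `selectPeer()` priority can be sketched as follows, with stub types in place of nwaku's real ones (only the slot-first fallback order comes from the summary; everything else is illustrative):

```nim
import std/[tables, options, sequtils]

type
  RemotePeerInfo = object       # stub: the real type lives in nwaku
    peerId: string
    protos: seq[string]
  PeerStoreStub = ref object
    knownPeers: seq[RemotePeerInfo]
  PeerManagerStub = ref object
    peerStore: PeerStoreStub
    serviceSlots: Table[string, RemotePeerInfo]  # proto -> CLI-configured peer

proc peers(ps: PeerStoreStub): seq[RemotePeerInfo] = ps.knownPeers

proc selectPeer(pm: PeerManagerStub, proto: string): Option[RemotePeerInfo] =
  # 1. A service slot set via CLI flags always takes precedence
  if pm.serviceSlots.hasKey(proto):
    return some(pm.serviceSlots[proto])
  # 2. Otherwise, fall back to any known peer that supports the protocol
  let candidates = pm.peerStore.peers().filterIt(it.protos.contains(proto))
  if candidates.len > 0:
    return some(candidates[0])
  none(RemotePeerInfo)
```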