refactor(networking): unify peer data models, remove StoredInfo (#1597)
alrevuelta authored Mar 9, 2023
1 parent 7639d8d commit 622ec27
Showing 10 changed files with 191 additions and 176 deletions.
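
Throughout the diff, the `protos` field becomes `protocols` and the standalone `StoredInfo` type is dropped in favour of a single `RemotePeerInfo` record. As orientation, here is a minimal sketch of that unified record, reconstructed only from the fields exercised in the changed files below; the imports, the `ref object` choice, and every `PeerOrigin` value other than `Discv5` are illustrative assumptions, not definitions taken from this commit.

import chronos                               # Moment
import libp2p/[peerid, multiaddress],
       libp2p/crypto/crypto                  # PeerId, MultiAddress, PublicKey

type
  Connectedness* = enum
    # all four states appear in the tests below
    NotConnected, CannotConnect, CanConnect, Connected

  PeerOrigin* = enum
    # only Discv5 is visible in this diff; UnknownOrigin is a placeholder
    UnknownOrigin, Discv5

  RemotePeerInfo* = ref object
    # nim-libp2p peer-store books
    peerId*: PeerId
    addrs*: seq[MultiAddress]
    protocols*: seq[string]                  # renamed from StoredInfo.protos
    publicKey*: PublicKey
    agent*: string
    protoVersion*: string
    # nwaku-specific bookkeeping
    connectedness*: Connectedness
    disconnectTime*: int64
    origin*: PeerOrigin
    numberFailedConn*: int
    lastFailedConn*: Moment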
14 changes: 7 additions & 7 deletions tests/v2/test_peer_manager.nim
@@ -105,13 +105,13 @@ procSuite "Peer Manager":
node.peerManager.peerStore.peers().len == 3
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protos.contains(WakuFilterCodec))
it.protocols.contains(WakuFilterCodec))
node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
it.addrs.contains(swapLoc) and
it.protos.contains(WakuSwapCodec))
it.protocols.contains(WakuSwapCodec))
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protos.contains(WakuStoreCodec))
it.protocols.contains(WakuStoreCodec))

await node.stop()

@@ -270,7 +270,7 @@ procSuite "Peer Manager":
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
node1.peerManager.peerStore.peers().anyIt(it.protocols.contains(node2.wakuRelay.codec))
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

# Simulate restart by initialising a new node using the same storage
@@ -286,7 +286,7 @@
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec))
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.start() # This should trigger a reconnect
@@ -295,8 +295,8 @@
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec))
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec))
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(stableCodec))
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

await allFutures([node1.stop(), node2.stop(), node3.stop()])
70 changes: 46 additions & 24 deletions tests/v2/test_peer_storage.nim
@@ -22,64 +22,86 @@ suite "Peer Storage":
peerKey = generateEcdsaKey()
peer = PeerInfo.new(peerKey, @[peerLoc])
peerProto = "/waku/2/default-waku/codec"
stored = StoredInfo(peerId: peer.peerId, addrs: @[peerLoc], protos: @[peerProto], publicKey: peerKey.getPublicKey().tryGet())
conn = Connectedness.CanConnect
connectedness = Connectedness.CanConnect
disconn = 999999
stored = RemotePeerInfo(
peerId: peer.peerId,
addrs: @[peerLoc],
protocols: @[peerProto],
publicKey: peerKey.getPublicKey().tryGet(),
connectedness: connectedness,
disconnectTime: disconn)

defer: storage.close()

# Test insert and retrieve

discard storage.put(peer.peerId, stored, conn, disconn)
require storage.put(peer.peerId, stored, connectedness, disconn).isOk

var responseCount = 0
# flags to check data matches what was stored (default true)
var peerIdFlag, storedInfoFlag, connectednessFlag, disconnectFlag: bool

proc data(peerId: PeerID, storedInfo: StoredInfo,
# Fetched variables from callback
var resPeerId: PeerId
var resStoredInfo: RemotePeerInfo
var resConnectedness: Connectedness
var resDisconnect: int64

proc data(peerId: PeerID, storedInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
# These flags are checked outside this block.
peerIdFlag = peerId == peer.peerId
storedInfoFlag = storedInfo == stored
connectednessFlag = connectedness == conn
disconnectFlag = disconnectTime == disconn
resPeerId = peerId
resStoredInfo = storedInfo
resConnectedness = connectedness
resDisconnect = disconnectTime

let res = storage.getAll(data)

check:
res.isErr == false
responseCount == 1
peerIdFlag
storedInfoFlag
connectednessFlag
disconnectFlag
resPeerId == peer.peerId
resStoredInfo.peerId == peer.peerId
resStoredInfo.addrs == @[peerLoc]
resStoredInfo.protocols == @[peerProto]
resStoredInfo.publicKey == peerKey.getPublicKey().tryGet()
# TODO: For compatibility, we don't store connectedness and disconnectTime
#resStoredInfo.connectedness == connectedness
#resStoredInfo.disconnectTime == disconn
resConnectedness == Connectedness.CanConnect
resDisconnect == disconn

# Test replace and retrieve (update an existing entry)
discard storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10)
require storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10).isOk

responseCount = 0
proc replacedData(peerId: PeerID, storedInfo: StoredInfo,
proc replacedData(peerId: PeerID, storedInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
# These flags are checked outside this block.
peerIdFlag = peerId == peer.peerId
storedInfoFlag = storedInfo == stored
connectednessFlag = connectedness == CannotConnect
disconnectFlag = disconnectTime == disconn + 10
resPeerId = peerId
resStoredInfo = storedInfo
resConnectedness = connectedness
resDisconnect = disconnectTime

let repRes = storage.getAll(replacedData)

check:
repRes.isErr == false
responseCount == 1
peerIdFlag
storedInfoFlag
connectednessFlag
disconnectFlag
resPeerId == peer.peerId
resStoredInfo.peerId == peer.peerId
resStoredInfo.addrs == @[peerLoc]
resStoredInfo.protocols == @[peerProto]
resStoredInfo.publicKey == peerKey.getPublicKey().tryGet()
# TODO: For compatibility, we don't store connectedness and disconnectTime
#resStoredInfo.connectedness == connectedness
#resStoredInfo.disconnectTime == disconn
resConnectedness == Connectedness.CannotConnect
resDisconnect == disconn + 10
69 changes: 33 additions & 36 deletions tests/v2/test_peer_store_extended.nim
@@ -103,37 +103,37 @@ suite "Extended nim-libp2p Peer Store":

test "get() returns the correct StoredInfo for a given PeerId":
# When
let storedInfoPeer1 = peerStore.get(p1)
let storedInfoPeer6 = peerStore.get(p6)
let peer1 = peerStore.get(p1)
let peer6 = peerStore.get(p6)

# Then
check:
# regression on nim-libp2p fields
storedInfoPeer1.peerId == p1
storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
storedInfoPeer1.agent == "nwaku"
storedInfoPeer1.protoVersion == "protoVersion1"
peer1.peerId == p1
peer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
peer1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
peer1.agent == "nwaku"
peer1.protoVersion == "protoVersion1"

# our extended fields
storedInfoPeer1.connectedness == Connected
storedInfoPeer1.disconnectTime == 0
storedInfoPeer1.origin == Discv5
storedInfoPeer1.numberFailedConn == 1
storedInfoPeer1.lastFailedConn == Moment.init(1001, Second)
peer1.connectedness == Connected
peer1.disconnectTime == 0
peer1.origin == Discv5
peer1.numberFailedConn == 1
peer1.lastFailedConn == Moment.init(1001, Second)

check:
# fields are empty, not part of the peerstore
storedInfoPeer6.peerId == p6
storedInfoPeer6.addrs.len == 0
storedInfoPeer6.protos.len == 0
storedInfoPeer6.agent == default(string)
storedInfoPeer6.protoVersion == default(string)
storedInfoPeer6.connectedness == default(Connectedness)
storedInfoPeer6.disconnectTime == default(int)
storedInfoPeer6.origin == default(PeerOrigin)
storedInfoPeer6.numberFailedConn == default(int)
storedInfoPeer6.lastFailedConn == default(Moment)
peer6.peerId == p6
peer6.addrs.len == 0
peer6.protocols.len == 0
peer6.agent == default(string)
peer6.protoVersion == default(string)
peer6.connectedness == default(Connectedness)
peer6.disconnectTime == default(int)
peer6.origin == default(PeerOrigin)
peer6.numberFailedConn == default(int)
peer6.lastFailedConn == default(Moment)

test "peers() returns all StoredInfo of the PeerStore":
# When
@@ -153,7 +153,7 @@ suite "Extended nim-libp2p Peer Store":
check:
# regression on nim-libp2p fields
p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
p3.protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
p3.agent == "gowaku"
p3.protoVersion == "protoVersion3"

@@ -180,7 +180,7 @@
# Only p3 supports that protocol
lpPeers.len == 1
lpPeers.anyIt(it.peerId == p3)
lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
lpPeers[0].protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]

test "peers() returns all StoredInfo matching a given protocolMatcher":
# When
@@ -197,28 +197,25 @@
pMatcherStorePeers.anyIt(it.peerId == p5)

check:
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protocols == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

check:
pMatcherSwapPeers.len == 1
pMatcherSwapPeers.anyIt(it.peerId == p5)
pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
pMatcherSwapPeers[0].protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo":
# Given
let storedInfoPeer1 = peerStore.get(p1)

# When
let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo()
let peer1 = peerStore.get(p1)

# Then
check:
remotePeerInfo1.peerId == p1
remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
peer1.peerId == p1
peer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
peer1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]

test "connectedness() returns the connection status of a given PeerId":
check:
7 changes: 0 additions & 7 deletions waku/v2/node/jsonrpc/admin/handlers.nim
@@ -38,13 +38,6 @@ proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
return ""
constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId)

proc constructMultiaddrStr*(storedInfo: StoredInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if storedInfo.addrs.len == 0:
return ""
constructMultiaddrStr(storedInfo.addrs[0], storedInfo.peerId)


proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =

rpcsrv.rpc("post_waku_v2_admin_v1_peers") do (peers: seq[string]) -> bool:
24 changes: 12 additions & 12 deletions waku/v2/node/peer_manager/peer_manager.nim
@@ -78,11 +78,11 @@ proc protocolMatcher*(codec: string): Matcher =

proc insertOrReplace(ps: PeerStorage,
peerId: PeerID,
storedInfo: StoredInfo,
remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness,
disconnectTime: int64 = 0) =
# Insert peer entry into persistent storage, or replace existing entry with updated info
let res = ps.put(peerId, storedInfo, connectedness, disconnectTime)
let res = ps.put(peerId, remotePeerInfo, connectedness, disconnectTime)
if res.isErr:
warn "failed to store peers", err = res.error
waku_peers_errors.inc(labelValues = ["storage_failure"])
@@ -137,24 +137,24 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
proc loadFromStorage(pm: PeerManager) =
debug "loading peers from storage"
# Load peers from storage, if available
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
trace "loading peer", peerId= $peerId, storedInfo= $storedInfo, connectedness=connectedness
proc onData(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) =
trace "loading peer", peerId=peerId, connectedness=connectedness

if peerId == pm.switch.peerInfo.peerId:
# Do not manage self
return

# nim-libp2p books
pm.peerStore[AddressBook][peerId] = storedInfo.addrs
pm.peerStore[ProtoBook][peerId] = storedInfo.protos
pm.peerStore[KeyBook][peerId] = storedInfo.publicKey
pm.peerStore[AgentBook][peerId] = storedInfo.agent
pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion
pm.peerStore[AddressBook][peerId] = remotePeerInfo.addrs
pm.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols
pm.peerStore[KeyBook][peerId] = remotePeerInfo.publicKey
pm.peerStore[AgentBook][peerId] = remotePeerInfo.agent
pm.peerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion

# custom books
pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
pm.peerStore[DisconnectBook][peerId] = disconnectTime
pm.peerStore[SourceBook][peerId] = storedInfo.origin
pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin

let res = pm.storage.getAll(onData)
if res.isErr:
@@ -461,7 +461,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
if peers.len > 0:
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
return some(peers[0].toRemotePeerInfo())
return some(peers[0])
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

@@ -473,7 +473,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
# If not slotted, we select a random peer for the given protocol
if peers.len > 0:
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
return some(peers[0].toRemotePeerInfo())
return some(peers[0])
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

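Because `peerStore.peers()` now yields `RemotePeerInfo` records directly, `selectPeer` can return them as-is; the `toRemotePeerInfo()` calls in the two hunks above simply disappear. A hypothetical caller sketch, assuming `std/options`, the nwaku `PeerManager`, and the `WakuStoreCodec` constant are in scope (the `logStorePeer` name is invented for illustration):

proc logStorePeer(pm: PeerManager) =
  # selectPeer() hands back a RemotePeerInfo; peer id, addresses and
  # supported protocols all live in the one record
  let storePeer = pm.selectPeer(WakuStoreCodec)
  if storePeer.isSome():
    echo "store peer: ", storePeer.get().peerId,
         " protocols: ", storePeer.get().protocols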
11 changes: 6 additions & 5 deletions waku/v2/node/peer_manager/peer_store/peer_storage.nim
@@ -7,24 +7,25 @@ else:
import
stew/results
import
../waku_peer_store
../waku_peer_store,
../../../utils/peers

## This module defines a peer storage interface. Implementations of
## PeerStorage are used to store and retrieve peers

type
PeerStorage* = ref object of RootObj

PeerStorageResult*[T] = Result[T, string]

DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
DataProc* = proc(peerId: PeerID, remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.closure, raises: [Defect].}

# PeerStorage interface
method put*(db: PeerStorage,
peerId: PeerID,
storedInfo: StoredInfo,
remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness,
disconnectTime: int64): PeerStorageResult[void] {.base.} = discard

method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
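
The storage interface now moves `RemotePeerInfo` through both `put` and the `DataProc` callback. Purely as an illustration (not part of this commit), a minimal in-memory implementation of the interface above could look like this; the `InMemoryPeerStorage` name and the imports are assumptions, and the backend actually used by nwaku persists peers across restarts rather than keeping them in memory.

import std/tables
import stew/results
import libp2p/peerid
# assumes PeerStorage, PeerStorageResult and DataProc from the module above,
# plus RemotePeerInfo and Connectedness from waku_peer_store, are in scope

type
  StoredEntry = tuple[info: RemotePeerInfo, conn: Connectedness, disconnectTime: int64]
  InMemoryPeerStorage* = ref object of PeerStorage
    entries: Table[PeerID, StoredEntry]

method put*(db: InMemoryPeerStorage,
            peerId: PeerID,
            remotePeerInfo: RemotePeerInfo,
            connectedness: Connectedness,
            disconnectTime: int64): PeerStorageResult[void] =
  # insert, or overwrite, the full record for this peer
  db.entries[peerId] = (remotePeerInfo, connectedness, disconnectTime)
  return ok()

method getAll*(db: InMemoryPeerStorage, onData: DataProc): PeerStorageResult[bool] =
  # replay every stored record through the RemotePeerInfo-based callback
  for peerId, entry in db.entries.pairs:
    onData(peerId, entry.info, entry.conn, entry.disconnectTime)
  return ok(true)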