Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(networking): unify peer data models, remove StoredInfo #1597

Merged
merged 6 commits into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions tests/v2/test_peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ procSuite "Peer Manager":
node.peerManager.peerStore.peers().len == 3
node.peerManager.peerStore.peers(WakuFilterCodec).allIt(it.peerId == filterPeer.peerId and
it.addrs.contains(filterLoc) and
it.protos.contains(WakuFilterCodec))
it.protocols.contains(WakuFilterCodec))
node.peerManager.peerStore.peers(WakuSwapCodec).allIt(it.peerId == swapPeer.peerId and
it.addrs.contains(swapLoc) and
it.protos.contains(WakuSwapCodec))
it.protocols.contains(WakuSwapCodec))
node.peerManager.peerStore.peers(WakuStoreCodec).allIt(it.peerId == storePeer.peerId and
it.addrs.contains(storeLoc) and
it.protos.contains(WakuStoreCodec))
it.protocols.contains(WakuStoreCodec))

await node.stop()

Expand Down Expand Up @@ -270,7 +270,7 @@ procSuite "Peer Manager":
# Currently connected to node2
node1.peerManager.peerStore.peers().len == 1
node1.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node1.peerManager.peerStore.peers().anyIt(it.protos.contains(node2.wakuRelay.codec))
node1.peerManager.peerStore.peers().anyIt(it.protocols.contains(node2.wakuRelay.codec))
node1.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

# Simulate restart by initialising a new node using the same storage
Expand All @@ -286,7 +286,7 @@ procSuite "Peer Manager":
# Node2 has been loaded after "restart", but we have not yet reconnected
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec))
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == NotConnected

await node3.start() # This should trigger a reconnect
Expand All @@ -295,8 +295,8 @@ procSuite "Peer Manager":
# Reconnected to node2 after "restart"
node3.peerManager.peerStore.peers().len == 1
node3.peerManager.peerStore.peers().anyIt(it.peerId == peerInfo2.peerId)
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(betaCodec))
node3.peerManager.peerStore.peers().anyIt(it.protos.contains(stableCodec))
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(betaCodec))
node3.peerManager.peerStore.peers().anyIt(it.protocols.contains(stableCodec))
node3.peerManager.peerStore.connectedness(peerInfo2.peerId) == Connected

await allFutures([node1.stop(), node2.stop(), node3.stop()])
Expand Down
70 changes: 46 additions & 24 deletions tests/v2/test_peer_storage.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,64 +22,86 @@ suite "Peer Storage":
peerKey = generateEcdsaKey()
peer = PeerInfo.new(peerKey, @[peerLoc])
peerProto = "/waku/2/default-waku/codec"
stored = StoredInfo(peerId: peer.peerId, addrs: @[peerLoc], protos: @[peerProto], publicKey: peerKey.getPublicKey().tryGet())
conn = Connectedness.CanConnect
connectedness = Connectedness.CanConnect
disconn = 999999
stored = RemotePeerInfo(
peerId: peer.peerId,
addrs: @[peerLoc],
protocols: @[peerProto],
publicKey: peerKey.getPublicKey().tryGet(),
connectedness: connectedness,
disconnectTime: disconn)

defer: storage.close()

# Test insert and retrieve

discard storage.put(peer.peerId, stored, conn, disconn)
require storage.put(peer.peerId, stored, connectedness, disconn).isOk

var responseCount = 0
# flags to check data matches what was stored (default true)
var peerIdFlag, storedInfoFlag, connectednessFlag, disconnectFlag: bool

proc data(peerId: PeerID, storedInfo: StoredInfo,
# Fetched variables from callback
var resPeerId: PeerId
var resStoredInfo: RemotePeerInfo
var resConnectedness: Connectedness
var resDisconnect: int64

proc data(peerId: PeerID, storedInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
# These flags are checked outside this block.
peerIdFlag = peerId == peer.peerId
storedInfoFlag = storedInfo == stored
connectednessFlag = connectedness == conn
disconnectFlag = disconnectTime == disconn
resPeerId = peerId
resStoredInfo = storedInfo
resConnectedness = connectedness
resDisconnect = disconnectTime

let res = storage.getAll(data)

check:
res.isErr == false
responseCount == 1
peerIdFlag
storedInfoFlag
connectednessFlag
disconnectFlag
resPeerId == peer.peerId
resStoredInfo.peerId == peer.peerId
resStoredInfo.addrs == @[peerLoc]
resStoredInfo.protocols == @[peerProto]
resStoredInfo.publicKey == peerKey.getPublicKey().tryGet()
# TODO: For compatibility, we don't store connectedness and disconnectTime
#resStoredInfo.connectedness == connectedness
#resStoredInfo.disconnectTime == disconn
resConnectedness == Connectedness.CanConnect
resDisconnect == disconn

# Test replace and retrieve (update an existing entry)
discard storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10)
require storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10).isOk

responseCount = 0
proc replacedData(peerId: PeerID, storedInfo: StoredInfo,
proc replacedData(peerId: PeerID, storedInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
# These flags are checked outside this block.
peerIdFlag = peerId == peer.peerId
storedInfoFlag = storedInfo == stored
connectednessFlag = connectedness == CannotConnect
disconnectFlag = disconnectTime == disconn + 10
resPeerId = peerId
resStoredInfo = storedInfo
resConnectedness = connectedness
resDisconnect = disconnectTime

let repRes = storage.getAll(replacedData)

check:
repRes.isErr == false
responseCount == 1
peerIdFlag
storedInfoFlag
connectednessFlag
disconnectFlag
resPeerId == peer.peerId
resStoredInfo.peerId == peer.peerId
resStoredInfo.addrs == @[peerLoc]
resStoredInfo.protocols == @[peerProto]
resStoredInfo.publicKey == peerKey.getPublicKey().tryGet()
# TODO: For compatibility, we don't store connectedness and disconnectTime
#resStoredInfo.connectedness == connectedness
#resStoredInfo.disconnectTime == disconn
resConnectedness == Connectedness.CannotConnect
resDisconnect == disconn + 10
69 changes: 33 additions & 36 deletions tests/v2/test_peer_store_extended.nim
Original file line number Diff line number Diff line change
Expand Up @@ -103,37 +103,37 @@ suite "Extended nim-libp2p Peer Store":

test "get() returns the correct StoredInfo for a given PeerId":
# When
let storedInfoPeer1 = peerStore.get(p1)
let storedInfoPeer6 = peerStore.get(p6)
let peer1 = peerStore.get(p1)
let peer6 = peerStore.get(p6)

# Then
check:
# regression on nim-libp2p fields
storedInfoPeer1.peerId == p1
storedInfoPeer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
storedInfoPeer1.protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
storedInfoPeer1.agent == "nwaku"
storedInfoPeer1.protoVersion == "protoVersion1"
peer1.peerId == p1
peer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
peer1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
peer1.agent == "nwaku"
peer1.protoVersion == "protoVersion1"

# our extended fields
storedInfoPeer1.connectedness == Connected
storedInfoPeer1.disconnectTime == 0
storedInfoPeer1.origin == Discv5
storedInfoPeer1.numberFailedConn == 1
storedInfoPeer1.lastFailedConn == Moment.init(1001, Second)
peer1.connectedness == Connected
peer1.disconnectTime == 0
peer1.origin == Discv5
peer1.numberFailedConn == 1
peer1.lastFailedConn == Moment.init(1001, Second)

check:
# fields are empty, not part of the peerstore
storedInfoPeer6.peerId == p6
storedInfoPeer6.addrs.len == 0
storedInfoPeer6.protos.len == 0
storedInfoPeer6.agent == default(string)
storedInfoPeer6.protoVersion == default(string)
storedInfoPeer6.connectedness == default(Connectedness)
storedInfoPeer6.disconnectTime == default(int)
storedInfoPeer6.origin == default(PeerOrigin)
storedInfoPeer6.numberFailedConn == default(int)
storedInfoPeer6.lastFailedConn == default(Moment)
peer6.peerId == p6
peer6.addrs.len == 0
peer6.protocols.len == 0
peer6.agent == default(string)
peer6.protoVersion == default(string)
peer6.connectedness == default(Connectedness)
peer6.disconnectTime == default(int)
peer6.origin == default(PeerOrigin)
peer6.numberFailedConn == default(int)
peer6.lastFailedConn == default(Moment)

test "peers() returns all StoredInfo of the PeerStore":
# When
Expand All @@ -153,7 +153,7 @@ suite "Extended nim-libp2p Peer Store":
check:
# regression on nim-libp2p fields
p3.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/3").tryGet()]
p3.protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
p3.protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
p3.agent == "gowaku"
p3.protoVersion == "protoVersion3"

Expand All @@ -180,7 +180,7 @@ suite "Extended nim-libp2p Peer Store":
# Only p3 supports that protocol
lpPeers.len == 1
lpPeers.anyIt(it.peerId == p3)
lpPeers[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
lpPeers[0].protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]

test "peers() returns all StoredInfo matching a given protocolMatcher":
# When
Expand All @@ -197,28 +197,25 @@ suite "Extended nim-libp2p Peer Store":
pMatcherStorePeers.anyIt(it.peerId == p5)

check:
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protos == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protos == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protos == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
pMatcherStorePeers.filterIt(it.peerId == p1)[0].protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p2)[0].protocols == @["/vac/waku/relay/2.0.0", "/vac/waku/store/2.0.0"]
pMatcherStorePeers.filterIt(it.peerId == p3)[0].protocols == @["/vac/waku/lightpush/2.0.0", "/vac/waku/store/2.0.0-beta1"]
pMatcherStorePeers.filterIt(it.peerId == p5)[0].protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

check:
pMatcherSwapPeers.len == 1
pMatcherSwapPeers.anyIt(it.peerId == p5)
pMatcherSwapPeers[0].protos == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]
pMatcherSwapPeers[0].protocols == @["/vac/waku/swap/2.0.0", "/vac/waku/store/2.0.0-beta2"]

test "toRemotePeerInfo() converts a StoredInfo to a RemotePeerInfo":
# Given
let storedInfoPeer1 = peerStore.get(p1)

# When
let remotePeerInfo1 = storedInfoPeer1.toRemotePeerInfo()
let peer1 = peerStore.get(p1)

# Then
check:
remotePeerInfo1.peerId == p1
remotePeerInfo1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
remotePeerInfo1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]
peer1.peerId == p1
peer1.addrs == @[MultiAddress.init("/ip4/127.0.0.1/tcp/1").tryGet()]
peer1.protocols == @["/vac/waku/relay/2.0.0-beta1", "/vac/waku/store/2.0.0"]

test "connectedness() returns the connection status of a given PeerId":
check:
Expand Down
7 changes: 0 additions & 7 deletions waku/v2/node/jsonrpc/admin/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,6 @@ proc constructMultiaddrStr*(remotePeerInfo: RemotePeerInfo): string =
return ""
constructMultiaddrStr(remotePeerInfo.addrs[0], remotePeerInfo.peerId)

proc constructMultiaddrStr*(storedInfo: StoredInfo): string =
# Constructs a multiaddress with both location (wire) address and p2p identity
if storedInfo.addrs.len == 0:
return ""
constructMultiaddrStr(storedInfo.addrs[0], storedInfo.peerId)


proc installAdminApiHandlers*(node: WakuNode, rpcsrv: RpcServer) =

rpcsrv.rpc("post_waku_v2_admin_v1_peers") do (peers: seq[string]) -> bool:
Expand Down
24 changes: 12 additions & 12 deletions waku/v2/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,11 @@ proc protocolMatcher*(codec: string): Matcher =

proc insertOrReplace(ps: PeerStorage,
peerId: PeerID,
storedInfo: StoredInfo,
remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness,
disconnectTime: int64 = 0) =
# Insert peer entry into persistent storage, or replace existing entry with updated info
let res = ps.put(peerId, storedInfo, connectedness, disconnectTime)
let res = ps.put(peerId, remotePeerInfo, connectedness, disconnectTime)
if res.isErr:
warn "failed to store peers", err = res.error
waku_peers_errors.inc(labelValues = ["storage_failure"])
Expand Down Expand Up @@ -137,24 +137,24 @@ proc dialPeer(pm: PeerManager, peerId: PeerID,
proc loadFromStorage(pm: PeerManager) =
debug "loading peers from storage"
# Load peers from storage, if available
proc onData(peerId: PeerID, storedInfo: StoredInfo, connectedness: Connectedness, disconnectTime: int64) =
trace "loading peer", peerId= $peerId, storedInfo= $storedInfo, connectedness=connectedness
proc onData(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) =
trace "loading peer", peerId=peerId, connectedness=connectedness

if peerId == pm.switch.peerInfo.peerId:
# Do not manage self
return

# nim-libp2p books
pm.peerStore[AddressBook][peerId] = storedInfo.addrs
pm.peerStore[ProtoBook][peerId] = storedInfo.protos
pm.peerStore[KeyBook][peerId] = storedInfo.publicKey
pm.peerStore[AgentBook][peerId] = storedInfo.agent
pm.peerStore[ProtoVersionBook][peerId] = storedInfo.protoVersion
pm.peerStore[AddressBook][peerId] = remotePeerInfo.addrs
pm.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols
pm.peerStore[KeyBook][peerId] = remotePeerInfo.publicKey
pm.peerStore[AgentBook][peerId] = remotePeerInfo.agent
pm.peerStore[ProtoVersionBook][peerId] = remotePeerInfo.protoVersion

# custom books
pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
pm.peerStore[DisconnectBook][peerId] = disconnectTime
pm.peerStore[SourceBook][peerId] = storedInfo.origin
pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin

let res = pm.storage.getAll(onData)
if res.isErr:
Expand Down Expand Up @@ -461,7 +461,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
# TODO: proper heuristic here that compares peer scores and selects "best" one. For now the first peer for the given protocol is returned
if peers.len > 0:
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
return some(peers[0].toRemotePeerInfo())
return some(peers[0])
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

Expand All @@ -473,7 +473,7 @@ proc selectPeer*(pm: PeerManager, proto: string): Option[RemotePeerInfo] =
# If not slotted, we select a random peer for the given protocol
if peers.len > 0:
debug "Got peer from peerstore", peerId=peers[0].peerId, multi=peers[0].addrs[0], protocol=proto
return some(peers[0].toRemotePeerInfo())
return some(peers[0])
debug "No peer found for protocol", protocol=proto
return none(RemotePeerInfo)

Expand Down
11 changes: 6 additions & 5 deletions waku/v2/node/peer_manager/peer_store/peer_storage.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,25 @@ else:
import
stew/results
import
../waku_peer_store
../waku_peer_store,
../../../utils/peers

## This module defines a peer storage interface. Implementations of
## PeerStorage are used to store and retrieve peers

type
PeerStorage* = ref object of RootObj

PeerStorageResult*[T] = Result[T, string]

DataProc* = proc(peerId: PeerID, storedInfo: StoredInfo,
DataProc* = proc(peerId: PeerID, remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.closure, raises: [Defect].}

# PeerStorage interface
method put*(db: PeerStorage,
peerId: PeerID,
storedInfo: StoredInfo,
remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness,
disconnectTime: int64): PeerStorageResult[void] {.base.} = discard

method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
Loading