Skip to content

Commit 6f09fbf

Browse files
feat: use the lowest latency peer for protocols (#1540)
* maintain pings in a hashmap * convert `KeepAliveManager` into a singleton * chore: fix an unrelated cyclic dependency error * update `selectPeerForProtocol` to return peer with the lowest latency * use the new KeepAliveManager API * use the new API for `selectPeerForProtocol` * add tests * use PeerData to hold the ping instead of a new variable * improve tests for readability * move back KeepAliveManager from singleton * reenable all tests * minor improvements * improve error handling * convert .then() syntax to async/await
1 parent 1cfe0fc commit 6f09fbf

File tree

6 files changed

+247
-18
lines changed

6 files changed

+247
-18
lines changed

package-lock.json

+6-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/core/src/lib/connection_manager.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -340,7 +340,11 @@ export class ConnectionManager
340340
void (async () => {
341341
const peerId = evt.detail;
342342

343-
this.keepAliveManager.start(peerId, this.libp2p.services.ping);
343+
this.keepAliveManager.start(
344+
peerId,
345+
this.libp2p.services.ping,
346+
this.libp2p.peerStore
347+
);
344348

345349
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
346350
Tags.BOOTSTRAP

packages/core/src/lib/keep_alive_manager.ts

+30-6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
import type { PeerId } from "@libp2p/interface/peer-id";
2+
import type { PeerStore } from "@libp2p/interface/peer-store";
23
import type { IRelay } from "@waku/interfaces";
34
import type { KeepAliveOptions } from "@waku/interfaces";
5+
import { utf8ToBytes } from "@waku/utils/bytes";
46
import debug from "debug";
57
import type { PingService } from "libp2p/ping";
68

7-
import { createEncoder } from "../index.js";
9+
import { createEncoder } from "./message/version_0.js";
810

911
export const RelayPingContentTopic = "/relay-ping/1/ping/null";
1012
const log = debug("waku:keep-alive");
@@ -22,8 +24,12 @@ export class KeepAliveManager {
2224
this.relay = relay;
2325
}
2426

25-
public start(peerId: PeerId, libp2pPing: PingService): void {
26-
// Just in case a timer already exist for this peer
27+
public start(
28+
peerId: PeerId,
29+
libp2pPing: PingService,
30+
peerStore: PeerStore
31+
): void {
32+
// Just in case a timer already exists for this peer
2733
this.stop(peerId);
2834

2935
const { pingKeepAlive: pingPeriodSecs, relayKeepAlive: relayPeriodSecs } =
@@ -33,10 +39,28 @@ export class KeepAliveManager {
3339

3440
if (pingPeriodSecs !== 0) {
3541
const interval = setInterval(() => {
36-
libp2pPing.ping(peerId).catch((e) => {
37-
log(`Ping failed (${peerIdStr})`, e);
38-
});
42+
void (async () => {
43+
try {
44+
// ping the peer for keep alive
45+
// also update the peer store with the latency
46+
const ping = await libp2pPing.ping(peerId);
47+
log(`Ping succeeded (${peerIdStr})`, ping);
48+
49+
try {
50+
await peerStore.patch(peerId, {
51+
metadata: {
52+
ping: utf8ToBytes(ping.toString())
53+
}
54+
});
55+
} catch (e) {
56+
log("Failed to update ping", e);
57+
}
58+
} catch (e) {
59+
log(`Ping failed (${peerIdStr})`, e);
60+
}
61+
})();
3962
}, pingPeriodSecs * 1000);
63+
4064
this.pingKeepAliveTimers.set(peerIdStr, interval);
4165
}
4266

packages/tests/package.json

+4-3
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
"@waku/interfaces": "*",
5858
"@waku/utils": "*",
5959
"app-root-path": "^3.1.0",
60+
"chai-as-promised": "^7.1.1",
6061
"debug": "^4.3.4",
6162
"dockerode": "^3.3.5",
6263
"p-timeout": "^6.1.0",
@@ -66,20 +67,20 @@
6667
},
6768
"devDependencies": {
6869
"@libp2p/bootstrap": "^9.0.2",
69-
"@types/sinon": "^10.0.16",
7070
"@types/chai": "^4.3.5",
7171
"@types/dockerode": "^3.3.19",
7272
"@types/mocha": "^10.0.1",
73+
"@types/sinon": "^10.0.16",
7374
"@types/tail": "^2.2.1",
7475
"@typescript-eslint/eslint-plugin": "^5.57.0",
7576
"@typescript-eslint/parser": "^6.6.0",
76-
"@waku/sdk": "*",
7777
"@waku/dns-discovery": "*",
7878
"@waku/message-encryption": "*",
7979
"@waku/peer-exchange": "*",
80+
"@waku/sdk": "*",
8081
"chai": "^4.3.7",
81-
"datastore-core": "^9.2.2",
8282
"cspell": "^7.3.2",
83+
"datastore-core": "^9.2.2",
8384
"debug": "^4.3.4",
8485
"interface-datastore": "^8.2.3",
8586
"libp2p": "^0.46.9",

packages/tests/tests/utils.spec.ts

+155-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
import type { PeerStore } from "@libp2p/interface/peer-store";
2+
import type { Peer } from "@libp2p/interface/peer-store";
3+
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
14
import {
25
createDecoder,
36
createEncoder,
@@ -9,11 +12,16 @@ import { Protocols } from "@waku/interfaces";
912
import { createLightNode } from "@waku/sdk";
1013
import { toAsyncIterator } from "@waku/utils";
1114
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
12-
import { expect } from "chai";
15+
import { selectPeerForProtocol } from "@waku/utils/libp2p";
16+
import chai, { expect } from "chai";
17+
import chaiAsPromised from "chai-as-promised";
18+
import sinon from "sinon";
1319

14-
import { makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
20+
import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js";
1521
import { NimGoNode } from "../src/node/node.js";
1622

23+
chai.use(chaiAsPromised);
24+
1725
const TestContentTopic = "/test/1/waku-filter";
1826
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
1927
const TestDecoder = createDecoder(TestContentTopic);
@@ -106,3 +114,148 @@ describe("Util: toAsyncIterator: Filter", () => {
106114
expect(result.done).to.eq(true);
107115
});
108116
});
117+
118+
const TestCodec = "test/1";
119+
120+
describe("selectPeerForProtocol", () => {
121+
let peerStore: PeerStore;
122+
const protocols = [TestCodec];
123+
124+
let lowPingPeer: Peer,
125+
midPingPeer: Peer,
126+
highPingPeer: Peer,
127+
differentCodecPeer: Peer,
128+
anotherDifferentCodecPeer: Peer;
129+
130+
beforeEach(async function () {
131+
this.timeout(10000);
132+
const waku = await createLightNode();
133+
await waku.start();
134+
await delay(3000);
135+
peerStore = waku.libp2p.peerStore;
136+
137+
const [
138+
lowPingPeerId,
139+
midPingPeerId,
140+
highPingPeerId,
141+
differentCodecPeerId,
142+
anotherDifferentCodecPeerId
143+
] = await Promise.all([
144+
createSecp256k1PeerId(),
145+
createSecp256k1PeerId(),
146+
createSecp256k1PeerId(),
147+
createSecp256k1PeerId(),
148+
createSecp256k1PeerId()
149+
]);
150+
151+
lowPingPeer = {
152+
id: lowPingPeerId,
153+
protocols: [TestCodec],
154+
metadata: new Map().set("ping", utf8ToBytes("50"))
155+
} as Peer;
156+
157+
midPingPeer = {
158+
id: midPingPeerId,
159+
protocols: [TestCodec],
160+
metadata: new Map().set("ping", utf8ToBytes("100"))
161+
} as Peer;
162+
163+
highPingPeer = {
164+
id: highPingPeerId,
165+
protocols: [TestCodec],
166+
metadata: new Map().set("ping", utf8ToBytes("500"))
167+
} as Peer;
168+
169+
differentCodecPeer = {
170+
id: differentCodecPeerId,
171+
protocols: ["DifferentCodec"]
172+
} as Peer;
173+
174+
anotherDifferentCodecPeer = {
175+
id: anotherDifferentCodecPeerId,
176+
protocols: ["AnotherDifferentCodec"]
177+
} as Peer;
178+
});
179+
180+
afterEach(() => {
181+
sinon.restore();
182+
});
183+
184+
it("should return the peer with the lowest ping", async function () {
185+
const mockPeers = [highPingPeer, lowPingPeer, midPingPeer];
186+
187+
sinon.stub(peerStore, "get").callsFake(async (peerId) => {
188+
return mockPeers.find((peer) => peer.id.equals(peerId))!;
189+
});
190+
191+
sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
192+
for (const peer of mockPeers) {
193+
callback(peer);
194+
}
195+
});
196+
197+
const result = await selectPeerForProtocol(peerStore, protocols);
198+
199+
expect(result.peer).to.deep.equal(lowPingPeer);
200+
expect(result.protocol).to.equal(TestCodec);
201+
});
202+
203+
it("should return the peer with the provided peerId", async function () {
204+
const targetPeer = await createSecp256k1PeerId();
205+
const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer;
206+
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);
207+
208+
const result = await selectPeerForProtocol(
209+
peerStore,
210+
protocols,
211+
targetPeer
212+
);
213+
expect(result.peer).to.deep.equal(mockPeer);
214+
});
215+
216+
it("should return a random peer when all peers have the same latency", async function () {
217+
const mockPeers = [highPingPeer, highPingPeer, highPingPeer];
218+
219+
sinon.stub(peerStore, "get").callsFake(async (peerId) => {
220+
return mockPeers.find((peer) => peer.id.equals(peerId))!;
221+
});
222+
223+
sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
224+
for (const peer of mockPeers) {
225+
callback(peer);
226+
}
227+
});
228+
229+
const result = await selectPeerForProtocol(peerStore, protocols);
230+
231+
expect(mockPeers).to.deep.include(result.peer);
232+
});
233+
234+
it("should throw an error when no peer matches the given protocols", async function () {
235+
const mockPeers = [differentCodecPeer, anotherDifferentCodecPeer];
236+
237+
sinon.stub(peerStore, "forEach").callsFake(async (callback) => {
238+
for (const peer of mockPeers) {
239+
callback(peer);
240+
}
241+
});
242+
243+
await expect(
244+
selectPeerForProtocol(peerStore, protocols)
245+
).to.be.rejectedWith(
246+
`Failed to find known peer that registers protocols: ${protocols}`
247+
);
248+
});
249+
250+
it("should throw an error when the selected peer does not register the required protocols", async function () {
251+
const targetPeer = await createSecp256k1PeerId();
252+
const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer;
253+
sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer);
254+
255+
await expect(
256+
selectPeerForProtocol(peerStore, protocols, targetPeer)
257+
).to.be.rejectedWith(
258+
`Peer does not register required protocols (${targetPeer.toString()}): ${protocols}`
259+
);
260+
});
261+
});

0 commit comments

Comments
 (0)