Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: upgrade to libp2p@0.45 #1400

Merged
merged 49 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
92a404d
upgrade libp2p version, partially update protocols, rename to IBasePr…
weboko Jun 9, 2023
fd9dfd8
complete transition for protocols
weboko Jun 9, 2023
54ebf65
complete transition of connection maanger
weboko Jun 12, 2023
e1f56be
finish sdk
weboko Jun 13, 2023
65917c8
complete core
weboko Jun 13, 2023
c02f224
complete relay
weboko Jun 13, 2023
6634606
complete peer-exchange
weboko Jun 15, 2023
483dc19
complete dns-discovery
weboko Jun 15, 2023
d52ed53
add components field to Libp2p interface and use it in core
weboko Jun 15, 2023
548cf5b
add type hack for Libp2p creation:
weboko Jun 15, 2023
866a6ac
finish waku node test
weboko Jun 15, 2023
209021f
complete relay test
weboko Jun 15, 2023
235a1c4
complete peer exchange
weboko Jun 15, 2023
00b6275
complete dns peer discovery test
weboko Jun 16, 2023
3ebca37
add missing dependency to relay
weboko Jun 16, 2023
b18fb90
fix new peer store integration
weboko Jun 16, 2023
10c59bb
improve initialization of pubsub
weboko Jun 16, 2023
644cf9d
add catch for missing peer
weboko Jun 16, 2023
021ae09
update test and remove extra dependency
weboko Jun 16, 2023
db56010
prevent error throw
weboko Jun 16, 2023
4730efe
merge with master
weboko Jun 19, 2023
ddc6656
fix edge case with peerStore
weboko Jun 19, 2023
f0cf4de
fix peer exchange
weboko Jun 19, 2023
8b11025
fix protocols used
weboko Jun 21, 2023
07f6590
Merge branch 'master' of github.com:waku-org/js-waku into weboko/libp…
weboko Jun 21, 2023
2c0350c
fix test with another evnet
weboko Jun 21, 2023
7acf23e
bump libp2p and interfaces
weboko Jun 23, 2023
06b3a88
add missing package
weboko Jun 25, 2023
4bf192d
fix peer-exchange problem
weboko Jul 3, 2023
6aa0cbf
merge with master
weboko Jul 3, 2023
d30888c
prefer libp2p peerDiscovery for integration tests
weboko Jul 3, 2023
aea44f9
Merge branch 'master' into weboko/libp2p.45
weboko Jul 10, 2023
e2db579
fix import
weboko Jul 10, 2023
e3b90a3
increate timeout
weboko Jul 10, 2023
b48b042
return test against Test fleet
weboko Jul 10, 2023
4575d06
Merge branch 'master' into weboko/libp2p.45
weboko Jul 13, 2023
70fd07c
remove await for peer:update
weboko Jul 15, 2023
1a634ff
increase timeout
weboko Jul 15, 2023
18aee0c
add await for peerStore
weboko Jul 15, 2023
ac2e900
comment event for testing
weboko Jul 15, 2023
33e5df8
merge with develop
weboko Jul 24, 2023
10e8f75
merge with master
weboko Jul 24, 2023
d15f34a
fix lint
weboko Jul 24, 2023
02dfd29
remove bind
weboko Jul 24, 2023
3a940ae
fix stub
weboko Jul 24, 2023
1b37fc5
decouple to separate test case
weboko Jul 24, 2023
cb83d54
move back to explicit build
weboko Jul 24, 2023
b037080
remove only
weboko Jul 24, 2023
32f6650
do not test event
weboko Jul 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5,113 changes: 2,477 additions & 2,636 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 6 additions & 6 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@
"uuid": "^9.0.0"
},
"devDependencies": {
"@libp2p/interface-connection": "^3.0.8",
"@libp2p/interface-libp2p": "^1.1.2",
"@libp2p/interface-peer-id": "^2.0.1",
"@libp2p/interface-peer-store": "^1.2.8",
"@libp2p/interface-registrar": "^2.0.8",
"@libp2p/interface-connection": "^5.1.0",
"@libp2p/interface-libp2p": "^3.2.0",
"@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interface-peer-store": "^2.0.3",
"@libp2p/interface-registrar": "^2.0.12",
"@multiformats/multiaddr": "^12.0.0",
"@rollup/plugin-commonjs": "^24.0.1",
"@rollup/plugin-json": "^6.0.0",
Expand Down Expand Up @@ -128,7 +128,7 @@
},
"peerDependencies": {
"@multiformats/multiaddr": "^12.0.0",
"libp2p": "^0.42.2"
"libp2p": "^0.45.5"
},
"peerDependenciesMeta": {
"@multiformats/multiaddr": {
Expand Down
32 changes: 23 additions & 9 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { Connection, Stream } from "@libp2p/interface-connection";
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer, PeerStore } from "@libp2p/interface-peer-store";
import type { IBaseProtocol, Libp2pComponents } from "@waku/interfaces";
import {
getPeersForProtocol,
selectConnection,
Expand All @@ -11,19 +13,29 @@ import {
* A class with predefined helpers, to be used as a base to implement Waku
* Protocols.
*/
export class BaseProtocol {
constructor(
public multicodec: string,
public peerStore: PeerStore,
protected getConnections: (peerId?: PeerId) => Connection[]
) {}
export class BaseProtocol implements IBaseProtocol {
public readonly addLibp2pEventListener: Libp2p["addEventListener"];
public readonly removeLibp2pEventListener: Libp2p["removeEventListener"];

constructor(public multicodec: string, private components: Libp2pComponents) {
this.addLibp2pEventListener = components.events.addEventListener.bind(
components.events
);
this.removeLibp2pEventListener = components.events.removeEventListener.bind(
components.events
);
}

public get peerStore(): PeerStore {
return this.components.peerStore;
}

/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
async peers(): Promise<Peer[]> {
public async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

Expand All @@ -36,7 +48,9 @@ export class BaseProtocol {
return peer;
}
protected async newStream(peer: Peer): Promise<Stream> {
const connections = this.getConnections(peer.id);
const connections = this.components.connectionManager.getConnections(
peer.id
);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
Expand Down
68 changes: 32 additions & 36 deletions packages/core/src/lib/connection_manager.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import type { Connection } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerInfo } from "@libp2p/interface-peer-info";
import type { ConnectionManagerOptions, IRelay } from "@waku/interfaces";
import { Tags } from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces";
import debug from "debug";

import { KeepAliveManager, KeepAliveOptions } from "./keep_alive_manager.js";
Expand All @@ -18,7 +16,7 @@ export class ConnectionManager {
private static instances = new Map<string, ConnectionManager>();
private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions;
private libp2pComponents: Libp2p;
private libp2p: Libp2p;
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();

Expand Down Expand Up @@ -47,12 +45,12 @@ export class ConnectionManager {
}

private constructor(
libp2pComponents: Libp2p,
libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions,
relay?: IRelay,
options?: Partial<ConnectionManagerOptions>
) {
this.libp2pComponents = libp2pComponents;
this.libp2p = libp2p;
this.options = {
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED,
Expand All @@ -73,13 +71,11 @@ export class ConnectionManager {
}

private async dialPeerStorePeers(): Promise<void> {
const peerInfos = await this.libp2pComponents.peerStore.all();
const peerInfos = await this.libp2p.peerStore.all();
const dialPromises = [];
for (const peerInfo of peerInfos) {
if (
this.libp2pComponents
.getConnections()
.find((c) => c.remotePeer === peerInfo.id)
this.libp2p.getConnections().find((c) => c.remotePeer === peerInfo.id)
)
continue;

Expand All @@ -101,15 +97,15 @@ export class ConnectionManager {

stop(): void {
this.keepAliveManager.stopAll();
this.libp2pComponents.removeEventListener(
this.libp2p.removeEventListener(
"peer:connect",
this.onEventHandlers["peer:connect"]
);
this.libp2pComponents.removeEventListener(
this.libp2p.removeEventListener(
"peer:disconnect",
this.onEventHandlers["peer:disconnect"]
);
this.libp2pComponents.removeEventListener(
this.libp2p.removeEventListener(
"peer:discovery",
this.onEventHandlers["peer:discovery"]
);
Expand All @@ -121,12 +117,12 @@ export class ConnectionManager {
while (dialAttempt <= this.options.maxDialAttemptsForPeer) {
try {
log(`Dialing peer ${peerId.toString()}`);
await this.libp2pComponents.dial(peerId);
await this.libp2p.dial(peerId);

const tags = await this.getTagNamesForPeer(peerId);
// add tag to connection describing discovery mechanism
// don't add duplicate tags
this.libp2pComponents
this.libp2p
.getConnections(peerId)
.forEach(
(conn) => (conn.tags = Array.from(new Set([...conn.tags, ...tags])))
Expand Down Expand Up @@ -157,7 +153,7 @@ export class ConnectionManager {
}`
);
this.dialErrorsForPeer.delete(peerId.toString());
return await this.libp2pComponents.peerStore.delete(peerId);
return await this.libp2p.peerStore.delete(peerId);
} catch (error) {
throw `Error deleting undialable peer ${peerId.toString()} from peer store - ${error}`;
} finally {
Expand All @@ -168,7 +164,7 @@ export class ConnectionManager {

async dropConnection(peerId: PeerId): Promise<void> {
try {
await this.libp2pComponents.hangUp(peerId);
await this.libp2p.hangUp(peerId);
log(`Dropped connection with peer ${peerId.toString()}`);
} catch (error) {
log(
Expand All @@ -191,14 +187,14 @@ export class ConnectionManager {
}

private startPeerDiscoveryListener(): void {
this.libp2pComponents.peerStore.addEventListener(
"peer",
this.libp2p.addEventListener(
"peer:discovery",
this.onEventHandlers["peer:discovery"]
);
}

private startPeerConnectionListener(): void {
this.libp2pComponents.addEventListener(
this.libp2p.addEventListener(
"peer:connect",
this.onEventHandlers["peer:connect"]
);
Expand All @@ -217,7 +213,7 @@ export class ConnectionManager {
* >this event will **only** be triggered when the last connection is closed.
* @see https://github.com/libp2p/js-libp2p/blob/bad9e8c0ff58d60a78314077720c82ae331cc55b/doc/API.md?plain=1#L2100
*/
this.libp2pComponents.addEventListener(
this.libp2p.addEventListener(
"peer:disconnect",
this.onEventHandlers["peer:disconnect"]
);
Expand All @@ -244,20 +240,17 @@ export class ConnectionManager {
log(`Error dialing peer ${peerId.toString()} : ${err}`)
);
},
"peer:connect": async (evt: CustomEvent<Connection>): Promise<void> => {
const { remotePeer: peerId } = evt.detail;
"peer:connect": async (evt: CustomEvent<PeerId>): Promise<void> => {
const peerId = evt.detail;

this.keepAliveManager.start(
peerId,
this.libp2pComponents.ping.bind(this)
);
this.keepAliveManager.start(peerId, this.libp2p.services.ping);

const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
);

if (isBootstrap) {
const bootstrapConnections = this.libp2pComponents
const bootstrapConnections = this.libp2p
.getConnections()
.filter((conn) => conn.tags.includes(Tags.BOOTSTRAP));

Expand All @@ -270,8 +263,8 @@ export class ConnectionManager {
}
},
"peer:disconnect": () => {
return (evt: CustomEvent<Connection>): void => {
this.keepAliveManager.stop(evt.detail.remotePeer);
return (evt: CustomEvent<PeerId>): void => {
this.keepAliveManager.stop(evt.detail);
};
},
};
Expand All @@ -282,7 +275,7 @@ export class ConnectionManager {
* 2. If the peer is not a bootstrap peer
*/
private async shouldDialPeer(peerId: PeerId): Promise<boolean> {
const isConnected = this.libp2pComponents.getConnections(peerId).length > 0;
const isConnected = this.libp2p.getConnections(peerId).length > 0;

if (isConnected) return false;

Expand All @@ -291,7 +284,7 @@ export class ConnectionManager {
);

if (isBootstrap) {
const currentBootstrapConnections = this.libp2pComponents
const currentBootstrapConnections = this.libp2p
.getConnections()
.filter((conn) => {
conn.tags.find((name) => name === Tags.BOOTSTRAP);
Expand All @@ -309,9 +302,12 @@ export class ConnectionManager {
* Fetches the tag names for a given peer
*/
private async getTagNamesForPeer(peerId: PeerId): Promise<string[]> {
const tags = (await this.libp2pComponents.peerStore.getTags(peerId)).map(
(tag) => tag.name
);
return tags;
try {
const peer = await this.libp2p.peerStore.get(peerId);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

context: Now all methods on PersistentPeerStore that libp2p uses are throwing so it is up to us how we handle that.

return Array.from(peer.tags.keys());
} catch (error) {
log(`Failed to get peer ${peerId}, error: ${error}`);
return [];
}
}
}
8 changes: 4 additions & 4 deletions packages/core/src/lib/filter/v1/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
Expand All @@ -9,6 +8,7 @@ import type {
IDecodedMessage,
IDecoder,
IFilter,
Libp2p,
ProtocolCreateOptions,
ProtocolOptions,
} from "@waku/interfaces";
Expand Down Expand Up @@ -50,11 +50,11 @@ class Filter extends BaseProtocol implements IFilter {
options: ProtocolCreateOptions;
private subscriptions: Map<RequestID, unknown>;

constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodec, libp2p.peerStore, libp2p.getConnections.bind(libp2p));
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodec, libp2p.components);
this.options = options ?? {};
this.subscriptions = new Map();
this.libp2p
libp2p
.handle(this.multicodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e));
}
Expand Down
18 changes: 6 additions & 12 deletions packages/core/src/lib/filter/v2/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
Expand All @@ -12,6 +11,7 @@ import type {
IFilterV2,
IProtoMessage,
IReceiver,
Libp2p,
PeerIdStr,
ProtocolCreateOptions,
ProtocolOptions,
Expand Down Expand Up @@ -245,18 +245,12 @@ class FilterV2 extends BaseProtocol implements IReceiver {
return subscription;
}

constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(
FilterV2Codecs.SUBSCRIBE,
libp2p.peerStore,
libp2p.getConnections.bind(libp2p)
);
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterV2Codecs.SUBSCRIBE, libp2p.components);

this.libp2p
.handle(FilterV2Codecs.PUSH, this.onRequest.bind(this))
.catch((e) => {
log("Failed to register ", FilterV2Codecs.PUSH, e);
});
libp2p.handle(FilterV2Codecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log("Failed to register ", FilterV2Codecs.PUSH, e);
});

this.activeSubscriptions = new Map();

Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/lib/keep_alive_manager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { IRelay } from "@waku/interfaces";
import debug from "debug";
import type { Libp2p } from "libp2p";
import type { PingService } from "libp2p/ping";

import { createEncoder } from "../index.js";

Expand All @@ -26,7 +26,7 @@ export class KeepAliveManager {
this.relay = relay;
}

public start(peerId: PeerId, libp2pPing: Libp2p["ping"]): void {
public start(peerId: PeerId, libp2pPing: PingService): void {
// Just in case a timer already exist for this peer
this.stop(peerId);

Expand All @@ -37,7 +37,7 @@ export class KeepAliveManager {

if (pingPeriodSecs !== 0) {
const interval = setInterval(() => {
libp2pPing(peerId).catch((e) => {
libp2pPing.ping(peerId).catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
}, pingPeriodSecs * 1000);
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import {
IEncoder,
ILightPush,
IMessage,
Libp2p,
ProtocolCreateOptions,
ProtocolOptions,
SendError,
Expand Down Expand Up @@ -33,8 +33,8 @@ export { PushResponse };
class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions;

constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.peerStore, libp2p.getConnections.bind(libp2p));
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.options = options || {};
}

Expand Down
Loading