Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: drop start stop pattern from Network where possible #5588

Merged
merged 2 commits into from
May 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions packages/beacon-node/src/network/core/networkCore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,13 @@ export class NetworkCore implements INetworkCore {
await reqResp.start();

await gossip.start();
attnetsService.start();
syncnetsService.start();

// Network spec decides version changes based on clock fork, not head fork
const forkCurrentSlot = config.getForkName(clock.currentSlot);
// Register only ReqResp protocols relevant to clock's fork
reqResp.registerProtocolsAtFork(forkCurrentSlot);

await peerManager.start();
await peerManager.startDiscovery();

// Bind discv5's ENR to local metadata
discv5 = peerManager["discovery"]?.discv5;
Expand Down Expand Up @@ -256,15 +254,15 @@ export class NetworkCore implements INetworkCore {
// Must goodbye and disconnect before stopping libp2p
await this.peerManager.goodbyeAndDisconnectAllPeers();
this.logger.debug("network sent goodbye to all peers");
await this.peerManager.stop();
await this.peerManager.close();
this.logger.debug("network peerManager closed");
await this.gossip.stop();
this.logger.debug("network gossip closed");
await this.reqResp.stop();
await this.reqResp.unregisterAllProtocols();
this.logger.debug("network reqResp closed");
this.attnetsService.stop();
this.syncnetsService.stop();
this.attnetsService.close();
this.syncnetsService.close();
await this.libp2p.stop();
this.logger.debug("network lib2p closed");

Expand Down
9 changes: 5 additions & 4 deletions packages/beacon-node/src/network/peers/peerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,7 @@ export class PeerManager {
if (metrics) {
metrics.peers.addCollect(() => this.runPeerCountMetrics(metrics));
}
}

async start(): Promise<void> {
await this.discovery?.start();
this.libp2p.connectionManager.addEventListener(Libp2pEvent.peerConnect, this.onLibp2pPeerConnect);
this.libp2p.connectionManager.addEventListener(Libp2pEvent.peerDisconnect, this.onLibp2pPeerDisconnect);
this.networkEventBus.on(NetworkEvent.reqRespRequest, this.onRequest);
Expand All @@ -196,7 +193,11 @@ export class PeerManager {
];
}

async stop(): Promise<void> {
async startDiscovery(): Promise<void> {
await this.discovery?.start();
}

async close(): Promise<void> {
await this.discovery?.stop();
this.libp2p.connectionManager.removeEventListener(Libp2pEvent.peerConnect, this.onLibp2pPeerConnect);
this.libp2p.connectionManager.removeEventListener(Libp2pEvent.peerDisconnect, this.onLibp2pPeerDisconnect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,12 @@ export class AttnetsService implements IAttnetsService {
if (metrics) {
metrics.attnetsService.subscriptionsRandom.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}
}

start(): void {
this.clock.on(ClockEvent.slot, this.onSlot);
this.clock.on(ClockEvent.epoch, this.onEpoch);
}

stop(): void {
close(): void {
this.clock.off(ClockEvent.slot, this.onSlot);
this.clock.off(ClockEvent.epoch, this.onEpoch);
}
Expand Down
3 changes: 1 addition & 2 deletions packages/beacon-node/src/network/subnets/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ export type CommitteeSubscription = {
};

export type SubnetsService = {
start(): void;
stop(): void;
close(): void;
addCommitteeSubscriptions(subscriptions: CommitteeSubscription[]): void;
getActiveSubnets(): RequestedSubnet[];
subscribeSubnetsToNextFork(nextFork: ForkName): void;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,11 @@ export class SyncnetsService implements SubnetsService {
if (metrics) {
metrics.syncnetsService.subscriptionsCommittee.addCollect(() => this.onScrapeLodestarMetrics(metrics));
}
}

start(): void {
this.clock.on(ClockEvent.epoch, this.onEpoch);
}

stop(): void {
close(): void {
this.clock.off(ClockEvent.epoch, this.onEpoch);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ describe("network / peers / PeerManager", function () {
getActiveSubnets: () => [],
shouldProcess: () => true,
addCommitteeSubscriptions: () => {},
start: () => {},
stop: () => {},
close: () => {},
subscribeSubnetsToNextFork: () => {},
unsubscribeSubnetsFromPrevFork: () => {},
};
Expand Down Expand Up @@ -93,7 +92,6 @@ describe("network / peers / PeerManager", function () {
discv5FirstQueryDelayMs: 0,
}
);
await peerManager.start();

return {statusCache, clock, libp2p, reqResp, peerManager, networkEventBus};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ describe("AttnetsService", function () {
randBetweenFn,
shuffleFn: shuffleFn as ShuffleFn,
});
service.start();
});

afterEach(() => {
service.stop();
service.close();
sandbox.restore();
randomSubnet = 0;
});
Expand Down