From 4c7cf0bc977ec6762e9374631149ea5258337c23 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Fri, 25 Mar 2022 18:02:09 +1100 Subject: [PATCH] feat: adding optional timeouts to `NodeConnectionManager` methods In some cases we want to specify how long we attempt to connect to a node on a per-connection basis. Related #363 --- src/nodes/NodeConnectionManager.ts | 57 ++++++++++++++++++++++-------- 1 file changed, 42 insertions(+), 15 deletions(-) diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 3853cc3d5a..234cde2d2e 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -36,7 +36,7 @@ interface NodeConnectionManager extends StartStop {} @StartStop() class NodeConnectionManager { /** - * Time used to estalish `NodeConnection` + * Time used to establish `NodeConnection` */ public readonly connConnectTime: number; @@ -122,14 +122,18 @@ class NodeConnectionManager { * itself is such that we can pass targetNodeId as a parameter (as opposed to * an acquire function with no parameters). * @param targetNodeId Id of target node to communicate with + * @param connConnectTime? Optional connection timeout time + * @param address? Optional address to connect to * @returns ResourceAcquire Resource API for use in with contexts */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) public async acquireConnection( targetNodeId: NodeId, + connConnectTime?: number, + address?: NodeAddress, ): Promise>> { return async () => { - const connAndLock = await this.createConnection(targetNodeId); + const connAndLock = await this.createConnection(targetNodeId, address, connConnectTime); // Acquire the read lock and the release function const release = await connAndLock.lock.acquireRead(); // Resetting TTL timer @@ -151,15 +155,17 @@ class NodeConnectionManager { * for use with normal arrow function * @param targetNodeId Id of target node to communicate with * @param f Function to handle communication + * @param connConnectTime Optional connection timeout time */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) public async withConnF( targetNodeId: NodeId, f: (conn: NodeConnection) => Promise, + connConnectTime?: number, ): Promise { try { return await withF( - [await this.acquireConnection(targetNodeId)], + [await this.acquireConnection(targetNodeId, connConnectTime, undefined)], async ([conn]) => { return await f(conn); }, @@ -184,6 +190,7 @@ class NodeConnectionManager { * for use with a generator function * @param targetNodeId Id of target node to communicate with * @param g Generator function to handle communication + * @param connConnectTime Optional connection timeout time */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) public async *withConnG( @@ -191,8 +198,9 @@ class NodeConnectionManager { g: ( conn: NodeConnection, ) => AsyncGenerator, + connConnectTime?: number, ): AsyncGenerator { - const acquire = await this.acquireConnection(targetNodeId); + const acquire = await this.acquireConnection(targetNodeId, connConnectTime, undefined); const [release, conn] = await acquire(); try { return yield* await g(conn!); @@ -217,10 +225,14 @@ class NodeConnectionManager { * Create a connection to another node (without performing any function). * This is a NOOP if a connection already exists. * @param targetNodeId Id of node we are creating connection to - * @returns ConnectionAndLock that was create or exists in the connection map. + * @param address Optional address to connect to + * @param connConnectTime Optional connection timeout time + * @returns ConnectionAndLock that was create or exists in the connection map */ protected async createConnection( targetNodeId: NodeId, + address?: NodeAddress, + connConnectTime?: number ): Promise { this.logger.info( `Creating connection to ${nodesUtils.encodeNodeId(targetNodeId)}`, @@ -249,7 +261,7 @@ class NodeConnectionManager { )}`, ); // Creating the connection and set in map - return await this.establishNodeConnection(targetNodeId, lock); + return await this.establishNodeConnection(targetNodeId, lock, address, connConnectTime); }); } else { lock = new RWLock(); @@ -265,7 +277,7 @@ class NodeConnectionManager { )}`, ); // Creating the connection and set in map - return await this.establishNodeConnection(targetNodeId, lock); + return await this.establishNodeConnection(targetNodeId, lock, address, connConnectTime); }); } } @@ -278,13 +290,17 @@ class NodeConnectionManager { * This only adds the connection to the connection map if the connection was established. * @param targetNodeId Id of node we are establishing connection to * @param lock Lock associated with connection + * @param address Optional address to connect to + * @param connConnectTime Optional connection timeout time * @returns ConnectionAndLock that was added to the connection map */ protected async establishNodeConnection( targetNodeId: NodeId, lock: RWLock, + address?: NodeAddress, + connConnectTime?: number, ): Promise { - const targetAddress = await this.findNode(targetNodeId); + const targetAddress = address ?? await this.findNode(targetNodeId); // If the stored host is not a valid host (IP address), then we assume it to // be a hostname const targetHostname = !networkUtils.isHost(targetAddress.host) @@ -317,7 +333,7 @@ class NodeConnectionManager { keyManager: this.keyManager, nodeConnectionManager: this, destroyCallback, - connConnectTime: this.connConnectTime, + connConnectTime: connConnectTime ?? this.connConnectTime, logger: this.logger.getChild(`${targetHost}:${targetAddress.port}`), clientFactory: async (args) => GRPCClientAgent.createGRPCClientAgent(args), @@ -439,11 +455,13 @@ class NodeConnectionManager { * port). * @param targetNodeId ID of the node attempting to be found (i.e. attempting * to find its IP address and port) + * @param connConnectTime Optional connection timeout time * @returns whether the target node was located in the process */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) public async getClosestGlobalNodes( targetNodeId: NodeId, + connConnectTime?: number, ): Promise { // Let foundTarget: boolean = false; let foundAddress: NodeAddress | undefined = undefined; @@ -482,7 +500,7 @@ class NodeConnectionManager { // Add the node to the database so that we can find its address in // call to getConnectionToNode await this.nodeGraph.setNode(nextNodeId, nextNodeAddress.address); - await this.createConnection(nextNodeId); + await this.createConnection(nextNodeId, undefined, connConnectTime); } catch (e) { // If we can't connect to the node, then skip it continue; @@ -492,6 +510,7 @@ class NodeConnectionManager { const foundClosest = await this.getRemoteNodeClosestNodes( nextNodeId, targetNodeId, + connConnectTime, ); // Check to see if any of these are the target node. At the same time, add // them to the shortlist @@ -533,12 +552,14 @@ class NodeConnectionManager { * target node ID. * @param nodeId the node ID to search on * @param targetNodeId the node ID to find other nodes closest to it + * @param connConnectTime Optional connection timeout time * @returns list of nodes and their IP/port that are closest to the target */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) public async getRemoteNodeClosestNodes( nodeId: NodeId, targetNodeId: NodeId, + connConnectTime?: number, ): Promise> { // Construct the message const nodeIdMessage = new nodesPB.Node(); @@ -565,7 +586,7 @@ class NodeConnectionManager { } }); return nodes; - }); + }, connConnectTime); } /** @@ -579,13 +600,14 @@ class NodeConnectionManager { * This has been removed from start() as there's a chicken-egg scenario * where we require the NodeGraph instance to be created in order to get * connections. + * @param connConnectTime Optional connection timeout time */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) - public async syncNodeGraph() { + public async syncNodeGraph(connConnectTime?: number) { for (const seedNodeId of this.getSeedNodes()) { // Check if the connection is viable try { - await this.createConnection(seedNodeId); + await this.createConnection(seedNodeId, undefined, connConnectTime); } catch (e) { if (e instanceof nodesErrors.ErrorNodeConnectionTimeout) continue; throw e; @@ -594,6 +616,7 @@ class NodeConnectionManager { const nodes = await this.getRemoteNodeClosestNodes( seedNodeId, this.keyManager.getNodeId(), + connConnectTime, ); for (const [nodeId, nodeData] of nodes) { // FIXME: this should be the `nodeManager.setNode` @@ -611,6 +634,7 @@ class NodeConnectionManager { * @param targetNodeId node ID of the target node to hole punch * @param proxyAddress stringified address of `proxyHost:proxyPort` * @param signature signature to verify source node is sender (signature based + * @param connConnectTime Optional connection timeout time * on proxyAddress as message) */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) @@ -620,6 +644,7 @@ class NodeConnectionManager { targetNodeId: NodeId, proxyAddress: string, signature: Buffer, + connConnectTime?: number, ): Promise { const relayMsg = new nodesPB.Relay(); relayMsg.setSrcId(nodesUtils.encodeNodeId(sourceNodeId)); @@ -629,7 +654,7 @@ class NodeConnectionManager { await this.withConnF(relayNodeId, async (connection) => { const client = connection.getClient(); await client.nodesHolePunchMessageSend(relayMsg); - }); + }, connConnectTime); } /** @@ -639,15 +664,17 @@ class NodeConnectionManager { * node). * @param message the original relay message (assumed to be created in * nodeConnection.start()) + * @param connConnectTime Optional connection timeout time */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) - public async relayHolePunchMessage(message: nodesPB.Relay): Promise { + public async relayHolePunchMessage(message: nodesPB.Relay, connConnectTime?: number,): Promise { await this.sendHolePunchMessage( validationUtils.parseNodeId(message.getTargetId()), validationUtils.parseNodeId(message.getSrcId()), validationUtils.parseNodeId(message.getTargetId()), message.getProxyAddress(), Buffer.from(message.getSignature()), + connConnectTime, ); }