From ff3d6b5d419747d23638d382253631c86b9f2e94 Mon Sep 17 00:00:00 2001 From: Joshua Karp Date: Mon, 1 Nov 2021 15:22:48 +1100 Subject: [PATCH] Changing getConnectionToNode structure to match ObjectMap pattern --- src/nodes/NodeManager.ts | 92 ++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 41 deletions(-) diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 2cda1cf1bb..6aa25fc3eb 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -26,7 +26,7 @@ import * as claimsUtils from '../claims/utils'; import * as agentPB from '../proto/js/Agent_pb'; import { GRPCClientAgent } from '../agent'; import { ForwardProxy, ReverseProxy } from '../network'; -import { Mutex } from 'async-mutex'; +import { Mutex, MutexInterface } from 'async-mutex'; import { CreateDestroyStartStop, ready, @@ -421,68 +421,78 @@ class NodeManager { /** * Treat this node as the client, and attempt to create/retrieve an existing * undirectional connection to another node (server). + * ObjectMap pattern adapted from: + * https://gist.github.com/CMCDragonkai/f58f08e7eaab0430ed4467ca35527a42 */ @ready(new nodesErrors.ErrorNodeManagerNotStarted()) public async getConnectionToNode( targetNodeId: NodeId, ): Promise { - const connLock = this.connections.get(targetNodeId); - // If there's already an entry in the map, we have 2 cases: - // 1. The connection already exists - // 2. The connection is currently being created by another concurrent thread - if (connLock != null) { - // Return the connection if it already exists - if (connLock.connection != null) { - return connLock.connection; + let connection: NodeConnection | undefined; + let lock: MutexInterface; + let connAndLock = this.connections.get(targetNodeId); + if (connAndLock != null) { + ({ connection, lock } = connAndLock); + if (connection != null) { + return connection; } - // Otherwise, it's expected to be currently being created by some other thread - // Wait for the lock to release let release; try { - release = await connLock.lock.acquire(); + release = await lock.acquire(); + ({ connection, lock } = connAndLock); + if (connection != null) { + return connection; + } + connection = await this.establishNodeConnection(targetNodeId, lock); + connAndLock.connection = connection; + return connection; } finally { release(); } - // Once the lock is released, then it's sufficient to recursively call the - // function. It will most likely enter the case where we already have an - // entry in the map (or, an error occurred, and the entry is removed - in - // which case, this thread will create the connection). - return await this.getConnectionToNode(targetNodeId); - - // Otherwise, we need to create an entry } else { - const lock = new Mutex(); - this.connections.set(targetNodeId, { lock }); + lock = new Mutex(); + connAndLock = { lock }; + this.connections.set(targetNodeId, connAndLock); let release; try { release = await lock.acquire(); - const targetAddress = await this.findNode(targetNodeId); - const connection = await NodeConnection.createNodeConnection({ - targetNodeId: targetNodeId, - targetHost: targetAddress.ip, - targetPort: targetAddress.port, - forwardProxy: this.fwdProxy, - keyManager: this.keyManager, - logger: this.logger, - }); - await connection.start({ - brokerConnections: this.brokerNodeConnections, - }); - // Add it to the map of active connections - this.connections.set(targetNodeId, { connection, lock }); + connection = await this.establishNodeConnection(targetNodeId, lock); + connAndLock.connection = connection; return connection; - } catch (e) { - // We need to make sure to delete any added lock if we encounter an error - // Otherwise, we can enter a state where we have a lock in the map, but - // no NodeConnection being created - this.connections.delete(targetNodeId); - throw e; } finally { release(); } } } + + /** + * Strictly a helper function for this.getConnectionToNode. Do not call this + * function anywhere else. + * To create a connection to a node, always use getConnectionToNode. + */ + @ready(new nodesErrors.ErrorNodeManagerNotStarted()) + protected async establishNodeConnection( + targetNodeId: NodeId, + lock: MutexInterface + ): Promise { + const targetAddress = await this.findNode(targetNodeId); + const connection = await NodeConnection.createNodeConnection({ + targetNodeId: targetNodeId, + targetHost: targetAddress.ip, + targetPort: targetAddress.port, + forwardProxy: this.fwdProxy, + keyManager: this.keyManager, + logger: this.logger, + }); + await connection.start({ + brokerConnections: this.brokerNodeConnections, + }); + // Add it to the map of active connections + this.connections.set(targetNodeId, { connection, lock }); + return connection; + } + /** * Create and start a connection to a broker node. Assumes that a direct * connection to the broker can be established (i.e. no hole punching required).