Skip to content

Commit

Permalink
feat: adding optional timeouts to NodeConnectionManager methods
Browse files Browse the repository at this point in the history
In some cases we want to specify how long we attempt to connect to a node on a per-connection basis.

Related #363
  • Loading branch information
tegefaulkes committed Mar 25, 2022
1 parent eb07281 commit 4c7cf0b
Showing 1 changed file with 42 additions and 15 deletions.
57 changes: 42 additions & 15 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ interface NodeConnectionManager extends StartStop {}
@StartStop()
class NodeConnectionManager {
/**
* Time used to estalish `NodeConnection`
* Time used to establish `NodeConnection`
*/
public readonly connConnectTime: number;

Expand Down Expand Up @@ -122,14 +122,18 @@ class NodeConnectionManager {
* itself is such that we can pass targetNodeId as a parameter (as opposed to
* an acquire function with no parameters).
* @param targetNodeId Id of target node to communicate with
* @param connConnectTime? Optional connection timeout time
* @param address? Optional address to connect to
* @returns ResourceAcquire Resource API for use in with contexts
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async acquireConnection(
targetNodeId: NodeId,
connConnectTime?: number,
address?: NodeAddress,
): Promise<ResourceAcquire<NodeConnection<GRPCClientAgent>>> {
return async () => {
const connAndLock = await this.createConnection(targetNodeId);
const connAndLock = await this.createConnection(targetNodeId, address, connConnectTime);
// Acquire the read lock and the release function
const release = await connAndLock.lock.acquireRead();
// Resetting TTL timer
Expand All @@ -151,15 +155,17 @@ class NodeConnectionManager {
* for use with normal arrow function
* @param targetNodeId Id of target node to communicate with
* @param f Function to handle communication
* @param connConnectTime Optional connection timeout time
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async withConnF<T>(
targetNodeId: NodeId,
f: (conn: NodeConnection<GRPCClientAgent>) => Promise<T>,
connConnectTime?: number,
): Promise<T> {
try {
return await withF(
[await this.acquireConnection(targetNodeId)],
[await this.acquireConnection(targetNodeId, connConnectTime, undefined)],
async ([conn]) => {
return await f(conn);
},
Expand All @@ -184,15 +190,17 @@ class NodeConnectionManager {
* for use with a generator function
* @param targetNodeId Id of target node to communicate with
* @param g Generator function to handle communication
* @param connConnectTime Optional connection timeout time
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async *withConnG<T, TReturn, TNext>(
targetNodeId: NodeId,
g: (
conn: NodeConnection<GRPCClientAgent>,
) => AsyncGenerator<T, TReturn, TNext>,
connConnectTime?: number,
): AsyncGenerator<T, TReturn, TNext> {
const acquire = await this.acquireConnection(targetNodeId);
const acquire = await this.acquireConnection(targetNodeId, connConnectTime, undefined);
const [release, conn] = await acquire();
try {
return yield* await g(conn!);
Expand All @@ -217,10 +225,14 @@ class NodeConnectionManager {
* Create a connection to another node (without performing any function).
* This is a NOOP if a connection already exists.
* @param targetNodeId Id of node we are creating connection to
* @returns ConnectionAndLock that was create or exists in the connection map.
* @param address Optional address to connect to
* @param connConnectTime Optional connection timeout time
* @returns ConnectionAndLock that was create or exists in the connection map
*/
protected async createConnection(
targetNodeId: NodeId,
address?: NodeAddress,
connConnectTime?: number
): Promise<ConnectionAndLock> {
this.logger.info(
`Creating connection to ${nodesUtils.encodeNodeId(targetNodeId)}`,
Expand Down Expand Up @@ -249,7 +261,7 @@ class NodeConnectionManager {
)}`,
);
// Creating the connection and set in map
return await this.establishNodeConnection(targetNodeId, lock);
return await this.establishNodeConnection(targetNodeId, lock, address, connConnectTime);
});
} else {
lock = new RWLock();
Expand All @@ -265,7 +277,7 @@ class NodeConnectionManager {
)}`,
);
// Creating the connection and set in map
return await this.establishNodeConnection(targetNodeId, lock);
return await this.establishNodeConnection(targetNodeId, lock, address, connConnectTime);
});
}
}
Expand All @@ -278,13 +290,17 @@ class NodeConnectionManager {
* This only adds the connection to the connection map if the connection was established.
* @param targetNodeId Id of node we are establishing connection to
* @param lock Lock associated with connection
* @param address Optional address to connect to
* @param connConnectTime Optional connection timeout time
* @returns ConnectionAndLock that was added to the connection map
*/
protected async establishNodeConnection(
targetNodeId: NodeId,
lock: RWLock,
address?: NodeAddress,
connConnectTime?: number,
): Promise<ConnectionAndLock> {
const targetAddress = await this.findNode(targetNodeId);
const targetAddress = address ?? await this.findNode(targetNodeId);
// If the stored host is not a valid host (IP address), then we assume it to
// be a hostname
const targetHostname = !networkUtils.isHost(targetAddress.host)
Expand Down Expand Up @@ -317,7 +333,7 @@ class NodeConnectionManager {
keyManager: this.keyManager,
nodeConnectionManager: this,
destroyCallback,
connConnectTime: this.connConnectTime,
connConnectTime: connConnectTime ?? this.connConnectTime,
logger: this.logger.getChild(`${targetHost}:${targetAddress.port}`),
clientFactory: async (args) =>
GRPCClientAgent.createGRPCClientAgent(args),
Expand Down Expand Up @@ -439,11 +455,13 @@ class NodeConnectionManager {
* port).
* @param targetNodeId ID of the node attempting to be found (i.e. attempting
* to find its IP address and port)
* @param connConnectTime Optional connection timeout time
* @returns whether the target node was located in the process
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async getClosestGlobalNodes(
targetNodeId: NodeId,
connConnectTime?: number,
): Promise<NodeAddress | undefined> {
// Let foundTarget: boolean = false;
let foundAddress: NodeAddress | undefined = undefined;
Expand Down Expand Up @@ -482,7 +500,7 @@ class NodeConnectionManager {
// Add the node to the database so that we can find its address in
// call to getConnectionToNode
await this.nodeGraph.setNode(nextNodeId, nextNodeAddress.address);
await this.createConnection(nextNodeId);
await this.createConnection(nextNodeId, undefined, connConnectTime);
} catch (e) {
// If we can't connect to the node, then skip it
continue;
Expand All @@ -492,6 +510,7 @@ class NodeConnectionManager {
const foundClosest = await this.getRemoteNodeClosestNodes(
nextNodeId,
targetNodeId,
connConnectTime,
);
// Check to see if any of these are the target node. At the same time, add
// them to the shortlist
Expand Down Expand Up @@ -533,12 +552,14 @@ class NodeConnectionManager {
* target node ID.
* @param nodeId the node ID to search on
* @param targetNodeId the node ID to find other nodes closest to it
* @param connConnectTime Optional connection timeout time
* @returns list of nodes and their IP/port that are closest to the target
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async getRemoteNodeClosestNodes(
nodeId: NodeId,
targetNodeId: NodeId,
connConnectTime?: number,
): Promise<Array<[NodeId, NodeData]>> {
// Construct the message
const nodeIdMessage = new nodesPB.Node();
Expand All @@ -565,7 +586,7 @@ class NodeConnectionManager {
}
});
return nodes;
});
}, connConnectTime);
}

/**
Expand All @@ -579,13 +600,14 @@ class NodeConnectionManager {
* This has been removed from start() as there's a chicken-egg scenario
* where we require the NodeGraph instance to be created in order to get
* connections.
* @param connConnectTime Optional connection timeout time
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async syncNodeGraph() {
public async syncNodeGraph(connConnectTime?: number) {
for (const seedNodeId of this.getSeedNodes()) {
// Check if the connection is viable
try {
await this.createConnection(seedNodeId);
await this.createConnection(seedNodeId, undefined, connConnectTime);
} catch (e) {
if (e instanceof nodesErrors.ErrorNodeConnectionTimeout) continue;
throw e;
Expand All @@ -594,6 +616,7 @@ class NodeConnectionManager {
const nodes = await this.getRemoteNodeClosestNodes(
seedNodeId,
this.keyManager.getNodeId(),
connConnectTime,
);
for (const [nodeId, nodeData] of nodes) {
// FIXME: this should be the `nodeManager.setNode`
Expand All @@ -611,6 +634,7 @@ class NodeConnectionManager {
* @param targetNodeId node ID of the target node to hole punch
* @param proxyAddress stringified address of `proxyHost:proxyPort`
* @param signature signature to verify source node is sender (signature based
* @param connConnectTime Optional connection timeout time
* on proxyAddress as message)
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
Expand All @@ -620,6 +644,7 @@ class NodeConnectionManager {
targetNodeId: NodeId,
proxyAddress: string,
signature: Buffer,
connConnectTime?: number,
): Promise<void> {
const relayMsg = new nodesPB.Relay();
relayMsg.setSrcId(nodesUtils.encodeNodeId(sourceNodeId));
Expand All @@ -629,7 +654,7 @@ class NodeConnectionManager {
await this.withConnF(relayNodeId, async (connection) => {
const client = connection.getClient();
await client.nodesHolePunchMessageSend(relayMsg);
});
}, connConnectTime);
}

/**
Expand All @@ -639,15 +664,17 @@ class NodeConnectionManager {
* node).
* @param message the original relay message (assumed to be created in
* nodeConnection.start())
* @param connConnectTime Optional connection timeout time
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async relayHolePunchMessage(message: nodesPB.Relay): Promise<void> {
public async relayHolePunchMessage(message: nodesPB.Relay, connConnectTime?: number,): Promise<void> {
await this.sendHolePunchMessage(
validationUtils.parseNodeId(message.getTargetId()),
validationUtils.parseNodeId(message.getSrcId()),
validationUtils.parseNodeId(message.getTargetId()),
message.getProxyAddress(),
Buffer.from(message.getSignature()),
connConnectTime,
);
}

Expand Down

0 comments on commit 4c7cf0b

Please sign in to comment.