Skip to content

Commit

Permalink
feat: multi-node connection
Browse files Browse the repository at this point in the history
Related #483

[ci skip]
  • Loading branch information
tegefaulkes committed Nov 3, 2022
1 parent 58463f3 commit 7617607
Show file tree
Hide file tree
Showing 8 changed files with 370 additions and 173 deletions.
4 changes: 4 additions & 0 deletions src/network/ConnectionForward.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,10 @@ class ConnectionForward extends Connection {
} else {
ctx.signal.addEventListener('abort', () => resolveAbortedP());
}
void ctx.timer.then(
() => resolveAbortedP(),
() => {},
);
this.resolveReadyP = resolveReadyP;
this.utpSocket.on('message', this.handleMessage);
const handleStartError = (e) => {
Expand Down
24 changes: 24 additions & 0 deletions src/network/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Host, Hostname, Port, Address, NetworkMessage } from './types';
import type { Certificate, PublicKey } from '../keys/types';
import type { NodeId } from '../ids/types';
import type { ContextTimed } from '../contexts/types';
import type { NodeAddress } from 'nodes/types';
import { Buffer } from 'buffer';
import dns from 'dns';
import { IPv4, IPv6, Validator } from 'ip-num';
Expand Down Expand Up @@ -484,6 +485,28 @@ function verifyClientCertificateChain(certChain: Array<Certificate>): void {
}
}

async function resolveAddresses(addresses: Array<NodeAddress>) {
const existingAddresses: Set<string> = new Set();
const final: Array<{ host: Host; port: Port }> = [];
for (const address of addresses) {
if (isHost(address.host)) {
if (existingAddresses.has(`${address.host}|${address.port}`)) continue;
final.push({ host: address.host, port: address.port });
existingAddresses.add(`${address.host}|${address.port}`);
continue;
}
const resolvedAddresses = await resolveHostname(address.host);
for (const resolvedHost of resolvedAddresses) {
const newAddress = { host: resolvedHost, port: address.port };
if (!Validator.isValidIPv4String(resolvedHost)[0]) continue;
if (existingAddresses.has(`${resolvedHost}|${address.port}`)) continue;
final.push(newAddress);
existingAddresses.add(`${resolvedHost}|${address.port}`);
}
}
return final;
}

export {
pingBuffer,
pongBuffer,
Expand All @@ -503,4 +526,5 @@ export {
getCertificateChain,
verifyServerCertificateChain,
verifyClientCertificateChain,
resolveAddresses,
};
182 changes: 139 additions & 43 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import Logger from '@matrixai/logger';
import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
import { IdInternal } from '@matrixai/id';
import { status } from '@matrixai/async-init';
import { LockBox, RWLockWriter } from '@matrixai/async-locks';
import { LockBox, Lock, Semaphore } from '@matrixai/async-locks';
import { Timer } from '@matrixai/timer';
import NodeConnection from './NodeConnection';
import * as nodesUtils from './utils';
Expand All @@ -29,13 +29,15 @@ import GRPCClientAgent from '../agent/GRPCClientAgent';
import * as validationUtils from '../validation/utils';
import * as networkUtils from '../network/utils';
import * as nodesPB from '../proto/js/polykey/v1/nodes/nodes_pb';
import { never } from '../utils';
import { never, promise } from '../utils';
import { resolveAddresses } from '../network/utils';

// TODO: check all locking and add cancellation for it.

type ConnectionAndTimer = {
connection: NodeConnection<GRPCClientAgent>;
timer: NodeJS.Timer;
timer: Timer | null;
usageCount: number;
};

interface NodeConnectionManager extends StartStop {}
Expand Down Expand Up @@ -81,7 +83,7 @@ class NodeConnectionManager {
* NodeIds can't be used to properly retrieve a value from the map.
*/
protected connections: Map<NodeIdString, ConnectionAndTimer> = new Map();
protected connectionLocks: LockBox<RWLockWriter> = new LockBox();
protected connectionLocks: LockBox<Lock> = new LockBox();
// Tracks the backoff period for offline nodes
protected nodesBackoffMap: Map<
string,
Expand Down Expand Up @@ -168,29 +170,29 @@ class NodeConnectionManager {
ctx?: Partial<ContextTimed>,
): Promise<ResourceAcquire<NodeConnection<GRPCClientAgent>>> {
return async () => {
const { connection, timer: timeToLiveTimer } = await this.getConnection(
targetNodeId,
ctx,
);
// Acquire the read lock and the release function
// FIXME: race the abortion
const [release] = await this.connectionLocks.lock([
targetNodeId.toString(),
RWLockWriter,
'write',
])();
// Resetting TTL timer
timeToLiveTimer?.refresh();
const connectionAndTimer = await this.getConnection(targetNodeId, ctx);
// Increment usage count, and cancel timer
connectionAndTimer.usageCount += 1;
connectionAndTimer.timer?.cancel();
connectionAndTimer.timer = null;
// Return tuple of [ResourceRelease, Resource]
return [
async (e) => {
await release();
if (nodesUtils.isConnectionError(e)) {
// Error with connection, shutting connection down
await this.destroyConnection(targetNodeId);
} else {
// Decrement usage count and set up TTL if needed
connectionAndTimer.usageCount -= 1;
if (connectionAndTimer.usageCount <= 0) {
connectionAndTimer.timer = new Timer({
handler: async () => await this.destroyConnection(targetNodeId),
delay: this.connTimeoutTime,
});
}
}
},
connection,
connectionAndTimer.connection,
];
};
}
Expand Down Expand Up @@ -280,7 +282,7 @@ class NodeConnectionManager {
): Promise<ConnectionAndTimer> {
const targetNodeIdString = targetNodeId.toString() as NodeIdString;
return await this.connectionLocks.withF(
[targetNodeIdString, RWLockWriter, 'write'],
[targetNodeIdString, Lock],
async () => {
const connAndTimer = this.connections.get(targetNodeIdString);
if (connAndTimer != null) return connAndTimer;
Expand Down Expand Up @@ -312,35 +314,66 @@ class NodeConnectionManager {
await this.destroyConnection(targetNodeId);
};
// Creating new connection
const newConnection = await NodeConnection.createNodeConnection(
{
targetNodeId: targetNodeId,
targetHost: targetHosts[0],
targetHostname: targetHostname,
targetPort: targetAddress.port,
proxy: this.proxy,
keyManager: this.keyManager,
nodeConnectionManager: this,
destroyCallback,
logger: this.logger.getChild(
`${NodeConnection.name} ${targetHosts[0]}:${targetAddress.port}`,
),
clientFactory: async (args) =>
GRPCClientAgent.createGRPCClientAgent(args),
},
ctx,
);
const firstResolvedIndex = promise<number>();
const cleanUpReason = Symbol('clean up');
const connectionPromises = targetHosts.map((host, index) => {
return NodeConnection.createNodeConnection(
{
targetNodeId: targetNodeId,
targetHost: host,
targetHostname: targetHostname,
targetPort: targetAddress.port,
proxy: this.proxy,
keyManager: this.keyManager,
nodeConnectionManager: this,
destroyCallback,
logger: this.logger.getChild(
`${NodeConnection.name} ${nodesUtils.encodeNodeId(
targetNodeId,
)}:${targetAddress.port}`,
),
clientFactory: async (args) =>
GRPCClientAgent.createGRPCClientAgent(args),
},
ctx,
).finally(() => {
// Resolve the index of the first successful connection
firstResolvedIndex.resolveP(index);
});
});
// Race the first connection
const newConnection = await Promise.race(connectionPromises);
for (const prom of connectionPromises) {
const index = connectionPromises.indexOf(prom);
// Clean up remaining connections
if (index !== (await firstResolvedIndex.p)) {
prom.cancel(cleanUpReason);
await prom.then(
async (connection) => {
// Destroy any extra connections that succeeded.
// DestroyCallback is a NOP if the connection is not in
// the connection map.
await connection.destroy();
},
() => {
// Ignore rejections
},
);
}
}
// We can assume connection was established and destination was valid,
// we can add the target to the nodeGraph
await this.nodeManager?.setNode(targetNodeId, targetAddress);
// Creating TTL timeout
const timeToLiveTimer = setTimeout(async () => {
await this.destroyConnection(targetNodeId);
}, this.connTimeoutTime);
const timeToLiveTimer = new Timer({
handler: async () => await this.destroyConnection(targetNodeId),
delay: this.connTimeoutTime,
});

const newConnAndTimer: ConnectionAndTimer = {
connection: newConnection,
timer: timeToLiveTimer,
usageCount: 0,
};
this.connections.set(targetNodeIdString, newConnAndTimer);
return newConnAndTimer;
Expand All @@ -355,13 +388,13 @@ class NodeConnectionManager {
protected async destroyConnection(targetNodeId: NodeId): Promise<void> {
const targetNodeIdString = targetNodeId.toString() as NodeIdString;
return await this.connectionLocks.withF(
[targetNodeIdString, RWLockWriter, 'write'],
[targetNodeIdString, Lock],
async () => {
const connAndTimer = this.connections.get(targetNodeIdString);
if (connAndTimer?.connection == null) return;
await connAndTimer.connection.destroy();
// Destroying TTL timer
if (connAndTimer.timer != null) clearTimeout(connAndTimer.timer);
if (connAndTimer.timer != null) connAndTimer.timer.cancel();
// Updating the connection map
this.connections.delete(targetNodeIdString);
},
Expand Down Expand Up @@ -825,6 +858,69 @@ class NodeConnectionManager {
return true;
}

public establishMultiConnection(
nodeIds: Array<NodeId>,
addresses: Array<NodeAddress>,
connectionTimeout?: number,
limit?: number,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<Map<NodeId, { host: Host; port: Port }>>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@timedCancellable(true)
public async establishMultiConnection(
nodeIds: Array<NodeId>,
addresses: Array<NodeAddress>,
connectionTimeout: number = 2000,
limit: number | undefined,
@context ctx: ContextTimed,
): Promise<Map<NodeId, { host: Host; port: Port }>> {
// Get the full list of addresses by flattening results
const addresses_ = await resolveAddresses(addresses);
// We want to establish forward connections to each address
const pendingConnectionProms: Array<Promise<void>> = [];
const semaphore = limit != null ? new Semaphore(limit) : null;
const establishedMap: Map<NodeId, { host: Host; port: Port }> = new Map();
const cleanUpReason = Symbol('CleanUp');
const abortController = new AbortController();
const abort = () => abortController.abort(ctx.signal.reason);
ctx.signal.addEventListener('abort', abort);
const signal = abortController.signal;
for (const address of addresses_) {
if (semaphore != null) await semaphore.waitForUnlock();
if (signal.aborted) break;
const [semaphoreReleaser] =
semaphore != null ? await semaphore.lock()() : [() => {}];
const timer = new Timer({ delay: connectionTimeout });
const connectionProm = this.proxy
.openConnectionForward(nodeIds, address.host as Host, address.port, {
signal,
timer,
})
.then(
(nodeId) => {
// Connection established, add it to the map
establishedMap.set(nodeId, address);
// Check if all nodes are established and trigger clean up
if (establishedMap.size >= nodeIds.length) {
abortController.abort(cleanUpReason);
}
},
() => {
// Connection failed, ignore error
},
)
.finally(async () => {
// Clean up
await semaphoreReleaser();
timer.cancel(cleanUpReason);
});
pendingConnectionProms.push(connectionProm);
}
await Promise.all(pendingConnectionProms);
ctx.signal.removeEventListener('abort', abort);
return establishedMap;
}

protected hasBackoff(nodeId: NodeId): boolean {
const backoff = this.nodesBackoffMap.get(nodeId.toString());
if (backoff == null) return false;
Expand Down
Loading

0 comments on commit 7617607

Please sign in to comment.