Skip to content

Commit

Permalink
feat: refactoring nat signalling logic
Browse files Browse the repository at this point in the history
[ci skip]
  • Loading branch information
tegefaulkes committed Oct 23, 2023
1 parent 5b01c9e commit 8ccacd3
Show file tree
Hide file tree
Showing 19 changed files with 1,047 additions and 695 deletions.
213 changes: 117 additions & 96 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { LockRequest } from '@matrixai/async-locks';
import type { ResourceAcquire } from '@matrixai/resources';
import type { ContextTimedInput, ContextTimed } from '@matrixai/contexts';
import type { PromiseCancellable } from '@matrixai/async-cancellable';
import type { ClientCryptoOps, QUICConnection } from '@matrixai/quic';
import type NodeGraph from './NodeGraph';
import type {
Expand All @@ -21,15 +20,15 @@ import type {
TLSConfig,
} from '../network/types';
import type { ServerManifest } from '@matrixai/rpc';
import type { HolePunchRelayMessage } from './agent/types';
import Logger from '@matrixai/logger';
import { withF } from '@matrixai/resources';
import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
import { IdInternal } from '@matrixai/id';
import { Lock, LockBox } from '@matrixai/async-locks';
import { Lock, LockBox, Semaphore } from '@matrixai/async-locks';
import { Timer } from '@matrixai/timer';
import { timedCancellable, context } from '@matrixai/contexts/dist/decorators';
import { AbstractEvent, EventAll } from '@matrixai/events';
import { PromiseCancellable } from '@matrixai/async-cancellable';
import {
QUICSocket,
QUICServer,
Expand All @@ -43,11 +42,11 @@ import * as nodesUtils from './utils';
import * as nodesErrors from './errors';
import * as nodesEvents from './events';
import manifestClientAgent from './agent/callers';
import * as ids from '../ids';
import * as keysUtils from '../keys/utils';
import * as networkUtils from '../network/utils';
import * as utils from '../utils';
import config from '../config';
import RateLimiter from '../rateLimiter/RateLimiter';

type ManifestClientAgent = typeof manifestClientAgent;

Expand Down Expand Up @@ -129,6 +128,22 @@ class NodeConnectionManager {
* Default timeout for RPC handlers
*/
public readonly rpcCallTimeoutTime: number;
/**
* Used to track active hole punching attempts
*/
protected activePunchMap = new Map<string, PromiseCancellable<void>>();
/**
* Used to rate limit punch attempts per IP Address
*/
protected activeAddressMap = new Map<string, Semaphore>();
/**
* Used track active signalling attempts
*/
protected activeSignalSet = new Set<PromiseCancellable<void>>();
/**
* Used to limit signalling requests on a per requester basis
*/
protected rateLimiter = new RateLimiter(60000, 20, 10, 1);

protected logger: Logger;
protected keyRing: KeyRing;
Expand Down Expand Up @@ -456,11 +471,13 @@ class NodeConnectionManager {
this.handleEventQUICServerConnection,
);
this.quicSocket.addEventListener(EventAll.name, this.handleEventAll);
this.rateLimiter.startRefillInterval();
this.logger.info(`Started ${this.constructor.name}`);
}

public async stop() {
this.logger.info(`Stop ${this.constructor.name}`);
this.rateLimiter.stop();

this.removeEventListener(
nodesEvents.EventNodeConnectionManagerError.name,
Expand Down Expand Up @@ -503,6 +520,16 @@ class NodeConnectionManager {
destroyProms.push(destroyProm);
}
await Promise.all(destroyProms);
const signallingProms: Array<PromiseCancellable<void>> = [];
for (const [, activePunch] of this.activePunchMap) {
signallingProms.push(activePunch);
activePunch.cancel();
}
for (const activeSignal of this.activeSignalSet) {
signallingProms.push(activeSignal);
activeSignal.cancel();
}
await Promise.allSettled(signallingProms);
await this.quicServer.stop({ force: true });
await this.quicSocket.stop({ force: true });
await this.rpcServer.stop({ force: true });
Expand Down Expand Up @@ -1074,11 +1101,11 @@ class NodeConnectionManager {
* @param port port of the target client.
* @param ctx
*/
public async holePunchReverse(
public holePunchReverse(
host: Host,
port: Port,
ctx?: Partial<ContextTimed>,
): Promise<void>;
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<void>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@timedCancellable(
true,
Expand Down Expand Up @@ -1378,107 +1405,105 @@ class NodeConnectionManager {
}

/**
* Performs an RPC request to send a hole-punch message to the target. Used to
* initially establish the NodeConnection from source to target.
* This is used by the `nodesHolePunchRequestHandler` to initiate the hole punch procedure.
*
* @param relayNodeId node ID of the relay node (i.e. the seed node)
* @param sourceNodeId node ID of the current node (i.e. the sender)
* @param targetNodeId node ID of the target node to hole punch
* @param address
* @param ctx
* Will validate the message, and initiate hole punching in the background and return immediately
*/
public sendSignalingMessage(
relayNodeId: NodeId,
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
public handleNodesConnectionSignalFinal(host: Host, port: Port) {
const id = `${host}:${port}`;
if (this.activePunchMap.has(id)) return;
// Checking for resource semaphore
let semaphore: Semaphore | undefined = this.activeAddressMap.get(host);
if (semaphore == null) {
semaphore = new Semaphore(3);
this.activeAddressMap.set(host, semaphore);
}
const holePunchAttempt = new PromiseCancellable<void>(
async (res, rej, signal) => {
await semaphore!.withF(async () => {
this.holePunchReverse(host, port, { signal })
.finally(() => {
this.activePunchMap.delete(id);
if (semaphore!.count === 0) {
this.activeAddressMap.delete(host);
}
})
.then(res, rej);
});
},
);
this.activePunchMap.set(id, holePunchAttempt);
}

/**
* The handler used by the RPC to process signalling requests
* @param sourceNodeId - NodeId of the node making the request. Used for rate limiting.
* @param targetNodeId - NodeId of the node that needs to initiate hole punching.
* @param address - Address the target needs to punch to.
*/
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
public handleNodesConnectionSignalInitial(
sourceNodeId: NodeId,
targetNodeId: NodeId,
address?: NodeAddress,
ctx?: Partial<ContextTimed>,
address: NodeAddress,
) {
// Need to get the connection details of the requester and add it to the message.
// Then send the message to the target.
// This would only function with existing connections
if (!this.hasConnection(targetNodeId)) {
throw Error('TMP, No active connection with target');
}
// Do other checks.
const sourceNodeIdString = sourceNodeId.toString();
if (!this.rateLimiter.consume(sourceNodeIdString)) {
throw Error('TMP, attempt was blocked by rate limit');
}
const connProm = this.withConnF(targetNodeId, async (conn) => {
const client = conn.getClient();
await client.methods.nodesConnectionSignalFinal({
address,
});
}).finally(() => {
this.activeSignalSet.delete(connProm);
});
this.activeSignalSet.add(connProm);
}

/**
* This till ask a signalling node to signal a target node to hole punch back to this node.
* @param targetNodeId - NodeId of the node that needs to signal back.
* @param signallingNodeId - NodeId of the signalling node.
* @param ctx
*/
public holePunchSignalRequest(
targetNodeId: NodeId,
signallingNodeId: NodeId,
ctx?: Partial<ContextTimedInput>,
): PromiseCancellable<void>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
@timedCancellable(
true,
(nodeConnectionManager: NodeConnectionManager) =>
nodeConnectionManager.connectionConnectTimeoutTime,
)
public async sendSignalingMessage(
relayNodeId: NodeId,
sourceNodeId: NodeId,
public async holePunchSignalRequest(
targetNodeId: NodeId,
address: NodeAddress | undefined,
signallingNodeId: NodeId,
@context ctx: ContextTimed,
): Promise<void> {
if (
this.keyRing.getNodeId().equals(relayNodeId) ||
this.keyRing.getNodeId().equals(targetNodeId)
) {
// Logging and silently dropping operation
this.logger.debug(
'Attempted to send signaling message to our own NodeId',
);
return;
}
const rlyNode = nodesUtils.encodeNodeId(relayNodeId);
const srcNode = nodesUtils.encodeNodeId(sourceNodeId);
const tgtNode = nodesUtils.encodeNodeId(targetNodeId);
const addressString =
address != null ? `, address: ${address.host}:${address.port}` : '';
this.logger.debug(
`sendSignalingMessage sending Signaling message relay: ${rlyNode}, source: ${srcNode}, target: ${tgtNode}${addressString}`,
);
// Send message and ignore any error
await this.withConnF(
relayNodeId,
async (connection) => {
const client = connection.getClient();
await client.methods.nodesHolePunchMessageSend(
signallingNodeId,
async (conn) => {
const client = conn.getClient();
await client.methods.nodesConnectionSignalInitial(
{
srcIdEncoded: srcNode,
dstIdEncoded: tgtNode,
address,
targetNodeIdEncoded: nodesUtils.encodeNodeId(targetNodeId),
},
ctx,
);
},
ctx,
).catch(() => {});
}

/**
* Forwards a received hole punch message on to the target.
* If not known, the node ID -> address mapping is attempted to be discovered
* through Kademlia (note, however, this is currently only called by a 'broker'
* node).
* @param message the original relay message (assumed to be created in
* nodeConnection.start())
* @param sourceAddress
* @param ctx
*/
public relaySignalingMessage(
message: HolePunchRelayMessage,
sourceAddress: NodeAddress,
ctx?: Partial<ContextTimed>,
): PromiseCancellable<void>;
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
@timedCancellable(
true,
(nodeConnectionManager: NodeConnectionManager) =>
nodeConnectionManager.connectionConnectTimeoutTime,
)
public async relaySignalingMessage(
message: HolePunchRelayMessage,
sourceAddress: NodeAddress,
@context ctx: ContextTimed,
): Promise<void> {
// First check if we already have an existing ID -> address record
// If we're relaying then we trust our own node graph records over
// what was provided in the message
const sourceNode = ids.parseNodeId(message.srcIdEncoded);
await this.sendSignalingMessage(
ids.parseNodeId(message.dstIdEncoded),
sourceNode,
ids.parseNodeId(message.dstIdEncoded),
sourceAddress,
ctx,
);
}

Expand Down Expand Up @@ -1657,19 +1682,15 @@ class NodeConnectionManager {
const allProms: Array<Promise<Array<void>>> = [];
for (const targetNodeId of targetNodeIds) {
if (!this.isSeedNode(targetNodeId)) {
// Ask seed nodes to signal hole punching for target
const holePunchProms = seedNodes.map((seedNodeId) => {
return (
this.sendSignalingMessage(
seedNodeId,
this.keyRing.getNodeId(),
targetNodeId,
undefined,
ctx,
)
this.holePunchSignalRequest(targetNodeId, seedNodeId, ctx)
// Ignore results
.then(
() => {},
() => {},
(e) =>
this.logger.debug(`signal request failed with ${e.message}`),
)
);
});
Expand Down
9 changes: 6 additions & 3 deletions src/nodes/agent/callers/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import nodesClaimsGet from './nodesClaimsGet';
import nodesClosestLocalNodesGet from './nodesClosestLocalNodesGet';
import nodesConnectionSignalFinal from './nodesConnectionSignalFinal';
import nodesConnectionSignalInitial from './nodesConnectionSignalInitial';
import nodesCrossSignClaim from './nodesCrossSignClaim';
import nodesHolePunchMessageSend from './nodesHolePunchMessageSend';
import notificationsSend from './notificationsSend';
import vaultsGitInfoGet from './vaultsGitInfoGet';
import vaultsGitPackGet from './vaultsGitPackGet';
Expand All @@ -13,8 +14,9 @@ import vaultsScan from './vaultsScan';
const manifestClient = {
nodesClaimsGet,
nodesClosestLocalNodesGet,
nodesConnectionSignalFinal,
nodesConnectionSignalInitial,
nodesCrossSignClaim,
nodesHolePunchMessageSend,
notificationsSend,
vaultsGitInfoGet,
vaultsGitPackGet,
Expand All @@ -26,8 +28,9 @@ export default manifestClient;
export {
nodesClaimsGet,
nodesClosestLocalNodesGet,
nodesConnectionSignalFinal,
nodesConnectionSignalInitial,
nodesCrossSignClaim,
nodesHolePunchMessageSend,
notificationsSend,
vaultsGitInfoGet,
vaultsGitPackGet,
Expand Down
12 changes: 12 additions & 0 deletions src/nodes/agent/callers/nodesConnectionSignalFinal.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { HandlerTypes } from '@matrixai/rpc';
import type NodesConnectionSignalFinal from '../handlers/NodesConnectionSignalFinal';
import { UnaryCaller } from '@matrixai/rpc';

type CallerTypes = HandlerTypes<NodesConnectionSignalFinal>;

const nodesConnectionSignalFinal = new UnaryCaller<
CallerTypes['input'],
CallerTypes['output']
>();

export default nodesConnectionSignalFinal;
12 changes: 12 additions & 0 deletions src/nodes/agent/callers/nodesConnectionSignalInitial.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import type { HandlerTypes } from '@matrixai/rpc';
import type NodesConnectionSignalInitial from '../handlers/NodesConnectionSignalInitial';
import { UnaryCaller } from '@matrixai/rpc';

type CallerTypes = HandlerTypes<NodesConnectionSignalInitial>;

const nodesConnectionSignalInitial = new UnaryCaller<
CallerTypes['input'],
CallerTypes['output']
>();

export default nodesConnectionSignalInitial;
12 changes: 0 additions & 12 deletions src/nodes/agent/callers/nodesHolePunchMessageSend.ts

This file was deleted.

Loading

0 comments on commit 8ccacd3

Please sign in to comment.