From 93f53c83afb3e4cdcc7e0d48bc684ecbf5f30af6 Mon Sep 17 00:00:00 2001 From: Emma Casolin Date: Thu, 21 Apr 2022 14:52:22 +1000 Subject: [PATCH] feat: generic `SetNodeQueue` class for queuing `setNode` operations `NodeManager.setNode` and `NodeConnectionManager.syncNodeGraph` now utilise a single, shared queue to asynchronously add nodes to the node graph without blocking the main loop. These methods are both blocking by default but can be made non-blocking by setting the `block` parameter to false. #322 --- src/PolykeyAgent.ts | 29 ++++- src/bootstrap/utils.ts | 4 + src/nodes/NodeConnectionManager.ts | 62 +++++---- src/nodes/NodeManager.ts | 120 ++---------------- src/nodes/SetNodeQueue.ts | 107 ++++++++++++++++ src/nodes/errors.ts | 6 + tests/agent/GRPCClientAgent.test.ts | 7 + tests/agent/service/notificationsSend.test.ts | 8 ++ .../gestaltsDiscoveryByIdentity.test.ts | 9 ++ .../service/gestaltsDiscoveryByNode.test.ts | 9 ++ .../gestaltsGestaltTrustByIdentity.test.ts | 15 ++- .../gestaltsGestaltTrustByNode.test.ts | 15 ++- tests/client/service/identitiesClaim.test.ts | 10 +- tests/client/service/nodesAdd.test.ts | 9 ++ tests/client/service/nodesClaim.test.ts | 13 +- tests/client/service/nodesFind.test.ts | 8 ++ tests/client/service/nodesPing.test.ts | 9 ++ .../client/service/notificationsClear.test.ts | 9 ++ .../client/service/notificationsRead.test.ts | 11 +- .../client/service/notificationsSend.test.ts | 11 +- tests/discovery/Discovery.test.ts | 15 ++- tests/nodes/NodeConnection.test.ts | 7 + .../NodeConnectionManager.general.test.ts | 13 ++ .../NodeConnectionManager.lifecycle.test.ts | 19 +++ .../NodeConnectionManager.seednodes.test.ts | 25 ++++ .../NodeConnectionManager.termination.test.ts | 10 ++ .../NodeConnectionManager.timeout.test.ts | 4 + tests/nodes/NodeManager.test.ts | 71 ++++++++++- .../NotificationsManager.test.ts | 7 + tests/vaults/VaultManager.test.ts | 3 + 30 files changed, 494 insertions(+), 151 deletions(-) create mode 100644 src/nodes/SetNodeQueue.ts diff --git a/src/PolykeyAgent.ts b/src/PolykeyAgent.ts index ddc7ca2cd..2b8951d91 100644 --- a/src/PolykeyAgent.ts +++ b/src/PolykeyAgent.ts @@ -34,6 +34,7 @@ import * as errors from './errors'; import * as utils from './utils'; import * as keysUtils from './keys/utils'; import * as nodesUtils from './nodes/utils'; +import SetNodeQueue from './nodes/SetNodeQueue'; type NetworkConfig = { forwardHost?: Host; @@ -87,6 +88,7 @@ class PolykeyAgent { gestaltGraph, proxy, nodeGraph, + setNodeQueue, nodeConnectionManager, nodeManager, discovery, @@ -132,6 +134,7 @@ class PolykeyAgent { gestaltGraph?: GestaltGraph; proxy?: Proxy; nodeGraph?: NodeGraph; + setNodeQueue?: SetNodeQueue; nodeConnectionManager?: NodeConnectionManager; nodeManager?: NodeManager; discovery?: Discovery; @@ -281,12 +284,18 @@ class PolykeyAgent { keyManager, logger: logger.getChild(NodeGraph.name), })); + setNodeQueue = + setNodeQueue ?? + new SetNodeQueue({ + logger: logger.getChild(SetNodeQueue.name), + }); nodeConnectionManager = nodeConnectionManager ?? new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, seedNodes, ...nodeConnectionManagerConfig_, logger: logger.getChild(NodeConnectionManager.name), @@ -299,6 +308,7 @@ class PolykeyAgent { keyManager, nodeGraph, nodeConnectionManager, + setNodeQueue, logger: logger.getChild(NodeManager.name), }); await nodeManager.start(); @@ -385,6 +395,7 @@ class PolykeyAgent { gestaltGraph, proxy, nodeGraph, + setNodeQueue, nodeConnectionManager, nodeManager, discovery, @@ -417,6 +428,7 @@ class PolykeyAgent { public readonly gestaltGraph: GestaltGraph; public readonly proxy: Proxy; public readonly nodeGraph: NodeGraph; + public readonly setNodeQueue: SetNodeQueue; public readonly nodeConnectionManager: NodeConnectionManager; public readonly nodeManager: NodeManager; public readonly discovery: Discovery; @@ -441,6 +453,7 @@ class PolykeyAgent { gestaltGraph, proxy, nodeGraph, + setNodeQueue, nodeConnectionManager, nodeManager, discovery, @@ -464,6 +477,7 @@ class PolykeyAgent { gestaltGraph: GestaltGraph; proxy: Proxy; nodeGraph: NodeGraph; + setNodeQueue: SetNodeQueue; nodeConnectionManager: NodeConnectionManager; nodeManager: NodeManager; discovery: Discovery; @@ -489,6 +503,7 @@ class PolykeyAgent { this.proxy = proxy; this.discovery = discovery; this.nodeGraph = nodeGraph; + this.setNodeQueue = setNodeQueue; this.nodeConnectionManager = nodeConnectionManager; this.nodeManager = nodeManager; this.vaultManager = vaultManager; @@ -562,10 +577,14 @@ class PolykeyAgent { ); // Reverse connection was established and authenticated, // add it to the node graph - await this.nodeManager.setNode(data.remoteNodeId, { - host: data.remoteHost, - port: data.remotePort, - }); + await this.nodeManager.setNode( + data.remoteNodeId, + { + host: data.remoteHost, + port: data.remotePort, + }, + false, + ); } }, ); @@ -647,6 +666,7 @@ class PolykeyAgent { proxyPort: networkConfig_.proxyPort, tlsConfig, }); + await this.setNodeQueue.start(); await this.nodeManager.start(); await this.nodeConnectionManager.start({ nodeManager: this.nodeManager }); await this.nodeGraph.start({ fresh }); @@ -704,6 +724,7 @@ class PolykeyAgent { await this.nodeConnectionManager.stop(); await this.nodeGraph.stop(); await this.nodeManager.stop(); + await this.setNodeQueue.stop(); await this.proxy.stop(); await this.grpcServerAgent.stop(); await this.grpcServerClient.stop(); diff --git a/src/bootstrap/utils.ts b/src/bootstrap/utils.ts index 422709b01..09aff4586 100644 --- a/src/bootstrap/utils.ts +++ b/src/bootstrap/utils.ts @@ -4,6 +4,7 @@ import path from 'path'; import Logger from '@matrixai/logger'; import { DB } from '@matrixai/db'; import * as bootstrapErrors from './errors'; +import SetNodeQueue from '../nodes/SetNodeQueue'; import { IdentitiesManager } from '../identities'; import { SessionManager } from '../sessions'; import { Status } from '../status'; @@ -141,10 +142,12 @@ async function bootstrapState({ keyManager, logger: logger.getChild(NodeGraph.name), }); + const setNodeQueue = new SetNodeQueue({ logger }); const nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, logger: logger.getChild(NodeConnectionManager.name), }); const nodeManager = new NodeManager({ @@ -153,6 +156,7 @@ async function bootstrapState({ nodeGraph, nodeConnectionManager, sigchain, + setNodeQueue, logger: logger.getChild(NodeManager.name), }); const notificationsManager = diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index f7ea93732..7d6b5d694 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -4,6 +4,7 @@ import type Proxy from '../network/Proxy'; import type { Host, Hostname, Port } from '../network/types'; import type { Timer } from '../types'; import type NodeGraph from './NodeGraph'; +import type SetNodeQueue from './SetNodeQueue'; import type { NodeAddress, NodeData, @@ -60,6 +61,7 @@ class NodeConnectionManager { protected nodeGraph: NodeGraph; protected keyManager: KeyManager; protected proxy: Proxy; + protected setNodeQueue: SetNodeQueue; // NodeManager has to be passed in during start to allow co-dependency protected nodeManager: NodeManager | undefined; protected seedNodes: SeedNodes; @@ -80,6 +82,7 @@ class NodeConnectionManager { keyManager, nodeGraph, proxy, + setNodeQueue, seedNodes = {}, initialClosestNodes = 3, connConnectTime = 20000, @@ -89,6 +92,7 @@ class NodeConnectionManager { nodeGraph: NodeGraph; keyManager: KeyManager; proxy: Proxy; + setNodeQueue: SetNodeQueue; seedNodes?: SeedNodes; initialClosestNodes?: number; connConnectTime?: number; @@ -99,6 +103,7 @@ class NodeConnectionManager { this.keyManager = keyManager; this.nodeGraph = nodeGraph; this.proxy = proxy; + this.setNodeQueue = setNodeQueue; this.seedNodes = seedNodes; this.initialClosestNodes = initialClosestNodes; this.connConnectTime = connConnectTime; @@ -301,7 +306,7 @@ class NodeConnectionManager { }); // We can assume connection was established and destination was valid, // we can add the target to the nodeGraph - await this.nodeManager?.setNode(targetNodeId, targetAddress); + await this.nodeManager?.setNode(targetNodeId, targetAddress, false); // Creating TTL timeout const timeToLiveTimer = setTimeout(async () => { await this.destroyConnection(targetNodeId); @@ -574,18 +579,12 @@ class NodeConnectionManager { /** * Perform an initial database synchronisation: get k of the closest nodes * from each seed node and add them to this database - * For now, we also attempt to establish a connection to each of them. - * If these nodes are offline, this will impose a performance penalty, - * so we should investigate performing this in the background if possible. - * Alternatively, we can also just add the nodes to our database without - * establishing connection. - * This has been removed from start() as there's a chicken-egg scenario - * where we require the NodeGraph instance to be created in order to get - * connections. - * @param timer Connection timeout timer + * Establish a proxy connection to each node before adding it + * By default this operation is blocking, set `block` to false to make it + * non-blocking */ @ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning()) - public async syncNodeGraph(timer?: Timer) { + public async syncNodeGraph(block: boolean = true, timer?: Timer) { for (const seedNodeId of this.getSeedNodes()) { // Check if the connection is viable try { @@ -594,28 +593,43 @@ class NodeConnectionManager { if (e instanceof nodesErrors.ErrorNodeConnectionTimeout) continue; throw e; } - const nodes = await this.getRemoteNodeClosestNodes( seedNodeId, this.keyManager.getNodeId(), timer, ); for (const [nodeId, nodeData] of nodes) { - // FIXME: needs to ping the node right? we want to be non-blocking - try { - // FIXME: no tran needed - await this.nodeManager?.setNode(nodeId, nodeData.address); - } catch (e) { - if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e; + if (!block) { + this.setNodeQueue.queueSetNode(() => + this.nodeManager!.setNode(nodeId, nodeData.address), + ); + } else { + try { + // FIXME: no tran neededawait this.nodeManager?.setNode(nodeId, nodeData.address); + } catch (e) { + if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e; + } } } // Refreshing every bucket above the closest node - const [closestNode] = ( - await this.nodeGraph.getClosestNodes(this.keyManager.getNodeId(), 1) - ).pop()!; - const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode); - for (let i = bucketIndex; i < this.nodeGraph.nodeIdBits; i++) { - this.nodeManager?.refreshBucketQueueAdd(i); + if (!block) { + this.setNodeQueue.queueSetNode(async () => { + const [closestNode] = ( + await this.nodeGraph.getClosestNodes(this.keyManager.getNodeId(), 1) + ).pop()!; + const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode); + for (let i = bucketIndex; i < this.nodeGraph.nodeIdBits; i++) { + this.nodeManager?.refreshBucketQueueAdd(i); + } + }); + } else { + const [closestNode] = ( + await this.nodeGraph.getClosestNodes(this.keyManager.getNodeId(), 1) + ).pop()!; + const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode); + for (let i = bucketIndex; i < this.nodeGraph.nodeIdBits; i++) { + this.nodeManager?.refreshBucketQueueAdd(i); + } } } } diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 747ffe4b3..2bc76bccb 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -1,6 +1,7 @@ import type { DB, DBTransaction } from '@matrixai/db'; import type NodeConnectionManager from './NodeConnectionManager'; import type NodeGraph from './NodeGraph'; +import type SetNodeQueue from './SetNodeQueue'; import type KeyManager from '../keys/KeyManager'; import type { PublicKeyPem } from '../keys/types'; import type Sigchain from '../sigchain/Sigchain'; @@ -37,18 +38,7 @@ class NodeManager { protected keyManager: KeyManager; protected nodeConnectionManager: NodeConnectionManager; protected nodeGraph: NodeGraph; - // SetNodeQueue - protected endQueue: boolean = false; - protected setNodeQueue: Array<{ - nodeId: NodeId; - nodeAddress: NodeAddress; - timeout?: number; - }> = []; - protected setNodeQueuePlug: Promise; - protected setNodeQueueUnplug: (() => void) | undefined; - protected setNodeQueueRunner: Promise; - protected setNodeQueueEmpty: Promise; - protected setNodeQueueDrained: () => void; + protected setNodeQueue: SetNodeQueue; // Refresh bucket timer protected refreshBucketDeadlineMap: Map = new Map(); protected refreshBucketTimer: NodeJS.Timer; @@ -67,6 +57,7 @@ class NodeManager { sigchain, nodeConnectionManager, nodeGraph, + setNodeQueue, refreshBucketTimerDefault = 3600000, // 1 hour in milliseconds logger, }: { @@ -75,6 +66,7 @@ class NodeManager { sigchain: Sigchain; nodeConnectionManager: NodeConnectionManager; nodeGraph: NodeGraph; + setNodeQueue: SetNodeQueue; refreshBucketTimerDefault?: number; logger?: Logger; }) { @@ -84,12 +76,12 @@ class NodeManager { this.sigchain = sigchain; this.nodeConnectionManager = nodeConnectionManager; this.nodeGraph = nodeGraph; + this.setNodeQueue = setNodeQueue; this.refreshBucketTimerDefault = refreshBucketTimerDefault; } public async start() { this.logger.info(`Starting ${this.constructor.name}`); - this.setNodeQueueRunner = this.startSetNodeQueue(); this.startRefreshBucketTimers(); this.refreshBucketQueueRunner = this.startRefreshBucketQueue(); this.logger.info(`Started ${this.constructor.name}`); @@ -97,7 +89,6 @@ class NodeManager { public async stop() { this.logger.info(`Stopping ${this.constructor.name}`); - await this.stopSetNodeQueue(); await this.stopRefreshBucketTimers(); await this.stopRefreshBucketQueue(); this.logger.info(`Stopped ${this.constructor.name}`); @@ -400,11 +391,12 @@ class NodeManager { } /** - * Adds a node to the node graph. This assumes that you have already authenticated the node. - * Updates the node if the node already exists. + * Adds a node to the node graph. This assumes that you have already authenticated the node + * Updates the node if the node already exists + * This operation is blocking by default - set `block` to false to make it non-blocking * @param nodeId - Id of the node we wish to add * @param nodeAddress - Expected address of the node we want to add - * @param blocking - Flag for if the operation should block or utilize the async queue + * @param block - Flag for if the operation should block or utilize the async queue * @param force - Flag for if we want to add the node without authenticating or if the bucket is full. * This will drop the oldest node in favor of the new. * @param timeout Connection timeout timeout @@ -414,7 +406,7 @@ class NodeManager { public async setNode( nodeId: NodeId, nodeAddress: NodeAddress, - blocking: boolean = false, + block: boolean = true, force: boolean = false, timeout?: number, tran: DBTransaction, @@ -459,7 +451,7 @@ class NodeManager { // Updating the refreshBucket timer this.refreshBucketUpdateDeadline(bucketIndex); return; - } else if (blocking) { + } else if (block) { this.logger.debug( `Bucket was full and blocking was true, garbage collecting old nodes to add ${nodesUtils.encodeNodeId( nodeId, @@ -478,7 +470,9 @@ class NodeManager { )} to queue`, ); // Re-attempt this later asynchronously by adding the the queue - this.queueSetNode(nodeId, nodeAddress, timeout); + this.setNodeQueue.queueSetNode(() => + this.setNode(nodeId, nodeAddress, true, false, timeout), + ); } } } @@ -557,92 +551,6 @@ class NodeManager { // Return await this.nodeGraph.refreshBuckets(tran); } - // SetNode queue - - /** - * This adds a setNode operation to the queue - */ - private queueSetNode( - nodeId: NodeId, - nodeAddress: NodeAddress, - timeout?: number, - ): void { - this.logger.debug(`Adding ${nodesUtils.encodeNodeId(nodeId)} to queue`); - this.setNodeQueue.push({ - nodeId, - nodeAddress, - timeout, - }); - this.unplugQueue(); - } - - /** - * This starts the process of digesting the queue - */ - private async startSetNodeQueue(): Promise { - this.logger.debug('Starting setNodeQueue'); - this.plugQueue(); - // While queue hasn't ended - while (true) { - // Wait for queue to be unplugged - await this.setNodeQueuePlug; - if (this.endQueue) break; - const job = this.setNodeQueue.shift(); - if (job == null) { - // If the queue is empty then we pause the queue - this.plugQueue(); - continue; - } - // Process the job - this.logger.debug( - `SetNodeQueue processing job for: ${nodesUtils.encodeNodeId( - job.nodeId, - )}`, - ); - await this.setNode(job.nodeId, job.nodeAddress, true, false, job.timeout); - } - this.logger.debug('SetNodeQueue has ended'); - } - - private async stopSetNodeQueue(): Promise { - this.logger.debug('Stopping setNodeQueue'); - // Tell the queue runner to end - this.endQueue = true; - this.unplugQueue(); - // Wait for runner to finish it's current job - await this.setNodeQueueRunner; - } - - private plugQueue(): void { - if (this.setNodeQueueUnplug == null) { - this.logger.debug('Plugging setNodeQueue'); - // Pausing queue - this.setNodeQueuePlug = new Promise((resolve) => { - this.setNodeQueueUnplug = resolve; - }); - // Signaling queue is empty - if (this.setNodeQueueDrained != null) this.setNodeQueueDrained(); - } - } - - private unplugQueue(): void { - if (this.setNodeQueueUnplug != null) { - this.logger.debug('Unplugging setNodeQueue'); - // Starting queue - this.setNodeQueueUnplug(); - this.setNodeQueueUnplug = undefined; - // Signalling queue is running - this.setNodeQueueEmpty = new Promise((resolve) => { - this.setNodeQueueDrained = resolve; - }); - } - } - - @ready(new nodesErrors.ErrorNodeManagerNotRunning()) - public async queueDrained(): Promise { - await this.setNodeQueueEmpty; - } - /** * Kademlia refresh bucket operation. * It picks a random node within a bucket and does a search for that node. diff --git a/src/nodes/SetNodeQueue.ts b/src/nodes/SetNodeQueue.ts new file mode 100644 index 000000000..a405c3418 --- /dev/null +++ b/src/nodes/SetNodeQueue.ts @@ -0,0 +1,107 @@ +import Logger from '@matrixai/logger'; +import { StartStop, ready } from '@matrixai/async-init/dist/StartStop'; +import * as nodesErrors from './errors'; + +interface SetNodeQueue extends StartStop {} +@StartStop() +class SetNodeQueue { + protected logger: Logger; + protected endQueue: boolean = false; + protected setNodeQueue: Array<() => Promise> = []; + protected setNodeQueuePlug: Promise; + protected setNodeQueueUnplug: (() => void) | undefined; + protected setNodeQueueRunner: Promise; + protected setNodeQueueEmpty: Promise; + protected setNodeQueueDrained: () => void; + + constructor({ logger }: { logger?: Logger }) { + this.logger = logger ?? new Logger(this.constructor.name); + } + + public async start() { + this.logger.info(`Starting ${this.constructor.name}`); + this.setNodeQueueRunner = this.startSetNodeQueue(); + this.logger.info(`Started ${this.constructor.name}`); + } + + public async stop() { + this.logger.info(`Stopping ${this.constructor.name}`); + await this.stopSetNodeQueue(); + this.logger.info(`Stopped ${this.constructor.name}`); + } + + /** + * This adds a setNode operation to the queue + */ + public queueSetNode(f: () => Promise): void { + this.setNodeQueue.push(f); + this.unplugQueue(); + } + + /** + * This starts the process of digesting the queue + */ + private async startSetNodeQueue(): Promise { + this.logger.debug('Starting setNodeQueue'); + this.plugQueue(); + // While queue hasn't ended + while (true) { + // Wait for queue to be unplugged + await this.setNodeQueuePlug; + if (this.endQueue) break; + const job = this.setNodeQueue.shift(); + if (job == null) { + // If the queue is empty then we pause the queue + this.plugQueue(); + continue; + } + try { + await job(); + } catch (e) { + if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e; + } + } + this.logger.debug('setNodeQueue has ended'); + } + + private async stopSetNodeQueue(): Promise { + this.logger.debug('Stopping setNodeQueue'); + // Tell the queue runner to end + this.endQueue = true; + this.unplugQueue(); + // Wait for runner to finish it's current job + await this.setNodeQueueRunner; + } + + private plugQueue(): void { + if (this.setNodeQueueUnplug == null) { + this.logger.debug('Plugging setNodeQueue'); + // Pausing queue + this.setNodeQueuePlug = new Promise((resolve) => { + this.setNodeQueueUnplug = resolve; + }); + // Signaling queue is empty + if (this.setNodeQueueDrained != null) this.setNodeQueueDrained(); + } + } + + private unplugQueue(): void { + if (this.setNodeQueueUnplug != null) { + this.logger.debug('Unplugging setNodeQueue'); + // Starting queue + this.setNodeQueueUnplug(); + this.setNodeQueueUnplug = undefined; + // Signalling queue is running + this.setNodeQueueEmpty = new Promise((resolve) => { + this.setNodeQueueDrained = resolve; + }); + } + } + + @ready(new nodesErrors.ErrorSetNodeQueueNotRunning()) + public async queueDrained(): Promise { + await this.setNodeQueueEmpty; + } +} + +export default SetNodeQueue; diff --git a/src/nodes/errors.ts b/src/nodes/errors.ts index a98fbcaa6..159021b9c 100644 --- a/src/nodes/errors.ts +++ b/src/nodes/errors.ts @@ -12,6 +12,11 @@ class ErrorNodeManagerNotRunning extends ErrorNodes { exitCode = sysexits.USAGE; } +class ErrorSetNodeQueueNotRunning extends ErrorNodes { + static description = 'SetNodeQueue is not running'; + exitCode = sysexits.USAGE; +} + class ErrorNodeGraphRunning extends ErrorNodes { static description = 'NodeGraph is running'; exitCode = sysexits.USAGE; @@ -86,6 +91,7 @@ export { ErrorNodes, ErrorNodeAborted, ErrorNodeManagerNotRunning, + ErrorSetNodeQueueNotRunning, ErrorNodeGraphRunning, ErrorNodeGraphNotRunning, ErrorNodeGraphDestroyed, diff --git a/tests/agent/GRPCClientAgent.test.ts b/tests/agent/GRPCClientAgent.test.ts index 5cc38708a..2a9644055 100644 --- a/tests/agent/GRPCClientAgent.test.ts +++ b/tests/agent/GRPCClientAgent.test.ts @@ -22,6 +22,7 @@ import * as utilsPB from '@/proto/js/polykey/v1/utils/utils_pb'; import * as agentErrors from '@/agent/errors'; import * as keysUtils from '@/keys/utils'; import { timerStart } from '@/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testAgentUtils from './utils'; describe(GRPCClientAgent.name, () => { @@ -49,6 +50,7 @@ describe(GRPCClientAgent.name, () => { let keyManager: KeyManager; let vaultManager: VaultManager; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let sigchain: Sigchain; @@ -110,10 +112,12 @@ describe(GRPCClientAgent.name, () => { keyManager, logger, }); + setNodeQueue = new SetNodeQueue({ logger }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, logger, }); nodeManager = new NodeManager({ @@ -122,8 +126,10 @@ describe(GRPCClientAgent.name, () => { keyManager: keyManager, nodeGraph: nodeGraph, nodeConnectionManager: nodeConnectionManager, + setNodeQueue, logger: logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); notificationsManager = @@ -178,6 +184,7 @@ describe(GRPCClientAgent.name, () => { await sigchain.stop(); await nodeConnectionManager.stop(); await nodeManager.stop(); + await setNodeQueue.stop(); await nodeGraph.stop(); await gestaltGraph.stop(); await acl.stop(); diff --git a/tests/agent/service/notificationsSend.test.ts b/tests/agent/service/notificationsSend.test.ts index 46a9aa07e..9425743b8 100644 --- a/tests/agent/service/notificationsSend.test.ts +++ b/tests/agent/service/notificationsSend.test.ts @@ -27,6 +27,7 @@ import * as notificationsPB from '@/proto/js/polykey/v1/notifications/notificati import * as keysUtils from '@/keys/utils'; import * as nodesUtils from '@/nodes/utils'; import * as notificationsUtils from '@/notifications/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import { expectRemoteError } from '../../utils'; @@ -40,6 +41,7 @@ describe('notificationsSend', () => { let senderKeyManager: KeyManager; let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let notificationsManager: NotificationsManager; @@ -109,10 +111,14 @@ describe('notificationsSend', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -123,8 +129,10 @@ describe('notificationsSend', () => { nodeGraph, nodeConnectionManager, sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); notificationsManager = diff --git a/tests/client/service/gestaltsDiscoveryByIdentity.test.ts b/tests/client/service/gestaltsDiscoveryByIdentity.test.ts index a9a4d7a17..6fec772c1 100644 --- a/tests/client/service/gestaltsDiscoveryByIdentity.test.ts +++ b/tests/client/service/gestaltsDiscoveryByIdentity.test.ts @@ -24,6 +24,7 @@ import * as utilsPB from '@/proto/js/polykey/v1/utils/utils_pb'; import * as identitiesPB from '@/proto/js/polykey/v1/identities/identities_pb'; import * as clientUtils from '@/client/utils/utils'; import * as keysUtils from '@/keys/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; describe('gestaltsDiscoveryByIdentity', () => { @@ -59,6 +60,7 @@ describe('gestaltsDiscoveryByIdentity', () => { let gestaltGraph: GestaltGraph; let identitiesManager: IdentitiesManager; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let sigchain: Sigchain; @@ -125,10 +127,14 @@ describe('gestaltsDiscoveryByIdentity', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -139,8 +145,10 @@ describe('gestaltsDiscoveryByIdentity', () => { nodeConnectionManager, nodeGraph, sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); discovery = await Discovery.createDiscovery({ @@ -179,6 +187,7 @@ describe('gestaltsDiscoveryByIdentity', () => { await nodeGraph.stop(); await nodeConnectionManager.stop(); await nodeManager.stop(); + await setNodeQueue.stop(); await sigchain.stop(); await proxy.stop(); await identitiesManager.stop(); diff --git a/tests/client/service/gestaltsDiscoveryByNode.test.ts b/tests/client/service/gestaltsDiscoveryByNode.test.ts index e34f5f8ed..1e97b6250 100644 --- a/tests/client/service/gestaltsDiscoveryByNode.test.ts +++ b/tests/client/service/gestaltsDiscoveryByNode.test.ts @@ -25,6 +25,7 @@ import * as nodesPB from '@/proto/js/polykey/v1/nodes/nodes_pb'; import * as clientUtils from '@/client/utils/utils'; import * as keysUtils from '@/keys/utils'; import * as nodesUtils from '@/nodes/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import * as testNodesUtils from '../../nodes/utils'; @@ -60,6 +61,7 @@ describe('gestaltsDiscoveryByNode', () => { let gestaltGraph: GestaltGraph; let identitiesManager: IdentitiesManager; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let sigchain: Sigchain; @@ -126,10 +128,14 @@ describe('gestaltsDiscoveryByNode', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -140,8 +146,10 @@ describe('gestaltsDiscoveryByNode', () => { nodeConnectionManager, nodeGraph, sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); discovery = await Discovery.createDiscovery({ @@ -180,6 +188,7 @@ describe('gestaltsDiscoveryByNode', () => { await nodeGraph.stop(); await nodeConnectionManager.stop(); await nodeManager.stop(); + await setNodeQueue.stop(); await sigchain.stop(); await proxy.stop(); await identitiesManager.stop(); diff --git a/tests/client/service/gestaltsGestaltTrustByIdentity.test.ts b/tests/client/service/gestaltsGestaltTrustByIdentity.test.ts index 949a5f5e4..17de35e72 100644 --- a/tests/client/service/gestaltsGestaltTrustByIdentity.test.ts +++ b/tests/client/service/gestaltsGestaltTrustByIdentity.test.ts @@ -31,6 +31,7 @@ import * as gestaltsErrors from '@/gestalts/errors'; import * as keysUtils from '@/keys/utils'; import * as clientUtils from '@/client/utils/utils'; import * as nodesUtils from '@/nodes/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import TestProvider from '../../identities/TestProvider'; import { expectRemoteError } from '../../utils'; @@ -115,6 +116,7 @@ describe('gestaltsGestaltTrustByIdentity', () => { let discovery: Discovery; let gestaltGraph: GestaltGraph; let identitiesManager: IdentitiesManager; + let setNodeQueue: SetNodeQueue; let nodeManager: NodeManager; let nodeConnectionManager: NodeConnectionManager; let nodeGraph: NodeGraph; @@ -191,10 +193,14 @@ describe('gestaltsGestaltTrustByIdentity', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -202,11 +208,13 @@ describe('gestaltsGestaltTrustByIdentity', () => { nodeManager = new NodeManager({ db, keyManager, - sigchain, - nodeGraph, nodeConnectionManager, - logger: logger.getChild('nodeManager'), + nodeGraph, + sigchain, + setNodeQueue, + logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); await nodeManager.setNode(nodesUtils.decodeNodeId(nodeId)!, { @@ -250,6 +258,7 @@ describe('gestaltsGestaltTrustByIdentity', () => { await discovery.stop(); await nodeConnectionManager.stop(); await nodeManager.stop(); + await setNodeQueue.stop(); await nodeGraph.stop(); await proxy.stop(); await sigchain.stop(); diff --git a/tests/client/service/gestaltsGestaltTrustByNode.test.ts b/tests/client/service/gestaltsGestaltTrustByNode.test.ts index d8ecae06e..5a409c120 100644 --- a/tests/client/service/gestaltsGestaltTrustByNode.test.ts +++ b/tests/client/service/gestaltsGestaltTrustByNode.test.ts @@ -33,6 +33,7 @@ import * as claimsUtils from '@/claims/utils'; import * as keysUtils from '@/keys/utils'; import * as clientUtils from '@/client/utils/utils'; import * as nodesUtils from '@/nodes/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import TestProvider from '../../identities/TestProvider'; @@ -114,6 +115,7 @@ describe('gestaltsGestaltTrustByNode', () => { let discovery: Discovery; let gestaltGraph: GestaltGraph; let identitiesManager: IdentitiesManager; + let setNodeQueue: SetNodeQueue; let nodeManager: NodeManager; let nodeConnectionManager: NodeConnectionManager; let nodeGraph: NodeGraph; @@ -190,10 +192,14 @@ describe('gestaltsGestaltTrustByNode', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -201,11 +207,13 @@ describe('gestaltsGestaltTrustByNode', () => { nodeManager = new NodeManager({ db, keyManager, - sigchain, - nodeGraph, nodeConnectionManager, - logger: logger.getChild('nodeManager'), + nodeGraph, + sigchain, + setNodeQueue, + logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); await nodeManager.setNode(nodesUtils.decodeNodeId(nodeId)!, { @@ -249,6 +257,7 @@ describe('gestaltsGestaltTrustByNode', () => { await discovery.stop(); await nodeConnectionManager.stop(); await nodeManager.stop(); + await setNodeQueue.stop(); await nodeGraph.stop(); await proxy.stop(); await sigchain.stop(); diff --git a/tests/client/service/identitiesClaim.test.ts b/tests/client/service/identitiesClaim.test.ts index f03e1be07..5038821e9 100644 --- a/tests/client/service/identitiesClaim.test.ts +++ b/tests/client/service/identitiesClaim.test.ts @@ -26,6 +26,7 @@ import * as keysUtils from '@/keys/utils'; import * as claimsUtils from '@/claims/utils'; import * as nodesUtils from '@/nodes/utils'; import * as validationErrors from '@/validation/errors'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import TestProvider from '../../identities/TestProvider'; import { expectRemoteError } from '../../utils'; @@ -86,6 +87,7 @@ describe('identitiesClaim', () => { let testProvider: TestProvider; let identitiesManager: IdentitiesManager; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let sigchain: Sigchain; let proxy: Proxy; @@ -137,13 +139,18 @@ describe('identitiesClaim', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ connConnectTime: 2000, proxy, keyManager, nodeGraph, - logger: logger.getChild('nodeConnectionManager'), + setNodeQueue, + logger: logger.getChild('NodeConnectionManager'), }); + await setNodeQueue.start(); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); const clientService = { identitiesClaim: identitiesClaim({ @@ -172,6 +179,7 @@ describe('identitiesClaim', () => { await grpcClient.destroy(); await grpcServer.stop(); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await nodeGraph.stop(); await sigchain.stop(); await proxy.stop(); diff --git a/tests/client/service/nodesAdd.test.ts b/tests/client/service/nodesAdd.test.ts index 94d925acc..e6d037034 100644 --- a/tests/client/service/nodesAdd.test.ts +++ b/tests/client/service/nodesAdd.test.ts @@ -22,6 +22,7 @@ import * as nodesUtils from '@/nodes/utils'; import * as clientUtils from '@/client/utils/utils'; import * as keysUtils from '@/keys/utils'; import * as validationErrors from '@/validation/errors'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import { expectRemoteError } from '../../utils'; @@ -50,6 +51,7 @@ describe('nodesAdd', () => { const authToken = 'abc123'; let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let sigchain: Sigchain; @@ -96,10 +98,14 @@ describe('nodesAdd', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -110,8 +116,10 @@ describe('nodesAdd', () => { nodeConnectionManager, nodeGraph, sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); const clientService = { @@ -140,6 +148,7 @@ describe('nodesAdd', () => { await grpcServer.stop(); await nodeGraph.stop(); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await sigchain.stop(); await proxy.stop(); await db.stop(); diff --git a/tests/client/service/nodesClaim.test.ts b/tests/client/service/nodesClaim.test.ts index 47102fe1a..21b6a4a5a 100644 --- a/tests/client/service/nodesClaim.test.ts +++ b/tests/client/service/nodesClaim.test.ts @@ -24,6 +24,7 @@ import * as utilsPB from '@/proto/js/polykey/v1/utils/utils_pb'; import * as clientUtils from '@/client/utils/utils'; import * as keysUtils from '@/keys/utils'; import * as validationErrors from '@/validation/errors'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; describe('nodesClaim', () => { @@ -75,6 +76,7 @@ describe('nodesClaim', () => { const authToken = 'abc123'; let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let notificationsManager: NotificationsManager; @@ -126,10 +128,14 @@ describe('nodesClaim', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -137,11 +143,13 @@ describe('nodesClaim', () => { nodeManager = new NodeManager({ db, keyManager, - sigchain, - nodeGraph, nodeConnectionManager, + nodeGraph, + sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); notificationsManager = @@ -179,6 +187,7 @@ describe('nodesClaim', () => { await grpcClient.destroy(); await grpcServer.stop(); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await nodeGraph.stop(); await notificationsManager.stop(); await sigchain.stop(); diff --git a/tests/client/service/nodesFind.test.ts b/tests/client/service/nodesFind.test.ts index 21372cb4c..095139160 100644 --- a/tests/client/service/nodesFind.test.ts +++ b/tests/client/service/nodesFind.test.ts @@ -20,6 +20,7 @@ import * as nodesPB from '@/proto/js/polykey/v1/nodes/nodes_pb'; import * as clientUtils from '@/client/utils/utils'; import * as keysUtils from '@/keys/utils'; import * as validationErrors from '@/validation/errors'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import { expectRemoteError } from '../../utils'; @@ -56,6 +57,7 @@ describe('nodesFind', () => { const authToken = 'abc123'; let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let sigchain: Sigchain; let proxy: Proxy; @@ -101,14 +103,19 @@ describe('nodesFind', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), }); + await setNodeQueue.start(); await nodeConnectionManager.start({ nodeManager: {} as NodeManager }); const clientService = { nodesFind: nodesFind({ @@ -136,6 +143,7 @@ describe('nodesFind', () => { await sigchain.stop(); await nodeGraph.stop(); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await proxy.stop(); await db.stop(); await keyManager.stop(); diff --git a/tests/client/service/nodesPing.test.ts b/tests/client/service/nodesPing.test.ts index 6fd489d36..bd1409b30 100644 --- a/tests/client/service/nodesPing.test.ts +++ b/tests/client/service/nodesPing.test.ts @@ -21,6 +21,7 @@ import * as nodesPB from '@/proto/js/polykey/v1/nodes/nodes_pb'; import * as clientUtils from '@/client/utils/utils'; import * as keysUtils from '@/keys/utils'; import * as validationErrors from '@/validation/errors'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import { expectRemoteError } from '../../utils'; @@ -55,6 +56,7 @@ describe('nodesPing', () => { const authToken = 'abc123'; let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let sigchain: Sigchain; @@ -101,10 +103,14 @@ describe('nodesPing', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -115,8 +121,10 @@ describe('nodesPing', () => { nodeConnectionManager, nodeGraph, sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeConnectionManager.start({ nodeManager }); const clientService = { nodesPing: nodesPing({ @@ -144,6 +152,7 @@ describe('nodesPing', () => { await sigchain.stop(); await nodeGraph.stop(); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await proxy.stop(); await db.stop(); await keyManager.stop(); diff --git a/tests/client/service/notificationsClear.test.ts b/tests/client/service/notificationsClear.test.ts index c2a1c5cd3..7020f7d84 100644 --- a/tests/client/service/notificationsClear.test.ts +++ b/tests/client/service/notificationsClear.test.ts @@ -21,6 +21,7 @@ import { ClientServiceService } from '@/proto/js/polykey/v1/client_service_grpc_ import * as utilsPB from '@/proto/js/polykey/v1/utils/utils_pb'; import * as keysUtils from '@/keys/utils'; import * as clientUtils from '@/client/utils/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; describe('notificationsClear', () => { @@ -53,6 +54,7 @@ describe('notificationsClear', () => { const authToken = 'abc123'; let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let notificationsManager: NotificationsManager; @@ -105,10 +107,14 @@ describe('notificationsClear', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -119,8 +125,10 @@ describe('notificationsClear', () => { nodeConnectionManager, nodeGraph, sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); notificationsManager = @@ -159,6 +167,7 @@ describe('notificationsClear', () => { await notificationsManager.stop(); await nodeGraph.stop(); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await sigchain.stop(); await proxy.stop(); await acl.stop(); diff --git a/tests/client/service/notificationsRead.test.ts b/tests/client/service/notificationsRead.test.ts index 24b8b9542..0f5b80610 100644 --- a/tests/client/service/notificationsRead.test.ts +++ b/tests/client/service/notificationsRead.test.ts @@ -23,6 +23,7 @@ import * as notificationsPB from '@/proto/js/polykey/v1/notifications/notificati import * as keysUtils from '@/keys/utils'; import * as nodesUtils from '@/nodes/utils'; import * as clientUtils from '@/client/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; import * as testNodesUtils from '../../nodes/utils'; @@ -128,6 +129,7 @@ describe('notificationsRead', () => { const authToken = 'abc123'; let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let notificationsManager: NotificationsManager; @@ -180,10 +182,14 @@ describe('notificationsRead', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -191,11 +197,13 @@ describe('notificationsRead', () => { nodeManager = new NodeManager({ db, keyManager, - nodeGraph, nodeConnectionManager, + nodeGraph, sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); notificationsManager = @@ -235,6 +243,7 @@ describe('notificationsRead', () => { await sigchain.stop(); await nodeGraph.stop(); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await proxy.stop(); await acl.stop(); await db.stop(); diff --git a/tests/client/service/notificationsSend.test.ts b/tests/client/service/notificationsSend.test.ts index 220edfe83..a0f471b58 100644 --- a/tests/client/service/notificationsSend.test.ts +++ b/tests/client/service/notificationsSend.test.ts @@ -24,6 +24,7 @@ import * as keysUtils from '@/keys/utils'; import * as nodesUtils from '@/nodes/utils'; import * as notificationsUtils from '@/notifications/utils'; import * as clientUtils from '@/client/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../../utils'; describe('notificationsSend', () => { @@ -63,6 +64,7 @@ describe('notificationsSend', () => { const authToken = 'abc123'; let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let notificationsManager: NotificationsManager; @@ -114,10 +116,14 @@ describe('notificationsSend', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -125,11 +131,13 @@ describe('notificationsSend', () => { nodeManager = new NodeManager({ db, keyManager, - nodeGraph, nodeConnectionManager, + nodeGraph, sigchain, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); notificationsManager = @@ -167,6 +175,7 @@ describe('notificationsSend', () => { await notificationsManager.stop(); await nodeGraph.stop(); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await sigchain.stop(); await proxy.stop(); await acl.stop(); diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts index da9acd92b..04f5b8236 100644 --- a/tests/discovery/Discovery.test.ts +++ b/tests/discovery/Discovery.test.ts @@ -21,6 +21,7 @@ import * as nodesUtils from '@/nodes/utils'; import * as claimsUtils from '@/claims/utils'; import * as discoveryErrors from '@/discovery/errors'; import * as keysUtils from '@/keys/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testNodesUtils from '../nodes/utils'; import * as testUtils from '../utils'; import TestProvider from '../identities/TestProvider'; @@ -47,6 +48,7 @@ describe('Discovery', () => { let gestaltGraph: GestaltGraph; let identitiesManager: IdentitiesManager; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let db: DB; @@ -130,10 +132,14 @@ describe('Discovery', () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 2000, connTimeoutTime: 2000, logger: logger.getChild('NodeConnectionManager'), @@ -141,11 +147,13 @@ describe('Discovery', () => { nodeManager = new NodeManager({ db, keyManager, - sigchain, - nodeGraph, nodeConnectionManager, - logger: logger.getChild('nodeManager'), + nodeGraph, + sigchain, + setNodeQueue, + logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); // Set up other gestalt @@ -204,6 +212,7 @@ describe('Discovery', () => { await nodeB.stop(); await nodeConnectionManager.stop(); await nodeManager.stop(); + await setNodeQueue.stop(); await nodeGraph.stop(); await proxy.stop(); await sigchain.stop(); diff --git a/tests/nodes/NodeConnection.test.ts b/tests/nodes/NodeConnection.test.ts index 35c084fa9..99de0bbe8 100644 --- a/tests/nodes/NodeConnection.test.ts +++ b/tests/nodes/NodeConnection.test.ts @@ -34,6 +34,7 @@ import * as nodesUtils from '@/nodes/utils'; import * as agentErrors from '@/agent/errors'; import * as grpcUtils from '@/grpc/utils'; import { timerStart } from '@/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testNodesUtils from './utils'; import * as testUtils from '../utils'; import * as grpcTestUtils from '../grpc/utils'; @@ -84,6 +85,7 @@ describe('${NodeConnection.name} test', () => { let serverKeyManager: KeyManager; let serverVaultManager: VaultManager; let serverNodeGraph: NodeGraph; + let serverSetNodeQueue: SetNodeQueue; let serverNodeConnectionManager: NodeConnectionManager; let serverNodeManager: NodeManager; let serverSigchain: Sigchain; @@ -231,10 +233,12 @@ describe('${NodeConnection.name} test', () => { logger, }); + serverSetNodeQueue = new SetNodeQueue({ logger }); serverNodeConnectionManager = new NodeConnectionManager({ keyManager: serverKeyManager, nodeGraph: serverNodeGraph, proxy: serverProxy, + setNodeQueue: serverSetNodeQueue, logger, }); serverNodeManager = new NodeManager({ @@ -243,8 +247,10 @@ describe('${NodeConnection.name} test', () => { keyManager: serverKeyManager, nodeGraph: serverNodeGraph, nodeConnectionManager: serverNodeConnectionManager, + setNodeQueue: serverSetNodeQueue, logger: logger, }); + await serverSetNodeQueue.start(); await serverNodeManager.start(); await serverNodeConnectionManager.start({ nodeManager: serverNodeManager }); serverVaultManager = await VaultManager.createVaultManager({ @@ -356,6 +362,7 @@ describe('${NodeConnection.name} test', () => { await serverNodeGraph.destroy(); await serverNodeConnectionManager.stop(); await serverNodeManager.stop(); + await serverSetNodeQueue.stop(); await serverNotificationsManager.stop(); await serverNotificationsManager.destroy(); await agentTestUtils.closeTestAgentServer(agentServer); diff --git a/tests/nodes/NodeConnectionManager.general.test.ts b/tests/nodes/NodeConnectionManager.general.test.ts index 6231e5dcc..dd30f9049 100644 --- a/tests/nodes/NodeConnectionManager.general.test.ts +++ b/tests/nodes/NodeConnectionManager.general.test.ts @@ -20,6 +20,7 @@ import * as keysUtils from '@/keys/utils'; import * as grpcUtils from '@/grpc/utils'; import * as nodesPB from '@/proto/js/polykey/v1/nodes/nodes_pb'; import * as utilsPB from '@/proto/js/polykey/v1/utils/utils_pb'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testNodesUtils from './utils'; describe(`${NodeConnectionManager.name} general test`, () => { @@ -76,6 +77,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { let db: DB; let proxy: Proxy; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let remoteNode1: PolykeyAgent; let remoteNode2: PolykeyAgent; @@ -191,6 +193,10 @@ describe(`${NodeConnectionManager.name} general test`, () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); + await setNodeQueue.start(); const tlsConfig = { keyPrivatePem: keyManager.getRootKeyPairPem().privateKey, certChainPem: keysUtils.certToPem(keyManager.getRootCert()), @@ -216,6 +222,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { }); afterEach(async () => { + await setNodeQueue.stop(); await nodeGraph.stop(); await nodeGraph.destroy(); await db.stop(); @@ -232,6 +239,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -259,6 +267,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -300,6 +309,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -353,6 +363,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: logger.getChild('NodeConnectionManager'), }); @@ -424,6 +435,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -461,6 +473,7 @@ describe(`${NodeConnectionManager.name} general test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); diff --git a/tests/nodes/NodeConnectionManager.lifecycle.test.ts b/tests/nodes/NodeConnectionManager.lifecycle.test.ts index 979403ec3..bf719789a 100644 --- a/tests/nodes/NodeConnectionManager.lifecycle.test.ts +++ b/tests/nodes/NodeConnectionManager.lifecycle.test.ts @@ -19,6 +19,7 @@ import * as nodesErrors from '@/nodes/errors'; import * as keysUtils from '@/keys/utils'; import * as grpcUtils from '@/grpc/utils'; import { timerStart } from '@/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; describe(`${NodeConnectionManager.name} lifecycle test`, () => { const logger = new Logger( @@ -76,6 +77,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { let proxy: Proxy; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let remoteNode1: PolykeyAgent; let remoteNode2: PolykeyAgent; @@ -154,6 +156,10 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, logger: logger.getChild('NodeGraph'), }); + setNodeQueue = new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }); + await setNodeQueue.start(); const tlsConfig = { keyPrivatePem: keyManager.getRootKeyPairPem().privateKey, certChainPem: keysUtils.certToPem(keyManager.getRootCert()), @@ -179,6 +185,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { }); afterEach(async () => { + await setNodeQueue.stop(); await nodeGraph.stop(); await nodeGraph.destroy(); await db.stop(); @@ -197,6 +204,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -222,6 +230,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -256,6 +265,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -284,6 +294,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -336,6 +347,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, connConnectTime: 500, logger: nodeConnectionManagerLogger, }); @@ -377,6 +389,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -403,6 +416,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -436,6 +450,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -469,6 +484,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -511,6 +527,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -531,6 +548,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); @@ -556,6 +574,7 @@ describe(`${NodeConnectionManager.name} lifecycle test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); diff --git a/tests/nodes/NodeConnectionManager.seednodes.test.ts b/tests/nodes/NodeConnectionManager.seednodes.test.ts index 4d47afb0c..ae186f451 100644 --- a/tests/nodes/NodeConnectionManager.seednodes.test.ts +++ b/tests/nodes/NodeConnectionManager.seednodes.test.ts @@ -17,6 +17,7 @@ import Proxy from '@/network/Proxy'; import * as nodesUtils from '@/nodes/utils'; import * as keysUtils from '@/keys/utils'; import * as grpcUtils from '@/grpc/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; describe(`${NodeConnectionManager.name} seed nodes test`, () => { const logger = new Logger( @@ -190,6 +191,9 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue: new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }), seedNodes: dummySeedNodes, logger: logger, }); @@ -213,6 +217,9 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue: new SetNodeQueue({ + logger: logger.getChild('SetNodeQueue'), + }), seedNodes: dummySeedNodes, logger: logger, }); @@ -230,6 +237,7 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { test('should synchronise nodeGraph', async () => { let nodeConnectionManager: NodeConnectionManager | undefined; let nodeManager: NodeManager | undefined; + let setNodeQueue: SetNodeQueue | undefined; const mockedRefreshBucket = jest.spyOn( NodeManager.prototype, 'refreshBucket', @@ -245,10 +253,12 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { host: remoteNode2.proxy.getProxyHost(), port: remoteNode2.proxy.getProxyPort(), }; + setNodeQueue = new SetNodeQueue({ logger }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, seedNodes, logger: logger, }); @@ -258,8 +268,10 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { logger, nodeConnectionManager, nodeGraph, + setNodeQueue, sigchain: {} as Sigchain, }); + await setNodeQueue.start(); await nodeManager.start(); await remoteNode1.nodeGraph.setNode(nodeId1, { host: serverHost, @@ -278,11 +290,13 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { mockedRefreshBucket.mockRestore(); await nodeManager?.stop(); await nodeConnectionManager?.stop(); + await setNodeQueue?.stop(); } }); test('should call refreshBucket when syncing nodeGraph', async () => { let nodeConnectionManager: NodeConnectionManager | undefined; let nodeManager: NodeManager | undefined; + let setNodeQueue: SetNodeQueue | undefined; const mockedRefreshBucket = jest.spyOn( NodeManager.prototype, 'refreshBucket', @@ -298,10 +312,12 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { host: remoteNode2.proxy.getProxyHost(), port: remoteNode2.proxy.getProxyPort(), }; + setNodeQueue = new SetNodeQueue({ logger }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, seedNodes, logger: logger, }); @@ -312,7 +328,9 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { nodeConnectionManager, nodeGraph, sigchain: {} as Sigchain, + setNodeQueue, }); + await setNodeQueue.start(); await nodeManager.start(); await remoteNode1.nodeGraph.setNode(nodeId1, { host: serverHost, @@ -330,11 +348,13 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { mockedRefreshBucket.mockRestore(); await nodeManager?.stop(); await nodeConnectionManager?.stop(); + await setNodeQueue?.stop(); } }); test('should handle an offline seed node when synchronising nodeGraph', async () => { let nodeConnectionManager: NodeConnectionManager | undefined; let nodeManager: NodeManager | undefined; + let setNodeQueue: SetNodeQueue | undefined; const mockedRefreshBucket = jest.spyOn( NodeManager.prototype, 'refreshBucket', @@ -363,10 +383,12 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { host: serverHost, port: serverPort, }); + setNodeQueue = new SetNodeQueue({ logger }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, proxy, + setNodeQueue, seedNodes, connConnectTime: 500, logger: logger, @@ -378,7 +400,9 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { nodeConnectionManager, nodeGraph, sigchain: {} as Sigchain, + setNodeQueue, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); // This should complete without error @@ -390,6 +414,7 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { mockedRefreshBucket.mockRestore(); await nodeConnectionManager?.stop(); await nodeManager?.stop(); + await setNodeQueue?.stop(); } }); }); diff --git a/tests/nodes/NodeConnectionManager.termination.test.ts b/tests/nodes/NodeConnectionManager.termination.test.ts index 0422cf223..89358557b 100644 --- a/tests/nodes/NodeConnectionManager.termination.test.ts +++ b/tests/nodes/NodeConnectionManager.termination.test.ts @@ -2,6 +2,7 @@ import type { AddressInfo } from 'net'; import type { NodeId, NodeIdString, SeedNodes } from '@/nodes/types'; import type { Host, Port, TLSConfig } from '@/network/types'; import type NodeManager from '@/nodes/NodeManager'; +import type SetNodeQueue from '@/nodes/SetNodeQueue'; import net from 'net'; import fs from 'fs'; import path from 'path'; @@ -246,6 +247,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); @@ -286,6 +288,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); @@ -329,6 +332,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); @@ -372,6 +376,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy: defaultProxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); @@ -429,6 +434,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy: defaultProxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); @@ -508,6 +514,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy: defaultProxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); @@ -580,6 +587,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy: defaultProxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); @@ -657,6 +665,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy: defaultProxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); @@ -734,6 +743,7 @@ describe(`${NodeConnectionManager.name} termination test`, () => { keyManager, nodeGraph, proxy: defaultProxy, + setNodeQueue: {} as SetNodeQueue, logger: logger, connConnectTime: 2000, }); diff --git a/tests/nodes/NodeConnectionManager.timeout.test.ts b/tests/nodes/NodeConnectionManager.timeout.test.ts index 494350f52..e07958140 100644 --- a/tests/nodes/NodeConnectionManager.timeout.test.ts +++ b/tests/nodes/NodeConnectionManager.timeout.test.ts @@ -1,6 +1,7 @@ import type { NodeId, NodeIdString, SeedNodes } from '@/nodes/types'; import type { Host, Port } from '@/network/types'; import type NodeManager from 'nodes/NodeManager'; +import type SetNodeQueue from '@/nodes/SetNodeQueue'; import fs from 'fs'; import path from 'path'; import os from 'os'; @@ -188,6 +189,7 @@ describe(`${NodeConnectionManager.name} timeout test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue: {} as SetNodeQueue, connTimeoutTime: 500, logger: nodeConnectionManagerLogger, }); @@ -225,6 +227,7 @@ describe(`${NodeConnectionManager.name} timeout test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue: {} as SetNodeQueue, connTimeoutTime: 1000, logger: nodeConnectionManagerLogger, }); @@ -278,6 +281,7 @@ describe(`${NodeConnectionManager.name} timeout test`, () => { keyManager, nodeGraph, proxy, + setNodeQueue: {} as SetNodeQueue, logger: nodeConnectionManagerLogger, }); await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index b83be35d8..7815c34e0 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -20,6 +20,7 @@ import { promise, promisify, sleep } from '@/utils'; import * as nodesUtils from '@/nodes/utils'; import * as utilsPB from '@/proto/js/polykey/v1/utils/utils_pb'; import * as nodesErrors from '@/nodes/errors'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as nodesTestUtils from './utils'; import { generateNodeIdForBucket } from './utils'; @@ -30,6 +31,7 @@ describe(`${NodeManager.name} test`, () => { ]); let dataDir: string; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let proxy: Proxy; let keyManager: KeyManager; @@ -111,9 +113,11 @@ describe(`${NodeManager.name} test`, () => { keyManager, logger, }); + setNodeQueue = new SetNodeQueue({ logger }); nodeConnectionManager = new NodeConnectionManager({ keyManager, nodeGraph, + setNodeQueue, proxy, logger, }); @@ -122,6 +126,7 @@ describe(`${NodeManager.name} test`, () => { mockedPingNode.mockClear(); mockedPingNode.mockImplementation(async (_) => true); await nodeConnectionManager.stop(); + await setNodeQueue.stop(); await nodeGraph.stop(); await nodeGraph.destroy(); await sigchain.stop(); @@ -167,6 +172,7 @@ describe(`${NodeManager.name} test`, () => { keyManager, nodeGraph, nodeConnectionManager, + setNodeQueue, logger, }); await nodeManager.start(); @@ -240,6 +246,7 @@ describe(`${NodeManager.name} test`, () => { keyManager, nodeGraph, nodeConnectionManager, + setNodeQueue, logger, }); await nodeManager.start(); @@ -427,6 +434,7 @@ describe(`${NodeManager.name} test`, () => { keyManager, nodeGraph, nodeConnectionManager, + setNodeQueue, logger, }); await nodeManager.start(); @@ -443,15 +451,18 @@ describe(`${NodeManager.name} test`, () => { }); }); test('should add a node when bucket has room', async () => { + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: {} as NodeConnectionManager, + setNodeQueue, logger, }); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); const localNodeId = keyManager.getNodeId(); @@ -467,18 +478,22 @@ describe(`${NodeManager.name} test`, () => { expect(bucket).toHaveLength(1); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should update a node if node exists', async () => { + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: {} as NodeConnectionManager, + setNodeQueue, logger, }); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); const localNodeId = keyManager.getNodeId(); @@ -506,18 +521,22 @@ describe(`${NodeManager.name} test`, () => { expect(newNodeData.lastUpdated).not.toEqual(nodeData.lastUpdated); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should not add node if bucket is full and old node is alive', async () => { + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: {} as NodeConnectionManager, + setNodeQueue, logger, }); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); const localNodeId = keyManager.getNodeId(); @@ -556,18 +575,22 @@ describe(`${NodeManager.name} test`, () => { nodeManagerPingMock.mockRestore(); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should add node if bucket is full, old node is alive and force is set', async () => { + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: {} as NodeConnectionManager, + setNodeQueue, logger, }); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); const localNodeId = keyManager.getNodeId(); @@ -608,18 +631,22 @@ describe(`${NodeManager.name} test`, () => { nodeManagerPingMock.mockRestore(); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should add node if bucket is full and old node is dead', async () => { + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: {} as NodeConnectionManager, + setNodeQueue, logger, }); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); const localNodeId = keyManager.getNodeId(); @@ -652,19 +679,23 @@ describe(`${NodeManager.name} test`, () => { nodeManagerPingMock.mockRestore(); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should add node when an incoming connection is established', async () => { let server: PolykeyAgent | undefined; + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: {} as NodeConnectionManager, + setNodeQueue, logger, }); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); server = await PolykeyAgent.createPolykeyAgent({ @@ -704,19 +735,23 @@ describe(`${NodeManager.name} test`, () => { await server?.stop(); await server?.destroy(); await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should not add nodes to full bucket if pings succeeds', async () => { mockedPingNode.mockImplementation(async (_) => true); + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: dummyNodeConnectionManager, + setNodeQueue, logger, }); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); const nodeId = keyManager.getNodeId(); @@ -742,18 +777,22 @@ describe(`${NodeManager.name} test`, () => { ); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should add nodes to full bucket if pings fail', async () => { mockedPingNode.mockImplementation(async (_) => true); + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: dummyNodeConnectionManager, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); try { await nodeConnectionManager.start({ nodeManager }); @@ -779,13 +818,14 @@ describe(`${NodeManager.name} test`, () => { await nodeManager.setNode(newNode1, address); await nodeManager.setNode(newNode2, address); await nodeManager.setNode(newNode3, address); - await nodeManager.queueDrained(); + await setNodeQueue.queueDrained(); const list = await listBucket(100); expect(list).toContain(nodesUtils.encodeNodeId(newNode1)); expect(list).toContain(nodesUtils.encodeNodeId(newNode2)); expect(list).toContain(nodesUtils.encodeNodeId(newNode3)); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should not block when bucket is full', async () => { @@ -795,14 +835,17 @@ describe(`${NodeManager.name} test`, () => { logger, }); mockedPingNode.mockImplementation(async (_) => true); + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph: tempNodeGraph, nodeConnectionManager: dummyNodeConnectionManager, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); try { await nodeConnectionManager.start({ nodeManager }); @@ -821,27 +864,32 @@ describe(`${NodeManager.name} test`, () => { return true; }); const newNode4 = generateNodeIdForBucket(nodeId, 100, 25); + // Set manually to non-blocking await expect( - nodeManager.setNode(newNode4, address), + nodeManager.setNode(newNode4, address, false), ).resolves.toBeUndefined(); delayPing.resolveP(null); - await nodeManager.queueDrained(); + await setNodeQueue.queueDrained(); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); await tempNodeGraph.stop(); await tempNodeGraph.destroy(); } }); test('should block when blocking is set to true', async () => { mockedPingNode.mockImplementation(async (_) => true); + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: dummyNodeConnectionManager, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); try { await nodeConnectionManager.start({ nodeManager }); @@ -863,16 +911,19 @@ describe(`${NodeManager.name} test`, () => { expect(mockedPingNode).toBeCalled(); } finally { await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should update deadline when updating a bucket', async () => { const refreshBucketTimeout = 100000; + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: dummyNodeConnectionManager, + setNodeQueue, refreshBucketTimerDefault: refreshBucketTimeout, logger, }); @@ -882,6 +933,7 @@ describe(`${NodeManager.name} test`, () => { ); try { mockRefreshBucket.mockImplementation(async () => {}); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); // @ts-ignore: kidnap map @@ -901,16 +953,19 @@ describe(`${NodeManager.name} test`, () => { } finally { mockRefreshBucket.mockRestore(); await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should add buckets to the queue when exceeding deadline', async () => { const refreshBucketTimeout = 100; + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: dummyNodeConnectionManager, + setNodeQueue, refreshBucketTimerDefault: refreshBucketTimeout, logger, }); @@ -924,6 +979,7 @@ describe(`${NodeManager.name} test`, () => { ); try { mockRefreshBucket.mockImplementation(async () => {}); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); // Getting starting value @@ -934,16 +990,19 @@ describe(`${NodeManager.name} test`, () => { mockRefreshBucketQueueAdd.mockRestore(); mockRefreshBucket.mockRestore(); await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should digest queue to refresh buckets', async () => { const refreshBucketTimeout = 1000000; + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: dummyNodeConnectionManager, + setNodeQueue, refreshBucketTimerDefault: refreshBucketTimeout, logger, }); @@ -952,6 +1011,7 @@ describe(`${NodeManager.name} test`, () => { 'refreshBucket', ); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); mockRefreshBucket.mockImplementation(async () => {}); @@ -968,16 +1028,19 @@ describe(`${NodeManager.name} test`, () => { } finally { mockRefreshBucket.mockRestore(); await nodeManager.stop(); + await setNodeQueue.stop(); } }); test('should abort refreshBucket queue when stopping', async () => { const refreshBucketTimeout = 1000000; + const setNodeQueue = new SetNodeQueue({ logger }); const nodeManager = new NodeManager({ db, sigchain: {} as Sigchain, keyManager, nodeGraph, nodeConnectionManager: dummyNodeConnectionManager, + setNodeQueue, refreshBucketTimerDefault: refreshBucketTimeout, logger, }); @@ -986,6 +1049,7 @@ describe(`${NodeManager.name} test`, () => { 'refreshBucket', ); try { + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); mockRefreshBucket.mockImplementation( @@ -1007,6 +1071,7 @@ describe(`${NodeManager.name} test`, () => { } finally { mockRefreshBucket.mockRestore(); await nodeManager.stop(); + await setNodeQueue.stop(); } }); }); diff --git a/tests/notifications/NotificationsManager.test.ts b/tests/notifications/NotificationsManager.test.ts index b382ea49d..8a498fdb7 100644 --- a/tests/notifications/NotificationsManager.test.ts +++ b/tests/notifications/NotificationsManager.test.ts @@ -22,6 +22,7 @@ import * as notificationsErrors from '@/notifications/errors'; import * as vaultsUtils from '@/vaults/utils'; import * as nodesUtils from '@/nodes/utils'; import * as keysUtils from '@/keys/utils'; +import SetNodeQueue from '@/nodes/SetNodeQueue'; import * as testUtils from '../utils'; describe('NotificationsManager', () => { @@ -50,6 +51,7 @@ describe('NotificationsManager', () => { let acl: ACL; let db: DB; let nodeGraph: NodeGraph; + let setNodeQueue: SetNodeQueue; let nodeConnectionManager: NodeConnectionManager; let nodeManager: NodeManager; let keyManager: KeyManager; @@ -112,10 +114,12 @@ describe('NotificationsManager', () => { keyManager, logger, }); + setNodeQueue = new SetNodeQueue({ logger }); nodeConnectionManager = new NodeConnectionManager({ nodeGraph, keyManager, proxy, + setNodeQueue, logger, }); nodeManager = new NodeManager({ @@ -124,8 +128,10 @@ describe('NotificationsManager', () => { sigchain, nodeConnectionManager, nodeGraph, + setNodeQueue, logger, }); + await setNodeQueue.start(); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); // Set up node for receiving notifications @@ -147,6 +153,7 @@ describe('NotificationsManager', () => { }, global.defaultTimeout); afterAll(async () => { await receiver.stop(); + await setNodeQueue.stop(); await nodeConnectionManager.stop(); await nodeGraph.stop(); await proxy.stop(); diff --git a/tests/vaults/VaultManager.test.ts b/tests/vaults/VaultManager.test.ts index 5236215b0..46257fa33 100644 --- a/tests/vaults/VaultManager.test.ts +++ b/tests/vaults/VaultManager.test.ts @@ -8,6 +8,7 @@ import type { import type NotificationsManager from '@/notifications/NotificationsManager'; import type { Host, Port, TLSConfig } from '@/network/types'; import type NodeManager from '@/nodes/NodeManager'; +import type SetNodeQueue from '@/nodes/SetNodeQueue'; import fs from 'fs'; import os from 'os'; import path from 'path'; @@ -580,6 +581,7 @@ describe('VaultManager', () => { keyManager, nodeGraph, proxy, + setNodeQueue: {} as SetNodeQueue, logger, }); await nodeConnectionManager.start({ @@ -1497,6 +1499,7 @@ describe('VaultManager', () => { logger, nodeGraph, proxy, + setNodeQueue: {} as SetNodeQueue, connConnectTime: 1000, }); await nodeConnectionManager.start({