From 6d8f60641a3912c5d1e8dd13798fbe08ef1161d2 Mon Sep 17 00:00:00 2001 From: Brian Botha Date: Mon, 26 Sep 2022 16:52:39 +1000 Subject: [PATCH] feat: `Discovery` makes use of `TaskManager` now - The parts of discovery that handles queuing has been removed. This includes the 'plug' promises, database levels and the discovery ID generator. - The logic of the queue where it handled the vertex was separated out into a new method. This logic included connecting to a vertex, getting the claims' data, iterating over the claims, verifying the claims and adding them to the gestaltGraph, and adding the new vertices to the list. - Adding a vertex to the list now consists of creating a singleton task. each task focuses on processing a single vertex. - I tried to cancel all tasks on `discovery.destroy()` but it would require the `taskManager` to be started at the time since we need to iterate over active tasks at the time. For now cancel the tasks when starting with fresh. - To avoid re-trying tasks, a `visitedVertices` set is used, this hasn't been updated. A more complex mechanism is need for this that has persistence. I think this will be left with the more complex discovery changes later. - Discovery tests have been updated. none removed or added so far. The change was just using `taskManager`. - Removed discovery utils and types. These are not needed anymore since we don't make use of a discovery ID generator. - Added locking on the vertex to avoid duplicating a task due to racing. Also catching and throwing the singleton reason, so we don't get any errors on tasks that were canceled for this reason. - Discovery tasks are canceled during fresh start and destroy. -`Discovery` stops any active tasks when stopping - Added a check for if `TaskManager` is still processing when calling stop on `Discovery` and `NodeManager`. --- src/PolykeyAgent.ts | 7 +- src/discovery/Discovery.ts | 632 ++++++++++-------- src/discovery/types.ts | 11 - src/discovery/utils.ts | 13 - src/nodes/NodeManager.ts | 3 + src/tasks/TaskManager.ts | 4 + src/tasks/errors.ts | 6 + tests/bin/identities/trustUntrustList.test.ts | 10 +- .../gestaltsDiscoveryByIdentity.test.ts | 3 +- .../service/gestaltsDiscoveryByNode.test.ts | 3 +- .../gestaltsGestaltTrustByIdentity.test.ts | 13 +- .../gestaltsGestaltTrustByNode.test.ts | 3 +- tests/discovery/Discovery.test.ts | 49 +- .../NodeConnectionManager.seednodes.test.ts | 4 + 14 files changed, 430 insertions(+), 331 deletions(-) delete mode 100644 src/discovery/types.ts delete mode 100644 src/discovery/utils.ts diff --git a/src/PolykeyAgent.ts b/src/PolykeyAgent.ts index 997010d21..e08a06435 100644 --- a/src/PolykeyAgent.ts +++ b/src/PolykeyAgent.ts @@ -325,6 +325,7 @@ class PolykeyAgent { identitiesManager, nodeManager, sigchain, + taskManager, logger: logger.getChild(Discovery.name), })); notificationsManager = @@ -754,16 +755,20 @@ class PolykeyAgent { this.logger.info(`Destroying ${this.constructor.name}`); // DB needs to be running for dependent domains to properly clear state. await this.db.start(); + // TaskManager needs to be running for dependent domains to clear state. + await this.taskManager.start({ lazy: true }); await this.sessionManager.destroy(); await this.notificationsManager.destroy(); await this.vaultManager.destroy(); await this.discovery.destroy(); await this.nodeGraph.destroy(); await this.gestaltGraph.destroy(); - await this.taskManager.destroy(); await this.acl.destroy(); await this.sigchain.destroy(); await this.identitiesManager.destroy(); + await this.taskManager.stop(); + await this.taskManager.destroy(); + // Non-TaskManager dependencies await this.db.stop(); // Non-DB dependencies await this.db.destroy(); diff --git a/src/discovery/Discovery.ts b/src/discovery/Discovery.ts index 834b6c733..55974fb33 100644 --- a/src/discovery/Discovery.ts +++ b/src/discovery/Discovery.ts @@ -1,5 +1,5 @@ -import type { DB, LevelPath } from '@matrixai/db'; -import type { DiscoveryQueueId, DiscoveryQueueIdGenerator } from './types'; +import type { DB, DBTransaction } from '@matrixai/db'; +import type { PromiseCancellable } from '@matrixai/async-cancellable'; import type { NodeId, NodeInfo } from '../nodes/types'; import type NodeManager from '../nodes/NodeManager'; import type GestaltGraph from '../gestalts/GestaltGraph'; @@ -17,22 +17,39 @@ import type Sigchain from '../sigchain/Sigchain'; import type KeyManager from '../keys/KeyManager'; import type { ClaimIdEncoded, Claim, ClaimLinkIdentity } from '../claims/types'; import type { ChainData } from '../sigchain/types'; +import type TaskManager from '../tasks/TaskManager'; +import type { ContextTimed } from '../contexts/types'; +import type { TaskHandler, TaskHandlerId } from '../tasks/types'; import Logger from '@matrixai/logger'; import { CreateDestroyStartStop, ready, - status, } from '@matrixai/async-init/dist/CreateDestroyStartStop'; -import { IdInternal } from '@matrixai/id'; -import * as idUtils from '@matrixai/id/dist/utils'; -import * as discoveryUtils from './utils'; import * as discoveryErrors from './errors'; +import * as tasksErrors from '../tasks/errors'; import * as nodesErrors from '../nodes/errors'; import * as networkErrors from '../network/errors'; import * as gestaltsUtils from '../gestalts/utils'; import * as claimsUtils from '../claims/utils'; import * as nodesUtils from '../nodes/utils'; -import { never, promise } from '../utils'; +import { never } from '../utils'; +import { context } from '../contexts/index'; +import TimedCancellable from '../contexts/decorators/timedCancellable'; + +/** + * This is the reason used to cancel duplicate tasks for vertices + */ +const abortSingletonTaskReason = Symbol('abort singleton task reason'); +/** + * This is the reason used to stop and re-schedule all discovery tasks + * when stopping. + */ +const discoveryStoppingTaskReason = Symbol('discovery stopping task reason'); +/** + * This is the reason used to cancel all tasks + * when cleaning up state during destroy or starting fresh + */ +const discoveryDestroyedTaskReason = Symbol('discovery destroyed task reason'); interface Discovery extends CreateDestroyStartStop {} @CreateDestroyStartStop( @@ -47,6 +64,7 @@ class Discovery { identitiesManager, nodeManager, sigchain, + taskManager, logger = new Logger(this.name), fresh = false, }: { @@ -56,6 +74,7 @@ class Discovery { identitiesManager: IdentitiesManager; nodeManager: NodeManager; sigchain: Sigchain; + taskManager: TaskManager; logger?: Logger; fresh?: boolean; }): Promise { @@ -67,6 +86,7 @@ class Discovery { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); await discovery.start({ fresh }); @@ -81,14 +101,34 @@ class Discovery { protected gestaltGraph: GestaltGraph; protected identitiesManager: IdentitiesManager; protected nodeManager: NodeManager; + protected taskManager: TaskManager; - protected discoveryDbPath: LevelPath = [this.constructor.name]; - protected discoveryQueueDbPath: LevelPath = [this.constructor.name, 'queue']; - protected discoveryQueueIdGenerator: DiscoveryQueueIdGenerator; protected visitedVertices = new Set(); - protected discoveryProcess: Promise; - protected queuePlug = promise(); - protected queueDrained = promise(); + protected discoverVertexHandler: TaskHandler = async ( + ctx, + _taskInfo, + vertex: GestaltKey, + ) => { + try { + await this.processVertex(vertex, ctx); + } catch (e) { + if ( + e instanceof tasksErrors.ErrorTaskStop || + e === discoveryStoppingTaskReason + ) { + // We need to recreate the task for the vertex + await this.scheduleDiscoveryForVertex(vertex); + return; + } + // Aborting a duplicate task is not an error + if (e === abortSingletonTaskReason) return; + // Destroying tasks is not an error + if (e === discoveryDestroyedTaskReason) return; + throw e; + } + }; + public readonly discoverVertexHandlerId = + `${this.constructor.name}.${this.discoverVertexHandler.name}.discoverVertexHandlerId` as TaskHandlerId; public constructor({ keyManager, @@ -97,6 +137,7 @@ class Discovery { identitiesManager, nodeManager, sigchain, + taskManager, logger, }: { db: DB; @@ -105,6 +146,7 @@ class Discovery { identitiesManager: IdentitiesManager; nodeManager: NodeManager; sigchain: Sigchain; + taskManager: TaskManager; logger: Logger; }) { this.db = db; @@ -113,6 +155,7 @@ class Discovery { this.identitiesManager = identitiesManager; this.nodeManager = nodeManager; this.sigchain = sigchain; + this.taskManager = taskManager; this.logger = logger; } @@ -123,36 +166,48 @@ class Discovery { } = {}): Promise { this.logger.info(`Starting ${this.constructor.name}`); if (fresh) { - await this.db.clear(this.discoveryDbPath); - } - // Getting latest ID and creating ID generator - let latestId: DiscoveryQueueId | undefined; - const keyIterator = this.db.iterator(this.discoveryQueueDbPath, { - limit: 1, - reverse: true, - values: false, - }); - for await (const [keyPath] of keyIterator) { - const key = keyPath[0] as Buffer; - latestId = IdInternal.fromBuffer(key); + // Cancel all tasks for discovery + for await (const task of this.taskManager.getTasks('asc', true, [ + this.constructor.name, + ])) { + task.cancel(discoveryDestroyedTaskReason); + } } - this.discoveryQueueIdGenerator = - discoveryUtils.createDiscoveryQueueIdGenerator(latestId); - // Starting the queue - this.discoveryProcess = this.setupDiscoveryQueue(); + this.taskManager.registerHandler( + this.discoverVertexHandlerId, + this.discoverVertexHandler, + ); this.logger.info(`Started ${this.constructor.name}`); } public async stop(): Promise { this.logger.info(`Stopping ${this.constructor.name}`); - this.queuePlug.resolveP(); - await this.discoveryProcess; + // Stopping all tasks for discovery + if (this.taskManager.isProcessing()) { + throw new tasksErrors.ErrorTaskManagerProcessing(); + } + const taskPromises: Array> = []; + for await (const task of this.taskManager.getTasks('asc', false, [ + this.constructor.name, + ])) { + if (task.status === 'active') { + taskPromises.push(task.promise()); + task.cancel(discoveryStoppingTaskReason); + } + } + await Promise.all(taskPromises); + this.taskManager.deregisterHandler(this.discoverVertexHandlerId); this.logger.info(`Stopped ${this.constructor.name}`); } public async destroy(): Promise { this.logger.info(`Destroying ${this.constructor.name}`); - await this.db.clear(this.discoveryDbPath); + // Cancel all tasks for discovery + for await (const task of this.taskManager.getTasks('asc', true, [ + this.constructor.name, + ])) { + task.cancel(discoveryDestroyedTaskReason); + } this.logger.info(`Destroyed ${this.constructor.name}`); } @@ -162,7 +217,7 @@ class Discovery { @ready(new discoveryErrors.ErrorDiscoveryNotRunning()) public async queueDiscoveryByNode(nodeId: NodeId): Promise { const nodeKey = gestaltsUtils.keyFromNode(nodeId); - await this.pushKeyToDiscoveryQueue(nodeKey); + await this.scheduleDiscoveryForVertex(nodeKey); } /** @@ -175,299 +230,292 @@ class Discovery { identityId: IdentityId, ): Promise { const identityKey = gestaltsUtils.keyFromIdentity(providerId, identityId); - await this.pushKeyToDiscoveryQueue(identityKey); + await this.scheduleDiscoveryForVertex(identityKey); } - /** - * Async function for processing the Discovery Queue - */ - public async setupDiscoveryQueue(): Promise { - this.logger.debug('Setting up DiscoveryQueue'); - while (true) { - // Checking and waiting for items to process - if (await this.queueIsEmpty()) { - if (!(this[status] === 'stopping')) { - this.queuePlug = promise(); - this.queueDrained.resolveP(); - } - this.logger.debug('DiscoveryQueue is pausing'); - await this.queuePlug.p; - this.queueDrained = promise(); - } - if (this[status] === 'stopping') { - this.logger.debug('DiscoveryQueue is ending'); - break; - } - - // Processing queue - this.logger.debug('DiscoveryQueue is processing'); - for await (const [keyPath, vertex] of this.db.iterator( - this.discoveryQueueDbPath, - { valueAsBuffer: false }, - )) { - const key = keyPath[0] as Buffer; - const vertexId = IdInternal.fromBuffer(key); - this.logger.debug(`Processing vertex: ${vertex}`); - const vertexGId = gestaltsUtils.ungestaltKey(vertex); - switch (vertexGId.type) { - case 'node': - { - // The sigChain data of the vertex (containing all cryptolinks) - let vertexChainData: ChainData = {}; - // If the vertex we've found is our own node, we simply get our own chain - const nodeId = nodesUtils.decodeNodeId(vertexGId.nodeId)!; - if (nodeId.equals(this.keyManager.getNodeId())) { - const vertexChainDataEncoded = - await this.sigchain.getChainData(); - // Decode all our claims - no need to verify (on our own sigChain) - for (const c in vertexChainDataEncoded) { - const claimId = c as ClaimIdEncoded; - vertexChainData[claimId] = claimsUtils.decodeClaim( - vertexChainDataEncoded[claimId], - ); - } - // Otherwise, request the verified chain data from the node - } else { - try { - vertexChainData = await this.nodeManager.requestChainData( - nodeId, - ); - } catch (e) { - this.visitedVertices.add(vertex); - await this.removeKeyFromDiscoveryQueue(vertexId); + protected processVertex( + vertex: GestaltKey, + ctx?: ContextTimed, + ): PromiseCancellable; + @TimedCancellable(true, 20000) + protected async processVertex( + vertex: GestaltKey, + @context ctx: ContextTimed, + ): Promise { + this.logger.debug(`Processing vertex: ${vertex}`); + const vertexGId = gestaltsUtils.ungestaltKey(vertex); + switch (vertexGId.type) { + case 'node': + { + // The sigChain data of the vertex (containing all cryptolinks) + let vertexChainData: ChainData = {}; + // If the vertex we've found is our own node, we simply get our own chain + const nodeId = nodesUtils.decodeNodeId(vertexGId.nodeId)!; + if (nodeId.equals(this.keyManager.getNodeId())) { + const vertexChainDataEncoded = await this.sigchain.getChainData(); + // Decode all our claims - no need to verify (on our own sigChain) + for (const c in vertexChainDataEncoded) { + const claimId = c as ClaimIdEncoded; + vertexChainData[claimId] = claimsUtils.decodeClaim( + vertexChainDataEncoded[claimId], + ); + } + // Otherwise, request the verified chain data from the node + } else { + try { + vertexChainData = await this.nodeManager.requestChainData(nodeId); + } catch (e) { + this.visitedVertices.add(vertex); + this.logger.error( + `Failed to discover ${vertexGId.nodeId} - ${e.toString()}`, + ); + return; + } + } + // TODO: for now, the chain data is treated as a 'disjoint' set of + // cryptolink claims from a node to another node/identity + // That is, we have no notion of revocations, or multiple claims to + // the same node/identity. Thus, we simply iterate over this chain + // of cryptolinks. + // Now have the NodeInfo of this vertex + const vertexNodeInfo: NodeInfo = { + id: nodesUtils.encodeNodeId(nodeId), + chain: vertexChainData, + }; + // Iterate over each of the claims in the chain (already verified) + // TODO: because we're iterating over keys in a record, I don't believe + // that this will iterate in lexicographical order of keys. For now, + // this doesn't matter though (because of the previous comment). + for (const claimId in vertexChainData) { + if (ctx.signal.aborted) throw ctx.signal.reason; + const claim: Claim = vertexChainData[claimId as ClaimIdEncoded]; + // If the claim is to a node + if (claim.payload.data.type === 'node') { + // Get the chain data of the linked node + // Could be node1 or node2 in the claim so get the one that's + // not equal to nodeId from above + const node1Id = nodesUtils.decodeNodeId( + claim.payload.data.node1, + )!; + const node2Id = nodesUtils.decodeNodeId( + claim.payload.data.node2, + )!; + const linkedVertexNodeId = node1Id.equals(nodeId) + ? node2Id + : node1Id; + const linkedVertexGK = + gestaltsUtils.keyFromNode(linkedVertexNodeId); + let linkedVertexChainData: ChainData; + try { + // TODO: this needs to be cancelable + linkedVertexChainData = await this.nodeManager.requestChainData( + linkedVertexNodeId, + ); + } catch (e) { + if ( + e instanceof nodesErrors.ErrorNodeConnectionDestroyed || + e instanceof nodesErrors.ErrorNodeConnectionTimeout + ) { + if (!this.visitedVertices.has(linkedVertexGK)) { + await this.scheduleDiscoveryForVertex(linkedVertexGK); + } this.logger.error( - `Failed to discover ${vertexGId.nodeId} - ${e.toString()}`, + `Failed to discover ${nodesUtils.encodeNodeId( + linkedVertexNodeId, + )} - ${e.toString()}`, ); continue; + } else { + throw e; } } - // TODO: for now, the chain data is treated as a 'disjoint' set of - // cryptolink claims from a node to another node/identity - // That is, we have no notion of revocations, or multiple claims to the - // same node/identity. Thus, we simply iterate over this chain of - // cryptolinks. - // Now have the NodeInfo of this vertex - const vertexNodeInfo: NodeInfo = { - id: nodesUtils.encodeNodeId(nodeId), - chain: vertexChainData, + // With this verified chain, we can link + const linkedVertexNodeInfo: NodeInfo = { + id: nodesUtils.encodeNodeId(linkedVertexNodeId), + chain: linkedVertexChainData, }; - // Iterate over each of the claims in the chain (already verified) - // TODO: because we're iterating over keys in a record, I don't believe - // that this will iterate in lexicographical order of keys. For now, - // this doesn't matter though (because of the previous comment). - for (const claimId in vertexChainData) { - const claim: Claim = vertexChainData[claimId as ClaimIdEncoded]; - // If the claim is to a node - if (claim.payload.data.type === 'node') { - // Get the chain data of the linked node - // Could be node1 or node2 in the claim so get the one that's - // not equal to nodeId from above - const node1Id = nodesUtils.decodeNodeId( - claim.payload.data.node1, - )!; - const node2Id = nodesUtils.decodeNodeId( - claim.payload.data.node2, - )!; - const linkedVertexNodeId = node1Id.equals(nodeId) - ? node2Id - : node1Id; - const linkedVertexGK = - gestaltsUtils.keyFromNode(linkedVertexNodeId); - let linkedVertexChainData: ChainData; - try { - linkedVertexChainData = - await this.nodeManager.requestChainData( - linkedVertexNodeId, - ); - } catch (e) { - if ( - e instanceof nodesErrors.ErrorNodeConnectionDestroyed || - e instanceof nodesErrors.ErrorNodeConnectionTimeout - ) { - if (!this.visitedVertices.has(linkedVertexGK)) { - await this.pushKeyToDiscoveryQueue(linkedVertexGK); - } - this.logger.error( - `Failed to discover ${nodesUtils.encodeNodeId( - linkedVertexNodeId, - )} - ${e.toString()}`, - ); - continue; - } else { - throw e; - } - } - // With this verified chain, we can link - const linkedVertexNodeInfo: NodeInfo = { - id: nodesUtils.encodeNodeId(linkedVertexNodeId), - chain: linkedVertexChainData, - }; - await this.gestaltGraph.linkNodeAndNode( - vertexNodeInfo, - linkedVertexNodeInfo, - ); - // Add this vertex to the queue if it hasn't already been visited - if (!this.visitedVertices.has(linkedVertexGK)) { - await this.pushKeyToDiscoveryQueue(linkedVertexGK); - } - } - // Else the claim is to an identity - if (claim.payload.data.type === 'identity') { - // Attempt to get the identity info on the identity provider - const identityInfo = await this.getIdentityInfo( - claim.payload.data.provider, - claim.payload.data.identity, - ); - // If we can't get identity info, simply skip this claim - if (identityInfo == null) { - continue; - } - // Link the node to the found identity info - await this.gestaltGraph.linkNodeAndIdentity( - vertexNodeInfo, - identityInfo, - ); - // Add this identity vertex to the queue if it is not present - const linkedIdentityGK = gestaltsUtils.keyFromIdentity( - claim.payload.data.provider, - claim.payload.data.identity, - ); - if (!this.visitedVertices.has(linkedIdentityGK)) { - await this.pushKeyToDiscoveryQueue(linkedIdentityGK); - } - } + await this.gestaltGraph.linkNodeAndNode( + vertexNodeInfo, + linkedVertexNodeInfo, + ); + // Add this vertex to the queue if it hasn't already been visited + if (!this.visitedVertices.has(linkedVertexGK)) { + await this.scheduleDiscoveryForVertex(linkedVertexGK); } } - break; - case 'identity': - { - // If the next vertex is an identity, perform a social discovery - // Firstly get the identity info of this identity - const vertexIdentityInfo = await this.getIdentityInfo( - vertexGId.providerId, - vertexGId.identityId, + // Else the claim is to an identity + if (claim.payload.data.type === 'identity') { + // Attempt to get the identity info on the identity provider + // TODO: this needs to be cancellable + const identityInfo = await this.getIdentityInfo( + claim.payload.data.provider, + claim.payload.data.identity, ); - // If we don't have identity info, simply skip this vertex - if (vertexIdentityInfo == null) { + // If we can't get identity info, simply skip this claim + if (identityInfo == null) { continue; } - // Link the identity with each node from its claims on the provider - // Iterate over each of the claims - for (const id in vertexIdentityInfo.claims) { - const identityClaimId = id as IdentityClaimId; - const claim = vertexIdentityInfo.claims[identityClaimId]; - // Claims on an identity provider will always be node -> identity - // So just cast payload data as such - const data = claim.payload.data as ClaimLinkIdentity; - const linkedVertexNodeId = nodesUtils.decodeNodeId(data.node)!; - const linkedVertexGK = - gestaltsUtils.keyFromNode(linkedVertexNodeId); - // Get the chain data of this claimed node (so that we can link in GG) - let linkedVertexChainData: ChainData; - try { - linkedVertexChainData = - await this.nodeManager.requestChainData(linkedVertexNodeId); - } catch (e) { - if ( - e instanceof nodesErrors.ErrorNodeConnectionDestroyed || - e instanceof nodesErrors.ErrorNodeConnectionTimeout || - e instanceof networkErrors.ErrorConnectionNotRunning - ) { - if (!this.visitedVertices.has(linkedVertexGK)) { - await this.pushKeyToDiscoveryQueue(linkedVertexGK); - } - this.logger.error( - `Failed to discover ${data.node} - ${e.toString()}`, - ); - continue; - } else { - throw e; - } - } - // With this verified chain, we can link - const linkedVertexNodeInfo: NodeInfo = { - id: nodesUtils.encodeNodeId(linkedVertexNodeId), - chain: linkedVertexChainData, - }; - await this.gestaltGraph.linkNodeAndIdentity( - linkedVertexNodeInfo, - vertexIdentityInfo, - ); - // Add this vertex to the queue if it is not present + // Link the node to the found identity info + await this.gestaltGraph.linkNodeAndIdentity( + vertexNodeInfo, + identityInfo, + ); + // Add this identity vertex to the queue if it is not present + const linkedIdentityGK = gestaltsUtils.keyFromIdentity( + claim.payload.data.provider, + claim.payload.data.identity, + ); + if (!this.visitedVertices.has(linkedIdentityGK)) { + await this.scheduleDiscoveryForVertex(linkedIdentityGK); + } + } + } + } + break; + case 'identity': + { + // If the next vertex is an identity, perform a social discovery + // Firstly get the identity info of this identity + // TODO: this needs to be cancellable + const vertexIdentityInfo = await this.getIdentityInfo( + vertexGId.providerId, + vertexGId.identityId, + ); + // If we don't have identity info, simply skip this vertex + if (vertexIdentityInfo == null) { + return; + } + // Link the identity with each node from its claims on the provider + // Iterate over each of the claims + for (const id in vertexIdentityInfo.claims) { + if (ctx.signal.aborted) throw ctx.signal.reason; + const identityClaimId = id as IdentityClaimId; + const claim = vertexIdentityInfo.claims[identityClaimId]; + // Claims on an identity provider will always be node -> identity + // So just cast payload data as such + const data = claim.payload.data as ClaimLinkIdentity; + const linkedVertexNodeId = nodesUtils.decodeNodeId(data.node)!; + const linkedVertexGK = + gestaltsUtils.keyFromNode(linkedVertexNodeId); + // Get the chain data of this claimed node (so that we can link in GG) + let linkedVertexChainData: ChainData; + try { + linkedVertexChainData = await this.nodeManager.requestChainData( + linkedVertexNodeId, + ); + } catch (e) { + if ( + e instanceof nodesErrors.ErrorNodeConnectionDestroyed || + e instanceof nodesErrors.ErrorNodeConnectionTimeout || + e instanceof networkErrors.ErrorConnectionNotRunning + ) { if (!this.visitedVertices.has(linkedVertexGK)) { - await this.pushKeyToDiscoveryQueue(linkedVertexGK); + await this.scheduleDiscoveryForVertex(linkedVertexGK); } + this.logger.error( + `Failed to discover ${data.node} - ${e.toString()}`, + ); + continue; + } else { + throw e; } } - break; - default: - never(); + // With this verified chain, we can link + const linkedVertexNodeInfo: NodeInfo = { + id: nodesUtils.encodeNodeId(linkedVertexNodeId), + chain: linkedVertexChainData, + }; + await this.gestaltGraph.linkNodeAndIdentity( + linkedVertexNodeInfo, + vertexIdentityInfo, + ); + // Add this vertex to the queue if it is not present + if (!this.visitedVertices.has(linkedVertexGK)) { + await this.scheduleDiscoveryForVertex(linkedVertexGK); + } + } } - this.visitedVertices.add(vertex); - await this.removeKeyFromDiscoveryQueue(vertexId); - } + break; + default: + never(); } + this.visitedVertices.add(vertex); } /** - * Will resolve once the queue has drained + * Will resolve once all existing discovery tasks have finished. + * Returns the number of existing tasks that were awaited. */ - public async waitForDrained(): Promise { - await this.queueDrained.p; + public async waitForDiscoveryTasks(): Promise { + const promises: Array> = []; + for await (const task of this.taskManager.getTasks('asc', false, [ + this.constructor.name, + this.discoverVertexHandlerId, + ])) { + promises.push(task.promise()); + } + await Promise.all(promises); + return promises.length; } /** - * Simple check for whether the Discovery Queue is empty. + * Simple check for whether there are existing discovery tasks. + * Returns the number of tasks to avoid boolean blindness. */ - protected async queueIsEmpty(): Promise { - let nextDiscoveryQueueId: DiscoveryQueueId | undefined; - const keyIterator = this.db.iterator(this.discoveryQueueDbPath, { - limit: 1, - values: false, - }); - for await (const [keyPath] of keyIterator) { - const key = keyPath[0] as Buffer; - nextDiscoveryQueueId = IdInternal.fromBuffer(key); + protected async hasDiscoveryTasks(): Promise { + let count = 0; + for await (const _ of this.taskManager.getTasks('asc', true, [ + this.constructor.name, + this.discoverVertexHandlerId, + ])) { + count += 1; } - return nextDiscoveryQueueId == null; + return count; } /** - * Push a Gestalt Key to the Discovery Queue. This process also unlocks - * the queue if it was previously locked (due to being empty) - * Will only add the Key if it does not already exist in the queue + * Creates a task that to discover a vertex. + * Will not create a new task if an existing task for the vertex exists. */ - protected async pushKeyToDiscoveryQueue( - gestaltKey: GestaltKey, - ): Promise { - await this.db.withTransactionF(async (tran) => { - const valueIterator = tran.iterator( - this.discoveryQueueDbPath, - { valueAsBuffer: false }, + protected async scheduleDiscoveryForVertex( + vertex: GestaltKey, + tran?: DBTransaction, + ) { + if (tran == null) { + return this.db.withTransactionF((tran) => + this.scheduleDiscoveryForVertex(vertex, tran), ); - for await (const [, value] of valueIterator) { - if (value === gestaltKey) { - return; - } - } - const discoveryQueueId = this.discoveryQueueIdGenerator(); - await tran.put( - [...this.discoveryQueueDbPath, idUtils.toBuffer(discoveryQueueId)], - gestaltKey, - ); - }); - this.queuePlug.resolveP(); - } + } - /** - * Remove a Gestalt Key from the Discovery Queue by its QueueId. This should - * only be done after a Key has been discovered in order to remove it from - * the beginning of the queue. - */ - protected async removeKeyFromDiscoveryQueue( - keyId: DiscoveryQueueId, - ): Promise { - await this.db.del([...this.discoveryQueueDbPath, idUtils.toBuffer(keyId)]); + // Locking on vertex to avoid duplicates + await tran.lock( + [this.constructor.name, this.discoverVertexHandlerId, vertex].join(''), + ); + // Check if task exists + let taskExists = false; + for await (const task of this.taskManager.getTasks( + 'asc', + true, + [this.constructor.name, this.discoverVertexHandlerId, vertex], + tran, + )) { + if (!taskExists) { + taskExists = true; + continue; + } + // Any extra tasks should be cancelled, this shouldn't normally happen + task.cancel(abortSingletonTaskReason); + } + // Create a new task if none exists + await this.taskManager.scheduleTask( + { + handlerId: this.discoverVertexHandlerId, + parameters: [vertex], + path: [this.constructor.name, this.discoverVertexHandlerId, vertex], + lazy: true, + }, + tran, + ); } /** diff --git a/src/discovery/types.ts b/src/discovery/types.ts deleted file mode 100644 index c91021c7e..000000000 --- a/src/discovery/types.ts +++ /dev/null @@ -1,11 +0,0 @@ -import type { Opaque } from '../types'; -import type { Id } from '@matrixai/id'; - -/** - * Used to preserve order in the Discovery Queue. - */ -type DiscoveryQueueId = Opaque<'DiscoveryQueueId', Id>; - -type DiscoveryQueueIdGenerator = () => DiscoveryQueueId; - -export type { DiscoveryQueueId, DiscoveryQueueIdGenerator }; diff --git a/src/discovery/utils.ts b/src/discovery/utils.ts deleted file mode 100644 index b0c774a63..000000000 --- a/src/discovery/utils.ts +++ /dev/null @@ -1,13 +0,0 @@ -import type { DiscoveryQueueId, DiscoveryQueueIdGenerator } from './types'; -import { IdSortable } from '@matrixai/id'; - -function createDiscoveryQueueIdGenerator( - lastId?: DiscoveryQueueId, -): DiscoveryQueueIdGenerator { - const idSortableGenerator = new IdSortable({ - lastId, - }); - return (): DiscoveryQueueId => idSortableGenerator.get(); -} - -export { createDiscoveryQueueIdGenerator }; diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 7536b580d..62cdc324c 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -174,6 +174,9 @@ class NodeManager { public async stop() { this.logger.info(`Stopping ${this.constructor.name}`); this.logger.info('Cancelling ephemeral tasks'); + if (this.taskManager.isProcessing()) { + throw new tasksErrors.ErrorTaskManagerProcessing(); + } const tasks: Array> = []; for await (const task of this.taskManager.getTasks('asc', false, [ this.basePath, diff --git a/src/tasks/TaskManager.ts b/src/tasks/TaskManager.ts index d4c00b032..4242605b4 100644 --- a/src/tasks/TaskManager.ts +++ b/src/tasks/TaskManager.ts @@ -240,6 +240,10 @@ class TaskManager { await Promise.all([this.stopQueueing(), this.stopScheduling()]); } + public isProcessing(): boolean { + return this.schedulingLoop != null && this.queuingLoop != null; + } + /** * Stop the active tasks * This call is idempotent diff --git a/src/tasks/errors.ts b/src/tasks/errors.ts index 601eaf223..306fc139b 100644 --- a/src/tasks/errors.ts +++ b/src/tasks/errors.ts @@ -35,6 +35,11 @@ class ErrorTaskManagerQueue extends ErrorTasks { exitCode = sysexits.SOFTWARE; } +class ErrorTaskManagerProcessing extends ErrorTasks { + static description = 'TaskManager is processing'; + exitCode = sysexits.USAGE; +} + class ErrorTask extends ErrorTasks { static description = 'Task error'; exitCode = sysexits.USAGE; @@ -106,6 +111,7 @@ export { ErrorTaskManagerDestroyed, ErrorTaskManagerScheduler, ErrorTaskManagerQueue, + ErrorTaskManagerProcessing, ErrorTask, ErrorTaskMissing, ErrorTaskHandlerMissing, diff --git a/tests/bin/identities/trustUntrustList.test.ts b/tests/bin/identities/trustUntrustList.test.ts index 7f6ad6c2c..ddb007d09 100644 --- a/tests/bin/identities/trustUntrustList.test.ts +++ b/tests/bin/identities/trustUntrustList.test.ts @@ -152,7 +152,10 @@ describe('trust/untrust/list', () => { expect(exitCode).toBe(0); // Since discovery is a background process we need to wait for the // gestalt to be discovered - await pkAgent.discovery.waitForDrained(); + let existingTasks: number = 0; + do { + existingTasks = await pkAgent.discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); // Check that gestalt was discovered and permission was set ({ exitCode, stdout } = await testUtils.pkStdio( ['identities', 'list', '--format', 'json'], @@ -285,7 +288,10 @@ describe('trust/untrust/list', () => { expect(exitCode).toBe(sysexits.NOUSER); // Since discovery is a background process we need to wait for the // gestalt to be discovered - await pkAgent.discovery.waitForDrained(); + let existingTasks: number = 0; + do { + existingTasks = await pkAgent.discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); // This time the command should succeed ({ exitCode } = await testUtils.pkStdio( ['identities', 'trust', providerString], diff --git a/tests/client/service/gestaltsDiscoveryByIdentity.test.ts b/tests/client/service/gestaltsDiscoveryByIdentity.test.ts index d4c64807e..ec7984151 100644 --- a/tests/client/service/gestaltsDiscoveryByIdentity.test.ts +++ b/tests/client/service/gestaltsDiscoveryByIdentity.test.ts @@ -138,7 +138,6 @@ describe('gestaltsDiscoveryByIdentity', () => { }); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); - await taskManager.startProcessing(); discovery = await Discovery.createDiscovery({ db, keyManager, @@ -146,8 +145,10 @@ describe('gestaltsDiscoveryByIdentity', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); + await taskManager.startProcessing(); const clientService = { gestaltsDiscoveryByIdentity: gestaltsDiscoveryByIdentity({ authenticate, diff --git a/tests/client/service/gestaltsDiscoveryByNode.test.ts b/tests/client/service/gestaltsDiscoveryByNode.test.ts index 0354ed66f..e5572a7f6 100644 --- a/tests/client/service/gestaltsDiscoveryByNode.test.ts +++ b/tests/client/service/gestaltsDiscoveryByNode.test.ts @@ -139,7 +139,6 @@ describe('gestaltsDiscoveryByNode', () => { }); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); - await taskManager.start(); discovery = await Discovery.createDiscovery({ db, keyManager, @@ -147,8 +146,10 @@ describe('gestaltsDiscoveryByNode', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); + await taskManager.startProcessing(); const clientService = { gestaltsDiscoveryByNode: gestaltsDiscoveryByNode({ authenticate, diff --git a/tests/client/service/gestaltsGestaltTrustByIdentity.test.ts b/tests/client/service/gestaltsGestaltTrustByIdentity.test.ts index ef30452fb..fd16175f8 100644 --- a/tests/client/service/gestaltsGestaltTrustByIdentity.test.ts +++ b/tests/client/service/gestaltsGestaltTrustByIdentity.test.ts @@ -198,7 +198,6 @@ describe('gestaltsGestaltTrustByIdentity', () => { }); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); - await taskManager.startProcessing(); await nodeManager.setNode(nodesUtils.decodeNodeId(nodeId)!, { host: node.proxy.getProxyHost(), port: node.proxy.getProxyPort(), @@ -210,8 +209,10 @@ describe('gestaltsGestaltTrustByIdentity', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); + await taskManager.startProcessing(); const clientService = { gestaltsGestaltTrustByIdentity: gestaltsGestaltTrustByIdentity({ authenticate, @@ -310,7 +311,10 @@ describe('gestaltsGestaltTrustByIdentity', () => { gestaltsErrors.ErrorGestaltsGraphIdentityIdMissing, ); // Wait for both identity and node to be set in GG - await discovery.waitForDrained(); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); const response = await grpcClient.gestaltsGestaltTrustByIdentity( request, clientUtils.encodeAuthFromPassword(password), @@ -409,7 +413,10 @@ describe('gestaltsGestaltTrustByIdentity', () => { ); // Wait and try again - should succeed second time // Wait for both identity and node to be set in GG - await discovery.waitForDrained(); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); const response = await grpcClient.gestaltsGestaltTrustByIdentity( request, clientUtils.encodeAuthFromPassword(password), diff --git a/tests/client/service/gestaltsGestaltTrustByNode.test.ts b/tests/client/service/gestaltsGestaltTrustByNode.test.ts index e190787ff..5d50160fe 100644 --- a/tests/client/service/gestaltsGestaltTrustByNode.test.ts +++ b/tests/client/service/gestaltsGestaltTrustByNode.test.ts @@ -206,7 +206,6 @@ describe('gestaltsGestaltTrustByNode', () => { }); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); - await taskManager.startProcessing(); await nodeManager.setNode(nodesUtils.decodeNodeId(nodeId)!, { host: node.proxy.getProxyHost(), port: node.proxy.getProxyPort(), @@ -218,8 +217,10 @@ describe('gestaltsGestaltTrustByNode', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); + await taskManager.startProcessing(); const clientService = { gestaltsGestaltTrustByNode: gestaltsGestaltTrustByNode({ authenticate, diff --git a/tests/discovery/Discovery.test.ts b/tests/discovery/Discovery.test.ts index f99c45ee9..18e66a808 100644 --- a/tests/discovery/Discovery.test.ts +++ b/tests/discovery/Discovery.test.ts @@ -6,6 +6,7 @@ import path from 'path'; import os from 'os'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { DB } from '@matrixai/db'; +import { PromiseCancellable } from '@matrixai/async-cancellable'; import TaskManager from '@/tasks/TaskManager'; import PolykeyAgent from '@/PolykeyAgent'; import Discovery from '@/discovery/Discovery'; @@ -59,7 +60,16 @@ describe('Discovery', () => { let nodeA: PolykeyAgent; let nodeB: PolykeyAgent; let identityId: IdentityId; + + const mockedRefreshBucket = jest.spyOn( + NodeManager.prototype, + 'refreshBucket', + ); + beforeEach(async () => { + mockedRefreshBucket.mockImplementation( + () => new PromiseCancellable((resolve) => resolve()), + ); // Sets the global GRPC logger to the logger grpcUtils.setLogger(logger); dataDir = await fs.promises.mkdtemp( @@ -152,7 +162,6 @@ describe('Discovery', () => { }); await nodeManager.start(); await nodeConnectionManager.start({ nodeManager }); - await taskManager.startProcessing(); // Set up other gestalt nodeA = await PolykeyAgent.createPolykeyAgent({ password: password, @@ -224,6 +233,7 @@ describe('Discovery', () => { force: true, recursive: true, }); + mockedRefreshBucket.mockRestore(); }); test('discovery readiness', async () => { const discovery = await Discovery.createDiscovery({ @@ -233,6 +243,7 @@ describe('Discovery', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); await expect(discovery.destroy()).rejects.toThrow( @@ -256,10 +267,15 @@ describe('Discovery', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); + await taskManager.startProcessing(); await discovery.queueDiscoveryByNode(nodeA.keyManager.getNodeId()); - await discovery.waitForDrained(); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); const gestalt = await gestaltGraph.getGestalts(); const gestaltMatrix = gestalt[0].matrix; const gestaltNodes = gestalt[0].nodes; @@ -279,6 +295,7 @@ describe('Discovery', () => { await gestaltGraph.unsetNode(nodeA.keyManager.getNodeId()); await gestaltGraph.unsetNode(nodeB.keyManager.getNodeId()); await gestaltGraph.unsetIdentity(testToken.providerId, identityId); + await taskManager.stopProcessing(); await discovery.stop(); await discovery.destroy(); }); @@ -290,10 +307,15 @@ describe('Discovery', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); + await taskManager.startProcessing(); await discovery.queueDiscoveryByIdentity(testToken.providerId, identityId); - await discovery.waitForDrained(); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); const gestalt = (await gestaltGraph.getGestalts())[0]; const gestaltMatrix = gestalt.matrix; const gestaltNodes = gestalt.nodes; @@ -313,6 +335,7 @@ describe('Discovery', () => { await gestaltGraph.unsetNode(nodeA.keyManager.getNodeId()); await gestaltGraph.unsetNode(nodeB.keyManager.getNodeId()); await gestaltGraph.unsetIdentity(testToken.providerId, identityId); + await taskManager.stopProcessing(); await discovery.stop(); await discovery.destroy(); }); @@ -324,10 +347,15 @@ describe('Discovery', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); + await taskManager.startProcessing(); await discovery.queueDiscoveryByNode(nodeA.keyManager.getNodeId()); - await discovery.waitForDrained(); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); const gestalt1 = (await gestaltGraph.getGestalts())[0]; const gestaltMatrix1 = gestalt1.matrix; const gestaltNodes1 = gestalt1.nodes; @@ -361,7 +389,9 @@ describe('Discovery', () => { // Note that eventually we would like to add in a system of revisiting // already discovered vertices, however for now we must do this manually. await discovery.queueDiscoveryByNode(nodeA.keyManager.getNodeId()); - await discovery.waitForDrained(); + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); const gestalt2 = (await gestaltGraph.getGestalts())[0]; const gestaltMatrix2 = gestalt2.matrix; const gestaltNodes2 = gestalt2.nodes; @@ -386,6 +416,7 @@ describe('Discovery', () => { // Can just remove the user that the claim is for as this will cause the // claim to be dropped during discovery delete testProvider.users[identityId2]; + await taskManager.stopProcessing(); await discovery.stop(); await discovery.destroy(); }); @@ -397,12 +428,17 @@ describe('Discovery', () => { identitiesManager, nodeManager, sigchain, + taskManager, logger, }); await discovery.queueDiscoveryByNode(nodeA.keyManager.getNodeId()); await discovery.stop(); await discovery.start(); - await discovery.waitForDrained(); + await taskManager.startProcessing(); + let existingTasks: number = 0; + do { + existingTasks = await discovery.waitForDiscoveryTasks(); + } while (existingTasks > 0); const gestalt = (await gestaltGraph.getGestalts())[0]; const gestaltMatrix = gestalt.matrix; const gestaltNodes = gestalt.nodes; @@ -422,6 +458,7 @@ describe('Discovery', () => { await gestaltGraph.unsetNode(nodeA.keyManager.getNodeId()); await gestaltGraph.unsetNode(nodeB.keyManager.getNodeId()); await gestaltGraph.unsetIdentity(testToken.providerId, identityId); + await taskManager.stopProcessing(); await discovery.stop(); await discovery.destroy(); }); diff --git a/tests/nodes/NodeConnectionManager.seednodes.test.ts b/tests/nodes/NodeConnectionManager.seednodes.test.ts index 033a2f87d..bf25fca1a 100644 --- a/tests/nodes/NodeConnectionManager.seednodes.test.ts +++ b/tests/nodes/NodeConnectionManager.seednodes.test.ts @@ -241,6 +241,7 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { } finally { // Clean up await nodeConnectionManager?.stop(); + await taskManager.stopProcessing(); await nodeManager?.stop(); } }); @@ -323,6 +324,7 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { } finally { mockedRefreshBucket.mockRestore(); mockedPingNode.mockRestore(); + await taskManager.stopProcessing(); await nodeManager?.stop(); await nodeConnectionManager?.stop(); } @@ -384,6 +386,7 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { } finally { mockedRefreshBucket.mockRestore(); mockedPingNode.mockRestore(); + await taskManager.stopProcessing(); await nodeManager?.stop(); await nodeConnectionManager?.stop(); } @@ -457,6 +460,7 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { mockedRefreshBucket.mockRestore(); mockedPingNode.mockRestore(); await nodeConnectionManager?.stop(); + await taskManager.stopProcessing(); await nodeManager?.stop(); } });