Skip to content

Commit

Permalink
Merge pull request #445 from MatrixAI/feature-tasks_implementation
Browse files Browse the repository at this point in the history
Integrate TaskManager into NodeGraph and Discovery
  • Loading branch information
CMCDragonkai authored Sep 21, 2022
2 parents 54f0c2b + 23f470b commit 586750e
Show file tree
Hide file tree
Showing 42 changed files with 1,574 additions and 1,103 deletions.
14 changes: 7 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@
"@grpc/grpc-js": "1.6.7",
"@matrixai/async-cancellable": "^1.0.2",
"@matrixai/async-init": "^1.8.2",
"@matrixai/async-locks": "^3.1.2",
"@matrixai/async-locks": "^3.2.0",
"@matrixai/db": "^5.0.3",
"@matrixai/errors": "^1.1.3",
"@matrixai/id": "^3.3.3",
Expand Down
60 changes: 33 additions & 27 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import process from 'process';
import Logger from '@matrixai/logger';
import { DB } from '@matrixai/db';
import { CreateDestroyStartStop } from '@matrixai/async-init/dist/CreateDestroyStartStop';
import Queue from './nodes/Queue';
import * as networkUtils from './network/utils';
import KeyManager from './keys/KeyManager';
import Status from './status/Status';
Expand All @@ -35,6 +34,7 @@ import * as errors from './errors';
import * as utils from './utils';
import * as keysUtils from './keys/utils';
import * as nodesUtils from './nodes/utils';
import TaskManager from './tasks/TaskManager';

type NetworkConfig = {
forwardHost?: Host;
Expand Down Expand Up @@ -87,8 +87,8 @@ class PolykeyAgent {
acl,
gestaltGraph,
proxy,
taskManager,
nodeGraph,
queue,
nodeConnectionManager,
nodeManager,
discovery,
Expand Down Expand Up @@ -134,8 +134,8 @@ class PolykeyAgent {
acl?: ACL;
gestaltGraph?: GestaltGraph;
proxy?: Proxy;
taskManager?: TaskManager;
nodeGraph?: NodeGraph;
queue?: Queue;
nodeConnectionManager?: NodeConnectionManager;
nodeManager?: NodeManager;
discovery?: Discovery;
Expand Down Expand Up @@ -285,18 +285,21 @@ class PolykeyAgent {
keyManager,
logger: logger.getChild(NodeGraph.name),
}));
queue =
queue ??
new Queue({
logger: logger.getChild(Queue.name),
});
taskManager =
taskManager ??
(await TaskManager.createTaskManager({
db,
fresh,
lazy: true,
logger,
}));
nodeConnectionManager =
nodeConnectionManager ??
new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
queue,
taskManager,
seedNodes,
...nodeConnectionManagerConfig_,
logger: logger.getChild(NodeConnectionManager.name),
Expand All @@ -309,7 +312,7 @@ class PolykeyAgent {
keyManager,
nodeGraph,
nodeConnectionManager,
queue,
taskManager,
logger: logger.getChild(NodeManager.name),
});
await nodeManager.start();
Expand Down Expand Up @@ -373,6 +376,7 @@ class PolykeyAgent {
await notificationsManager?.stop();
await vaultManager?.stop();
await discovery?.stop();
await taskManager?.stop();
await proxy?.stop();
await gestaltGraph?.stop();
await acl?.stop();
Expand All @@ -396,7 +400,7 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
queue,
taskManager,
nodeConnectionManager,
nodeManager,
discovery,
Expand Down Expand Up @@ -429,7 +433,7 @@ class PolykeyAgent {
public readonly gestaltGraph: GestaltGraph;
public readonly proxy: Proxy;
public readonly nodeGraph: NodeGraph;
public readonly queue: Queue;
public readonly taskManager: TaskManager;
public readonly nodeConnectionManager: NodeConnectionManager;
public readonly nodeManager: NodeManager;
public readonly discovery: Discovery;
Expand All @@ -454,7 +458,7 @@ class PolykeyAgent {
gestaltGraph,
proxy,
nodeGraph,
queue,
taskManager,
nodeConnectionManager,
nodeManager,
discovery,
Expand All @@ -478,7 +482,7 @@ class PolykeyAgent {
gestaltGraph: GestaltGraph;
proxy: Proxy;
nodeGraph: NodeGraph;
queue: Queue;
taskManager: TaskManager;
nodeConnectionManager: NodeConnectionManager;
nodeManager: NodeManager;
discovery: Discovery;
Expand All @@ -504,7 +508,7 @@ class PolykeyAgent {
this.proxy = proxy;
this.discovery = discovery;
this.nodeGraph = nodeGraph;
this.queue = queue;
this.taskManager = taskManager;
this.nodeConnectionManager = nodeConnectionManager;
this.nodeManager = nodeManager;
this.vaultManager = vaultManager;
Expand Down Expand Up @@ -578,14 +582,10 @@ class PolykeyAgent {
);
// Reverse connection was established and authenticated,
// add it to the node graph
await this.nodeManager.setNode(
data.remoteNodeId,
{
host: data.remoteHost,
port: data.remotePort,
},
false,
);
await this.nodeManager.setNode(data.remoteNodeId, {
host: data.remoteHost,
port: data.remotePort,
});
}
},
);
Expand Down Expand Up @@ -667,15 +667,16 @@ class PolykeyAgent {
proxyPort: networkConfig_.proxyPort,
tlsConfig,
});
await this.queue.start();
await this.taskManager.start({ fresh, lazy: true });
await this.nodeManager.start();
await this.nodeConnectionManager.start({ nodeManager: this.nodeManager });
await this.nodeGraph.start({ fresh });
await this.nodeConnectionManager.syncNodeGraph(false);
await this.nodeManager.syncNodeGraph(false);
await this.discovery.start({ fresh });
await this.vaultManager.start({ fresh });
await this.notificationsManager.start({ fresh });
await this.sessionManager.start({ fresh });
await this.taskManager.startProcessing();
await this.status.finishStart({
pid: process.pid,
nodeId: this.keyManager.getNodeId(),
Expand All @@ -693,14 +694,16 @@ class PolykeyAgent {
this.logger.warn(`Failed Starting ${this.constructor.name}`);
this.events.removeAllListeners();
await this.status?.beginStop({ pid: process.pid });
await this.taskManager?.stopProcessing();
await this.taskManager?.stopTasks();
await this.sessionManager?.stop();
await this.notificationsManager?.stop();
await this.vaultManager?.stop();
await this.discovery?.stop();
await this.queue?.stop();
await this.nodeGraph?.stop();
await this.nodeConnectionManager?.stop();
await this.nodeManager?.stop();
await this.taskManager?.stop();
await this.proxy?.stop();
await this.grpcServerAgent?.stop();
await this.grpcServerClient?.stop();
Expand All @@ -723,14 +726,16 @@ class PolykeyAgent {
this.logger.info(`Stopping ${this.constructor.name}`);
this.events.removeAllListeners();
await this.status.beginStop({ pid: process.pid });
await this.taskManager.stopProcessing();
await this.taskManager.stopTasks();
await this.sessionManager.stop();
await this.notificationsManager.stop();
await this.vaultManager.stop();
await this.discovery.stop();
await this.nodeConnectionManager.stop();
await this.nodeGraph.stop();
await this.nodeManager.stop();
await this.queue.stop();
await this.taskManager.stop();
await this.proxy.stop();
await this.grpcServerAgent.stop();
await this.grpcServerClient.stop();
Expand All @@ -755,6 +760,7 @@ class PolykeyAgent {
await this.discovery.destroy();
await this.nodeGraph.destroy();
await this.gestaltGraph.destroy();
await this.taskManager.destroy();
await this.acl.destroy();
await this.sigchain.destroy();
await this.identitiesManager.destroy();
Expand Down
32 changes: 27 additions & 5 deletions src/bin/errors.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
import ErrorPolykey from '../ErrorPolykey';
import sysexits from '../utils/sysexits';

class ErrorCLI<T> extends ErrorPolykey<T> {}
class ErrorBin<T> extends ErrorPolykey<T> {}

class ErrorBinUncaughtException<T> extends ErrorBin<T> {
static description = '';
exitCode = sysexits.SOFTWARE;
}

class ErrorBinUnhandledRejection<T> extends ErrorBin<T> {
static description = '';
exitCode = sysexits.SOFTWARE;
}

class ErrorBinAsynchronousDeadlock<T> extends ErrorBin<T> {
static description =
'PolykeyAgent process exited unexpectedly, likely due to promise deadlock';
exitCode = sysexits.SOFTWARE;
}

class ErrorCLI<T> extends ErrorBin<T> {}

class ErrorCLINodePath<T> extends ErrorCLI<T> {
static description = 'Cannot derive default node path from unknown platform';
Expand Down Expand Up @@ -49,17 +67,21 @@ class ErrorCLIPolykeyAgentProcess<T> extends ErrorCLI<T> {
exitCode = sysexits.OSERR;
}

class ErrorNodeFindFailed<T> extends ErrorCLI<T> {
class ErrorCLINodeFindFailed<T> extends ErrorCLI<T> {
static description = 'Failed to find the node in the DHT';
exitCode = 1;
}

class ErrorNodePingFailed<T> extends ErrorCLI<T> {
class ErrorCLINodePingFailed<T> extends ErrorCLI<T> {
static description = 'Node was not online or not found.';
exitCode = 1;
}

export {
ErrorBin,
ErrorBinUncaughtException,
ErrorBinUnhandledRejection,
ErrorBinAsynchronousDeadlock,
ErrorCLI,
ErrorCLINodePath,
ErrorCLIClientOptions,
Expand All @@ -70,6 +92,6 @@ export {
ErrorCLIFileRead,
ErrorCLIPolykeyAgentStatus,
ErrorCLIPolykeyAgentProcess,
ErrorNodeFindFailed,
ErrorNodePingFailed,
ErrorCLINodeFindFailed,
ErrorCLINodePingFailed,
};
2 changes: 1 addition & 1 deletion src/bin/nodes/CommandFind.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ class CommandFind extends CommandPolykey {
);
// Like ping it should error when failing to find node for automation reasons.
if (!result.success) {
throw new binErrors.ErrorNodeFindFailed(result.message);
throw new binErrors.ErrorCLINodeFindFailed(result.message);
}
} finally {
if (pkClient! != null) await pkClient.stop();
Expand Down
4 changes: 2 additions & 2 deletions src/bin/nodes/CommandPing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class CommandPing extends CommandPolykey {
);
} catch (err) {
if (err.cause instanceof nodesErrors.ErrorNodeGraphNodeIdNotFound) {
error = new binErrors.ErrorNodePingFailed(
error = new binErrors.ErrorCLINodePingFailed(
`Failed to resolve node ID ${nodesUtils.encodeNodeId(
nodeId,
)} to an address.`,
Expand All @@ -69,7 +69,7 @@ class CommandPing extends CommandPolykey {
const status = { success: false, message: '' };
status.success = statusMessage ? statusMessage.getSuccess() : false;
if (!status.success && !error) {
error = new binErrors.ErrorNodePingFailed('No response received');
error = new binErrors.ErrorCLINodePingFailed('No response received');
}
if (status.success) status.message = 'Node is Active.';
else status.message = error.message;
Expand Down
Loading

0 comments on commit 586750e

Please sign in to comment.