Skip to content

Commit

Permalink
wip: NodeManager using Scheduler and Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
tegefaulkes committed Sep 5, 2022
1 parent 7c5bab0 commit d969836
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,8 @@ class PolykeyAgent {
keyManager,
nodeGraph,
nodeConnectionManager,
queue,
scheduler: taskScheduler,
taskQueue,
logger: logger.getChild(NodeManager.name),
});
await nodeManager.start();
Expand Down
40 changes: 32 additions & 8 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { DB } from '@matrixai/db';
import type NodeConnectionManager from './NodeConnectionManager';
import type NodeGraph from './NodeGraph';
import type Queue from './Queue';
import type KeyManager from '../keys/KeyManager';
import type { PublicKeyPem } from '../keys/types';
import type Sigchain from '../sigchain/Sigchain';
Expand All @@ -12,6 +11,9 @@ import type { Timer } from '../types';
import type { PromiseDeconstructed } from '../types';
import type { DBTransaction } from '@matrixai/db';
import type { NodeId, NodeAddress } from '../nodes/types';
import type Scheduler from '../tasks/Scheduler';
import type TaskQueue from '../tasks/Queue';
import type { TaskHandlerId } from '../tasks/types';
import Logger from '@matrixai/logger';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import * as nodesErrors from './errors';
Expand All @@ -33,7 +35,8 @@ class NodeManager {
protected keyManager: KeyManager;
protected nodeConnectionManager: NodeConnectionManager;
protected nodeGraph: NodeGraph;
protected queue: Queue;
protected scheduler: Scheduler;
protected taskQueue: TaskQueue;
// Refresh bucket timer
protected refreshBucketDeadlineMap: Map<NodeBucketIndex, number> = new Map();
protected refreshBucketTimer: NodeJS.Timer;
Expand All @@ -47,13 +50,16 @@ class NodeManager {
protected refreshBucketQueuePause_: PromiseDeconstructed<void> = promise();
protected refreshBucketQueueAbortController: AbortController;

public readonly setNodeHanderId = 'setNode' as TaskHandlerId;

constructor({
db,
keyManager,
sigchain,
nodeConnectionManager,
nodeGraph,
queue,
scheduler,
taskQueue,
refreshBucketTimerDefault = 3600000, // 1 hour in milliseconds
logger,
}: {
Expand All @@ -62,7 +68,8 @@ class NodeManager {
sigchain: Sigchain;
nodeConnectionManager: NodeConnectionManager;
nodeGraph: NodeGraph;
queue: Queue;
scheduler: Scheduler;
taskQueue: TaskQueue;
refreshBucketTimerDefault?: number;
logger?: Logger;
}) {
Expand All @@ -72,21 +79,30 @@ class NodeManager {
this.sigchain = sigchain;
this.nodeConnectionManager = nodeConnectionManager;
this.nodeGraph = nodeGraph;
this.queue = queue;
this.scheduler = scheduler;
this.taskQueue = taskQueue;
this.refreshBucketTimerDefault = refreshBucketTimerDefault;
}

public async start() {
this.logger.info(`Starting ${this.constructor.name}`);
this.startRefreshBucketTimers();
this.refreshBucketQueueRunner = this.startRefreshBucketQueue();
this.logger.info(`Registering handler for setNode`);
this.taskQueue.registerHandler(
this.setNodeHanderId,
async (nodeId, nodeAddress, timeout) =>
this.setNode(nodeId, nodeAddress, true, false, timeout),
);
this.logger.info(`Started ${this.constructor.name}`);
}

public async stop() {
this.logger.info(`Stopping ${this.constructor.name}`);
await this.stopRefreshBucketTimers();
await this.stopRefreshBucketQueue();
this.logger.info(`Unregistering handler for setNode`);
this.taskQueue.deregisterHandler(this.setNodeHanderId);
this.logger.info(`Stopped ${this.constructor.name}`);
}

Expand Down Expand Up @@ -478,9 +494,15 @@ class NodeManager {
nodeId,
)} to queue`,
);
// Re-attempt this later asynchronously by adding the the queue
this.queue.push(() =>
this.setNode(nodeId, nodeAddress, true, false, timeout),
// Re-attempt this later asynchronously by adding to the scheduler
await this.scheduler.scheduleTask(
this.setNodeHanderId,
[nodeId, nodeAddress, timeout],
undefined,
undefined,
['setNode'],
true,
tran,
);
}
}
Expand All @@ -494,6 +516,8 @@ class NodeManager {
) {
const oldestNodeIds = await this.nodeGraph.getOldestNode(bucketIndex, 3);
// We want to concurrently ping the nodes
// Fixme, remove concurrency? we'd want to stick to 1 active connection per
// background task
const pingPromises = oldestNodeIds.map((nodeId) => {
const doPing = async (): Promise<{
nodeId: NodeId;
Expand Down

0 comments on commit d969836

Please sign in to comment.