Skip to content

Commit

Permalink
feat: async queueing for setting nodes
Browse files Browse the repository at this point in the history
`setNode` now has a `blocking` flag that defaults to false. If it encounters a full bucket when adding a node then it will add the operation to the queue and asynchronously trys a blocking `setNode` in the background. `setNode`s will only be added to the queue if the bucket was full.

#322
  • Loading branch information
tegefaulkes authored and emmacasolin committed Jun 14, 2022
1 parent a02cfea commit 4460779
Show file tree
Hide file tree
Showing 3 changed files with 383 additions and 43 deletions.
15 changes: 15 additions & 0 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,23 @@ class NodeGraph {
nodesUtils.bucketDbKey(nodeId),
]);
if (nodeData != null) {
this.logger.debug(
`Updating node ${nodesUtils.encodeNodeId(
nodeId,
)} in bucket ${bucketIndex}`,
);
// If the node already exists we want to remove the old `lastUpdated`
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
nodeData.lastUpdated,
nodeId,
);
await tran.del([...lastUpdatedPath, lastUpdatedKey]);
} else {
this.logger.debug(
`Adding node ${nodesUtils.encodeNodeId(
nodeId,
)} to bucket ${bucketIndex}`,
);
// It didn't exist so we want to increment the bucket count
const count = await this.getBucketMetaProp(bucketIndex, 'count', tran);
await this.setBucketMetaProp(bucketIndex, 'count', count + 1, tran);
Expand Down Expand Up @@ -294,6 +304,11 @@ class NodeGraph {
nodesUtils.bucketDbKey(nodeId),
]);
if (nodeData != null) {
this.logger.debug(
`Removing node ${nodesUtils.encodeNodeId(
nodeId,
)} from bucket ${bucketIndex}`,
);
const count = await this.getBucketMetaProp(bucketIndex, 'count', tran);
await this.setBucketMetaProp(bucketIndex, 'count', count - 1, tran);
await tran.del([...bucketPath, nodesUtils.bucketDbKey(nodeId)]);
Expand Down
218 changes: 177 additions & 41 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import type { NodeId, NodeAddress, NodeBucket } from '../nodes/types';
import type { ClaimEncoded } from '../claims/types';
import type { Timer } from '../types';
import Logger from '@matrixai/logger';
import { StartStop } from '@matrixai/async-init/dist/StartStop';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import * as nodesErrors from './errors';
import * as nodesUtils from './utils';
import * as networkUtils from '../network/utils';
Expand All @@ -18,6 +18,7 @@ import * as utilsPB from '../proto/js/polykey/v1/utils/utils_pb';
import * as claimsErrors from '../claims/errors';
import * as sigchainUtils from '../sigchain/utils';
import * as claimsUtils from '../claims/utils';
import { timerStart } from '../utils/utils';

interface NodeManager extends StartStop {}
@StartStop()
Expand All @@ -28,6 +29,18 @@ class NodeManager {
protected keyManager: KeyManager;
protected nodeConnectionManager: NodeConnectionManager;
protected nodeGraph: NodeGraph;
// SetNodeQueue
protected endQueue: boolean = false;
protected setNodeQueue: Array<{
nodeId: NodeId;
nodeAddress: NodeAddress;
timeout?: number;
}> = [];
protected setNodeQueuePlug: Promise<void>;
protected setNodeQueueUnplug: (() => void) | undefined;
protected setNodeQueueRunner: Promise<void>;
protected setNodeQueueEmpty: Promise<void>;
protected setNodeQueueDrained: () => void;

constructor({
db,
Expand Down Expand Up @@ -365,16 +378,19 @@ class NodeManager {
* Updates the node if the node already exists.
* @param nodeId - Id of the node we wish to add
* @param nodeAddress - Expected address of the node we want to add
* @param blocking - Flag for if the operation should block or utilize the async queue
* @param force - Flag for if we want to add the node without authenticating or if the bucket is full.
* This will drop the oldest node in favor of the new.
* @param timer Connection timeout timer
* @param timeout Connection timeout timeout
* @param tran
*/
@ready(new nodesErrors.ErrorNodeManagerNotRunning())
public async setNode(
nodeId: NodeId,
nodeAddress: NodeAddress,
blocking: boolean = false,
force: boolean = false,
timer?: Timer,
timeout?: number,
tran: DBTransaction,
): Promise<void> {
// When adding a node we need to handle 3 cases
Expand All @@ -400,53 +416,87 @@ class NodeManager {
} else {
// We want to add a node but the bucket is full
// We need to ping the oldest node
const oldestNodeIds = (await this.nodeGraph.getOldestNode(
bucketIndex,
3,
tran,
))!;
if (force) {
// We just add the new node anyway without checking the old one
const oldNodeId = oldestNodeIds[0];
const oldNodeId = (
await this.nodeGraph.getOldestNode(bucketIndex, 1, tran)
).pop()!;
this.logger.debug(
`Force was set, removing ${nodesUtils.encodeNodeId(
oldNodeId,
)} and adding ${nodesUtils.encodeNodeId(nodeId)}`,
);
await this.nodeGraph.unsetNode(oldNodeId, tran);
await this.nodeGraph.setNode(nodeId, nodeAddress, tran);
return;
}
// We want to concurrently ping the nodes
const pingPromises = oldestNodeIds.map((nodeId) => {
const doPing = async (): Promise<{
nodeId: NodeId;
success: boolean;
}> => {
// This needs to return nodeId and ping result
const data = await this.nodeGraph.getNode(nodeId, tran);
if (data == null) return { nodeId, success: false };
const result = await this.pingNode(nodeId, undefined, timer);
return { nodeId, success: result };
};
return doPing();
});
const pingResults = await Promise.all(pingPromises);
for (const { nodeId, success } of pingResults) {
if (success) {
// Ping succeeded, update the node
const node = (await this.nodeGraph.getNode(nodeId, tran))!;
await this.nodeGraph.setNode(nodeId, node.address, tran);
} else {
// Otherwise we remove the node
await this.nodeGraph.unsetNode(nodeId, tran);
}
if (blocking) {
this.logger.debug(
`Bucket was full and blocking was true, garbage collecting old nodes to add ${nodesUtils.encodeNodeId(
nodeId,
)}`,
);
await this.garbageCollectOldNode(
bucketIndex,
nodeId,
nodeAddress,
timeout,
);
} else {
this.logger.debug(
`Bucket was full and blocking was false, adding ${nodesUtils.encodeNodeId(
nodeId,
)} to queue`,
);
// Re-attempt this later asynchronously by adding the the queue
this.queueSetNode(nodeId, nodeAddress, timeout);
}
// Check if we now have room and add the new node
const count = await this.nodeGraph.getBucketMetaProp(
bucketIndex,
'count',
tran,
);
if (count < this.nodeGraph.nodeBucketLimit) {
await this.nodeGraph.setNode(nodeId, nodeAddress, tran);
}
}

private async garbageCollectOldNode(
bucketIndex: number,
nodeId: NodeId,
nodeAddress: NodeAddress,
timeout?: number,
) {
const oldestNodeIds = await this.nodeGraph.getOldestNode(bucketIndex, 3, tran);
// We want to concurrently ping the nodes
const pingPromises = oldestNodeIds.map((nodeId) => {
const doPing = async (): Promise<{
nodeId: NodeId;
success: boolean;
}> => {
// This needs to return nodeId and ping result
const data = await this.nodeGraph.getNode(nodeId);
if (data == null) return { nodeId, success: false };
const timer = timeout != null ? timerStart(timeout) : undefined;
const result = await this.pingNode(nodeId, nodeAddress, timer);
return { nodeId, success: result };
};
return doPing();
});
const pingResults = await Promise.all(pingPromises);
for (const { nodeId, success } of pingResults) {
if (success) {
// Ping succeeded, update the node
this.logger.debug(
`Ping succeeded for ${nodesUtils.encodeNodeId(nodeId)}`,
);
const node = (await this.nodeGraph.getNode(nodeId))!;
await this.nodeGraph.setNode(nodeId, node.address);
} else {
this.logger.debug(`Ping failed for ${nodesUtils.encodeNodeId(nodeId)}`);
// Otherwise we remove the node
await this.nodeGraph.unsetNode(nodeId);
}
}
// Check if we now have room and add the new node
const count = await this.nodeGraph.getBucketMetaProp(bucketIndex, 'count');
if (count < this.nodeGraph.nodeBucketLimit) {
this.logger.debug(`Bucket ${bucketIndex} now has room, adding new node`);
await this.nodeGraph.setNode(nodeId, nodeAddress);
}
}

/**
Expand All @@ -473,6 +523,92 @@ class NodeManager {
throw Error('fixme');
// Return await this.nodeGraph.refreshBuckets(tran);
}

// SetNode queue

/**
* This adds a setNode operation to the queue
*/
private queueSetNode(
nodeId: NodeId,
nodeAddress: NodeAddress,
timeout?: number,
): void {
this.logger.debug(`Adding ${nodesUtils.encodeNodeId(nodeId)} to queue`);
this.setNodeQueue.push({
nodeId,
nodeAddress,
timeout,
});
this.unplugQueue();
}

/**
* This starts the process of digesting the queue
*/
private async startSetNodeQueue(): Promise<void> {
this.logger.debug('Starting setNodeQueue');
this.plugQueue();
// While queue hasn't ended
while (true) {
// Wait for queue to be unplugged
await this.setNodeQueuePlug;
if (this.endQueue) break;
const job = this.setNodeQueue.shift();
if (job == null) {
// If the queue is empty then we pause the queue
this.plugQueue();
continue;
}
// Process the job
this.logger.debug(
`SetNodeQueue processing job for: ${nodesUtils.encodeNodeId(
job.nodeId,
)}`,
);
await this.setNode(job.nodeId, job.nodeAddress, true, false, job.timeout);
}
this.logger.debug('SetNodeQueue has ended');
}

private async stopSetNodeQueue(): Promise<void> {
this.logger.debug('Stopping setNodeQueue');
// Tell the queue runner to end
this.endQueue = true;
this.unplugQueue();
// Wait for runner to finish it's current job
await this.setNodeQueueRunner;
}

private plugQueue(): void {
if (this.setNodeQueueUnplug == null) {
this.logger.debug('Plugging setNodeQueue');
// Pausing queue
this.setNodeQueuePlug = new Promise((resolve) => {
this.setNodeQueueUnplug = resolve;
});
// Signaling queue is empty
if (this.setNodeQueueDrained != null) this.setNodeQueueDrained();
}
}

private unplugQueue(): void {
if (this.setNodeQueueUnplug != null) {
this.logger.debug('Unplugging setNodeQueue');
// Starting queue
this.setNodeQueueUnplug();
this.setNodeQueueUnplug = undefined;
// Signalling queue is running
this.setNodeQueueEmpty = new Promise((resolve) => {
this.setNodeQueueDrained = resolve;
});
}
}

@ready(new nodesErrors.ErrorNodeManagerNotRunning())
public async queueDrained(): Promise<void> {
await this.setNodeQueueEmpty;
}
}

export default NodeManager;
Loading

0 comments on commit 4460779

Please sign in to comment.