Skip to content

Commit

Permalink
fix: excessive connections from refreshBuckets
Browse files Browse the repository at this point in the history
Removing excessive logging when using connections. We don't need 3 log messages every time we use an existing connection.

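Adding 'jitter' or spacing to the `refreshBuckets` delays so that they don't all run at once. This is implemented with a `refreshBucketDelaySpread` parameter that specifies the fraction of the delay to spread across. It is clamped to the range 0 to 1 and defaults to 0.5, i.e. the delays are spread over a window 50% of the delay wide, centred on the delay (±25%).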

Adding a 'heuristic' to `refreshBucket` to prevent it from contacting the same nodes repeatedly. Currently this is just a check in `getClosestGlobalNodes`: if we find fewer than `nodeBucketLimit` nodes, we reset the timer on all `refreshBucket` tasks.

Adding a test that checks the spread of `refreshBucket` delays, and another test that checks the timer on `refreshBucket` tasks is reset if a `findNode` finds fewer than 20 nodes.

#415
  • Loading branch information
tegefaulkes committed Sep 21, 2022
1 parent 5b92e59 commit 45aef18
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 37 deletions.
40 changes: 18 additions & 22 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,7 @@ class NodeConnectionManager {
): Promise<T> {
return await withF(
[await this.acquireConnection(targetNodeId, timer)],
async ([conn]) => {
this.logger.info(
`withConnF calling function with connection to ${nodesUtils.encodeNodeId(
targetNodeId,
)}`,
);
return await f(conn);
},
async ([conn]) => await f(conn),
);
}

Expand Down Expand Up @@ -268,25 +261,12 @@ class NodeConnectionManager {
targetNodeId: NodeId,
timer?: Timer,
): Promise<ConnectionAndTimer> {
this.logger.info(
`Getting connection to ${nodesUtils.encodeNodeId(targetNodeId)}`,
);
const targetNodeIdString = targetNodeId.toString() as NodeIdString;
return await this.connectionLocks.withF(
[targetNodeIdString, RWLockWriter, 'write'],
async () => {
const connAndTimer = this.connections.get(targetNodeIdString);
if (connAndTimer != null) {
this.logger.info(
`existing entry found for ${nodesUtils.encodeNodeId(targetNodeId)}`,
);
return connAndTimer;
}
this.logger.info(
`no existing entry, creating connection to ${nodesUtils.encodeNodeId(
targetNodeId,
)}`,
);
if (connAndTimer != null) return connAndTimer;
// Creating the connection and set in map
const targetAddress = await this.findNode(targetNodeId);
if (targetAddress == null) {
Expand Down Expand Up @@ -556,6 +536,22 @@ class NodeConnectionManager {
}
});
}
// If the found nodes are less than nodeBucketLimit then
// we expect that refresh buckets won't find anything new
if (Object.keys(contacted).length < this.nodeGraph.nodeBucketLimit) {
// Reset the delay on all refresh bucket tasks
for (
let bucketIndex = 0;
bucketIndex < this.nodeGraph.nodeIdBits;
bucketIndex++
) {
await this.nodeManager?.updateRefreshBucketDelay(
bucketIndex,
undefined,
true,
);
}
}
return foundAddress;
}

Expand Down
44 changes: 31 additions & 13 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class NodeManager {
protected nodeGraph: NodeGraph;
protected taskManager: TaskManager;
protected refreshBucketDelay: number;
protected refreshBucketDelaySpread: number;
public readonly setNodeHandlerId =
'NodeManager.setNodeHandler' as TaskHandlerId;
public readonly refreshBucketHandlerId =
Expand All @@ -50,8 +51,12 @@ class NodeManager {
) => {
await this.refreshBucket(bucketIndex, { signal: context.signal });
// When completed reschedule the task
const spread =
(Math.random() - 0.5) *
this.refreshBucketDelay *
this.refreshBucketDelaySpread;
await this.taskManager.scheduleTask({
delay: this.refreshBucketDelay,
delay: this.refreshBucketDelay + spread,
handlerId: this.refreshBucketHandlerId,
lazy: true,
parameters: [bucketIndex],
Expand Down Expand Up @@ -81,6 +86,7 @@ class NodeManager {
nodeGraph,
taskManager,
refreshBucketDelay = 3600000, // 1 hour in milliseconds
refreshBucketDelaySpread = 0.5, // Multiple of refreshBucketDelay to spread by
logger,
}: {
db: DB;
Expand All @@ -90,6 +96,7 @@ class NodeManager {
nodeGraph: NodeGraph;
taskManager: TaskManager;
refreshBucketDelay?: number;
refreshBucketDelaySpread?: number;
logger?: Logger;
}) {
this.logger = logger ?? new Logger(this.constructor.name);
Expand All @@ -100,6 +107,11 @@ class NodeManager {
this.nodeGraph = nodeGraph;
this.taskManager = taskManager;
this.refreshBucketDelay = refreshBucketDelay;
// Clamped from 0 to 1 inclusive
this.refreshBucketDelaySpread = Math.max(
0,
Math.min(refreshBucketDelaySpread, 1),
);
}

public async start() {
Expand Down Expand Up @@ -639,7 +651,6 @@ class NodeManager {
}

this.logger.info('Setting up refreshBucket tasks');

// 1. Iterate over existing tasks and reset the delay
const existingTasks: Array<boolean> = new Array(this.nodeGraph.nodeIdBits);
for await (const task of this.taskManager.getTasks(
Expand All @@ -654,30 +665,30 @@ class NodeManager {
{
// If it's scheduled then reset delay
existingTasks[bucketIndex] = true;
this.logger.debug(
`Updating refreshBucket delay for bucket ${bucketIndex}`,
);
// Total delay is refreshBucketDelay + time since task creation
const spread =
(Math.random() - 0.5) *
this.refreshBucketDelay *
this.refreshBucketDelaySpread;
const delay =
performance.now() +
performance.timeOrigin -
task.created.getTime() +
this.refreshBucketDelay;
this.refreshBucketDelay +
spread;
await this.taskManager.updateTask(task.id, { delay }, tran);
}
break;
case 'queued':
case 'active':
// If it's running then leave it
this.logger.debug(
`RefreshBucket task for bucket ${bucketIndex} is already active, ignoring`,
);
existingTasks[bucketIndex] = true;
break;
default:
// Otherwise ignore it, should be re-created
// Otherwise, ignore it, should be re-created
existingTasks[bucketIndex] = false;
}
this.logger.info('Set up refreshBucket tasks');
}

// 2. Recreate any missing tasks for buckets
Expand All @@ -692,9 +703,13 @@ class NodeManager {
this.logger.debug(
`Creating refreshBucket task for bucket ${bucketIndex}`,
);
const spread =
(Math.random() - 0.5) *
this.refreshBucketDelay *
this.refreshBucketDelaySpread;
await this.taskManager.scheduleTask({
handlerId: this.refreshBucketHandlerId,
delay: this.refreshBucketDelay,
delay: this.refreshBucketDelay + spread,
lazy: true,
parameters: [bucketIndex],
path: ['refreshBucket', `${bucketIndex}`],
Expand All @@ -718,6 +733,8 @@ class NodeManager {
);
}

const spread =
(Math.random() - 0.5) * delay * this.refreshBucketDelaySpread;
let foundTask: Task | undefined;
let count = 0;
for await (const task of this.taskManager.getTasks(
Expand All @@ -738,7 +755,8 @@ class NodeManager {
performance.now() +
performance.timeOrigin -
task.created.getTime() +
delay;
delay +
spread;
await this.taskManager.updateTask(task.id, { delay: delayNew }, tran);
this.logger.debug(
`Updating refreshBucket task for bucket ${bucketIndex}`,
Expand All @@ -757,7 +775,7 @@ class NodeManager {
`No refreshBucket task for bucket ${bucketIndex}, new one was created`,
);
foundTask = await this.taskManager.scheduleTask({
delay: this.refreshBucketDelay,
delay: delay + spread,
handlerId: this.refreshBucketHandlerId,
lazy: true,
parameters: [bucketIndex],
Expand Down
68 changes: 66 additions & 2 deletions tests/nodes/NodeConnectionManager.seednodes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,6 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
);
mockedPingNode.mockImplementation(async () => true);
try {
logger.setLevel(LogLevel.WARN);
node1 = await PolykeyAgent.createPolykeyAgent({
nodePath: path.join(dataDir, 'node1'),
password: 'password',
Expand Down Expand Up @@ -533,7 +532,6 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
expect(node2Nodes).toContain(nodeId1);
} finally {
mockedPingNode.mockRestore();
logger.setLevel(LogLevel.WARN);
await node1?.stop();
await node1?.destroy();
await node2?.stop();
Expand All @@ -542,4 +540,70 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
},
globalThis.defaultTimeout * 2,
);
test(
  'refreshBucket delays should be reset after finding less than 20 nodes',
  async () => {
    // When a node search contacts fewer nodes than the bucket limit, the
    // delay of every refreshBucket task is expected to be reset. This test
    // shortens all the delays, triggers a refresh, and then checks that the
    // delays have been pushed back out again.
    let node1: PolykeyAgent | undefined;
    const seedNodes: SeedNodes = {};
    seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = {
      host: remoteNode1.proxy.getProxyHost(),
      port: remoteNode1.proxy.getProxyPort(),
    };
    seedNodes[nodesUtils.encodeNodeId(remoteNodeId2)] = {
      host: remoteNode2.proxy.getProxyHost(),
      port: remoteNode2.proxy.getProxyPort(),
    };
    // Pings always succeed so that the test does not depend on real pinging
    const mockedPingNode = jest.spyOn(
      NodeConnectionManager.prototype,
      'pingNode',
    );
    mockedPingNode.mockImplementation(async () => true);
    try {
      node1 = await PolykeyAgent.createPolykeyAgent({
        nodePath: path.join(dataDir, 'node1'),
        password: 'password',
        networkConfig: {
          proxyHost: localHost,
          agentHost: localHost,
          clientHost: localHost,
          forwardHost: localHost,
        },
        keysConfig: {
          privateKeyPemOverride: globalRootKeyPems[3],
        },
        seedNodes,
        logger,
      });

      // Reset all the refresh bucket timers to a short, distinct base delay
      // (10000) so that a later reset to the agent's configured delay is
      // distinguishable from the values set here
      for (
        let bucketIndex = 0;
        bucketIndex < node1.nodeGraph.nodeIdBits;
        bucketIndex++
      ) {
        await node1.nodeManager.updateRefreshBucketDelay(
          bucketIndex,
          10000,
          true,
        );
      }

      // Trigger a refreshBucket
      await node1.nodeManager.refreshBucket(1);

      // Every refreshBucket task should now have a delay well above the
      // short delay set earlier.
      // NOTE(review): the 50000 threshold assumes the agent's configured
      // refreshBucketDelay is much larger than 10000 - confirm
      let checkedTasks = 0;
      for await (const task of node1.taskManager.getTasks('asc', true, [
        'refreshBucket',
      ])) {
        checkedTasks++;
        expect(task.delay).toBeGreaterThanOrEqual(50000);
      }
      // Guard against a vacuous pass when no tasks were iterated
      expect(checkedTasks).toBeGreaterThan(0);
    } finally {
      mockedPingNode.mockRestore();
      await node1?.stop();
      await node1?.destroy();
    }
  },
  globalThis.defaultTimeout * 2,
);
});
53 changes: 53 additions & 0 deletions tests/nodes/NodeManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -989,4 +989,57 @@ describe(`${NodeManager.name} test`, () => {
await nodeManager.stop();
}
});
test('refreshBucket tasks should have spread delays', async () => {
  // Checks that refreshBucket task delays are not all identical, both after
  // the tasks are first set up and after their delays are updated.
  const refreshBucketTimeout = 100000;
  const nodeManager = new NodeManager({
    db,
    sigchain: {} as Sigchain,
    keyManager,
    nodeGraph,
    nodeConnectionManager: dummyNodeConnectionManager,
    taskManager,
    refreshBucketDelay: refreshBucketTimeout,
    logger,
  });
  // Replace the real refreshBucket with a no-op; only the scheduled delays
  // are of interest here
  const mockRefreshBucket = jest.spyOn(
    NodeManager.prototype,
    'refreshBucket',
  );
  try {
    mockRefreshBucket.mockImplementation(async () => {});
    await taskManager.startProcessing();
    await nodeManager.start();
    await nodeConnectionManager.start({ nodeManager });
    // Getting starting value
    const startingDelay = new Set<number>();
    for await (const task of taskManager.getTasks('asc', true, [
      'refreshBucket',
    ])) {
      startingDelay.add(task.delay);
    }
    // More than one distinct delay is required; `not.toBe(1)` would also
    // accept an empty set, i.e. no tasks at all
    expect(startingDelay.size).toBeGreaterThan(1);
    // Updating delays should have spread
    for (
      let bucketIndex = 0;
      bucketIndex < nodeGraph.nodeIdBits;
      bucketIndex++
    ) {
      await nodeManager.updateRefreshBucketDelay(
        bucketIndex,
        undefined,
        true,
      );
    }
    const updatedDelay = new Set<number>();
    for await (const task of taskManager.getTasks('asc', true, [
      'refreshBucket',
    ])) {
      updatedDelay.add(task.delay);
    }
    // As above, require at least two distinct delays
    expect(updatedDelay.size).toBeGreaterThan(1);
  } finally {
    mockRefreshBucket.mockRestore();
    await nodeManager.stop();
  }
});
});

0 comments on commit 45aef18

Please sign in to comment.