diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 72d3911af5..897bcc3ebb 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -215,14 +215,7 @@ class NodeConnectionManager { ): Promise { return await withF( [await this.acquireConnection(targetNodeId, timer)], - async ([conn]) => { - this.logger.info( - `withConnF calling function with connection to ${nodesUtils.encodeNodeId( - targetNodeId, - )}`, - ); - return await f(conn); - }, + async ([conn]) => await f(conn), ); } @@ -268,25 +261,12 @@ class NodeConnectionManager { targetNodeId: NodeId, timer?: Timer, ): Promise { - this.logger.info( - `Getting connection to ${nodesUtils.encodeNodeId(targetNodeId)}`, - ); const targetNodeIdString = targetNodeId.toString() as NodeIdString; return await this.connectionLocks.withF( [targetNodeIdString, RWLockWriter, 'write'], async () => { const connAndTimer = this.connections.get(targetNodeIdString); - if (connAndTimer != null) { - this.logger.info( - `existing entry found for ${nodesUtils.encodeNodeId(targetNodeId)}`, - ); - return connAndTimer; - } - this.logger.info( - `no existing entry, creating connection to ${nodesUtils.encodeNodeId( - targetNodeId, - )}`, - ); + if (connAndTimer != null) return connAndTimer; // Creating the connection and set in map const targetAddress = await this.findNode(targetNodeId); if (targetAddress == null) { @@ -556,6 +536,22 @@ class NodeConnectionManager { } }); } + // If the found nodes are less than nodeBucketLimit then + // we expect that refresh buckets won't find anything new + if (Object.keys(contacted).length < this.nodeGraph.nodeBucketLimit) { + // Reset the delay on all refresh bucket tasks + for ( + let bucketIndex = 0; + bucketIndex < this.nodeGraph.nodeIdBits; + bucketIndex++ + ) { + await this.nodeManager?.updateRefreshBucketDelay( + bucketIndex, + undefined, + true, + ); + } + } return foundAddress; } diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 4b209b45bb..fae495da33 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -38,6 +38,7 @@ class NodeManager { protected nodeGraph: NodeGraph; protected taskManager: TaskManager; protected refreshBucketDelay: number; + protected refreshBucketDelaySpread: number; public readonly setNodeHandlerId = 'NodeManager.setNodeHandler' as TaskHandlerId; public readonly refreshBucketHandlerId = @@ -50,8 +51,12 @@ class NodeManager { ) => { await this.refreshBucket(bucketIndex, { signal: context.signal }); // When completed reschedule the task + const spread = + (Math.random() - 0.5) * + this.refreshBucketDelay * + this.refreshBucketDelaySpread; await this.taskManager.scheduleTask({ - delay: this.refreshBucketDelay, + delay: this.refreshBucketDelay + spread, handlerId: this.refreshBucketHandlerId, lazy: true, parameters: [bucketIndex], @@ -81,6 +86,7 @@ class NodeManager { nodeGraph, taskManager, refreshBucketDelay = 3600000, // 1 hour in milliseconds + refreshBucketDelaySpread = 0.5, // Multiple of refreshBucketDelay to spread by logger, }: { db: DB; @@ -90,6 +96,7 @@ class NodeManager { nodeGraph: NodeGraph; taskManager: TaskManager; refreshBucketDelay?: number; + refreshBucketDelaySpread?: number; logger?: Logger; }) { this.logger = logger ?? new Logger(this.constructor.name); @@ -100,6 +107,11 @@ class NodeManager { this.nodeGraph = nodeGraph; this.taskManager = taskManager; this.refreshBucketDelay = refreshBucketDelay; + // Clamped from 0 to 1 inclusive + this.refreshBucketDelaySpread = Math.max( + 0, + Math.min(refreshBucketDelaySpread, 1), + ); } public async start() { @@ -639,7 +651,6 @@ class NodeManager { } this.logger.info('Setting up refreshBucket tasks'); - // 1. Iterate over existing tasks and reset the delay const existingTasks: Array = new Array(this.nodeGraph.nodeIdBits); for await (const task of this.taskManager.getTasks( @@ -654,30 +665,30 @@ class NodeManager { { // If it's scheduled then reset delay existingTasks[bucketIndex] = true; - this.logger.debug( - `Updating refreshBucket delay for bucket ${bucketIndex}`, - ); // Total delay is refreshBucketDelay + time since task creation + const spread = + (Math.random() - 0.5) * + this.refreshBucketDelay * + this.refreshBucketDelaySpread; const delay = performance.now() + performance.timeOrigin - task.created.getTime() + - this.refreshBucketDelay; + this.refreshBucketDelay + + spread; await this.taskManager.updateTask(task.id, { delay }, tran); } break; case 'queued': case 'active': // If it's running then leave it - this.logger.debug( - `RefreshBucket task for bucket ${bucketIndex} is already active, ignoring`, - ); existingTasks[bucketIndex] = true; break; default: - // Otherwise ignore it, should be re-created + // Otherwise, ignore it, should be re-created existingTasks[bucketIndex] = false; } + this.logger.info('Set up refreshBucket tasks'); } // 2. Recreate any missing tasks for buckets @@ -692,9 +703,13 @@ class NodeManager { this.logger.debug( `Creating refreshBucket task for bucket ${bucketIndex}`, ); + const spread = + (Math.random() - 0.5) * + this.refreshBucketDelay * + this.refreshBucketDelaySpread; await this.taskManager.scheduleTask({ handlerId: this.refreshBucketHandlerId, - delay: this.refreshBucketDelay, + delay: this.refreshBucketDelay + spread, lazy: true, parameters: [bucketIndex], path: ['refreshBucket', `${bucketIndex}`], @@ -718,6 +733,8 @@ class NodeManager { ); } + const spread = + (Math.random() - 0.5) * delay * this.refreshBucketDelaySpread; let foundTask: Task | undefined; let count = 0; for await (const task of this.taskManager.getTasks( @@ -738,7 +755,8 @@ class NodeManager { performance.now() + performance.timeOrigin - task.created.getTime() + - delay; + delay + + spread; await this.taskManager.updateTask(task.id, { delay: delayNew }, tran); this.logger.debug( `Updating refreshBucket task for bucket ${bucketIndex}`, @@ -757,7 +775,7 @@ class NodeManager { `No refreshBucket task for bucket ${bucketIndex}, new one was created`, ); foundTask = await this.taskManager.scheduleTask({ - delay: this.refreshBucketDelay, + delay: delay + spread, handlerId: this.refreshBucketHandlerId, lazy: true, parameters: [bucketIndex], diff --git a/tests/nodes/NodeConnectionManager.seednodes.test.ts b/tests/nodes/NodeConnectionManager.seednodes.test.ts index 24f499cb1a..b799645257 100644 --- a/tests/nodes/NodeConnectionManager.seednodes.test.ts +++ b/tests/nodes/NodeConnectionManager.seednodes.test.ts @@ -470,7 +470,6 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { ); mockedPingNode.mockImplementation(async () => true); try { - logger.setLevel(LogLevel.WARN); node1 = await PolykeyAgent.createPolykeyAgent({ nodePath: path.join(dataDir, 'node1'), password: 'password', @@ -533,7 +532,6 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { expect(node2Nodes).toContain(nodeId1); } finally { mockedPingNode.mockRestore(); - logger.setLevel(LogLevel.WARN); await node1?.stop(); await node1?.destroy(); await node2?.stop(); @@ -542,4 +540,70 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { }, globalThis.defaultTimeout * 2, ); + test( + 'refreshBucket delays should be reset after finding less than 20 nodes', + async () => { + // Using a single seed node we need to check that each entering node adds itself to the seed node. + // Also need to check that the new nodes can be seen in the network. + let node1: PolykeyAgent | undefined; + const seedNodes: SeedNodes = {}; + seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = { + host: remoteNode1.proxy.getProxyHost(), + port: remoteNode1.proxy.getProxyPort(), + }; + seedNodes[nodesUtils.encodeNodeId(remoteNodeId2)] = { + host: remoteNode2.proxy.getProxyHost(), + port: remoteNode2.proxy.getProxyPort(), + }; + const mockedPingNode = jest.spyOn( + NodeConnectionManager.prototype, + 'pingNode', + ); + mockedPingNode.mockImplementation(async () => true); + try { + node1 = await PolykeyAgent.createPolykeyAgent({ + nodePath: path.join(dataDir, 'node1'), + password: 'password', + networkConfig: { + proxyHost: localHost, + agentHost: localHost, + clientHost: localHost, + forwardHost: localHost, + }, + keysConfig: { + privateKeyPemOverride: globalRootKeyPems[3], + }, + seedNodes, + logger, + }); + + // Reset all the refresh bucket timers to a distinct time + for ( + let bucketIndex = 0; + bucketIndex < node1.nodeGraph.nodeIdBits; + bucketIndex++ + ) { + await node1.nodeManager.updateRefreshBucketDelay( + bucketIndex, + 10000, + true, + ); + } + + // Trigger a refreshBucket + await node1.nodeManager.refreshBucket(1); + + for await (const task of node1.taskManager.getTasks('asc', true, [ + 'refreshBucket', + ])) { + expect(task.delay).toBeGreaterThanOrEqual(50000); + } + } finally { + mockedPingNode.mockRestore(); + await node1?.stop(); + await node1?.destroy(); + } + }, + globalThis.defaultTimeout * 2, + ); }); diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index 8e81081a2a..35a90d6368 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -989,4 +989,57 @@ describe(`${NodeManager.name} test`, () => { await nodeManager.stop(); } }); + test('refreshBucket tasks should have spread delays', async () => { + const refreshBucketTimeout = 100000; + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: dummyNodeConnectionManager, + taskManager, + refreshBucketDelay: refreshBucketTimeout, + logger, + }); + const mockRefreshBucket = jest.spyOn( + NodeManager.prototype, + 'refreshBucket', + ); + try { + mockRefreshBucket.mockImplementation(async () => {}); + await taskManager.startProcessing(); + await nodeManager.start(); + await nodeConnectionManager.start({ nodeManager }); + // Getting starting value + const startingDelay = new Set(); + for await (const task of taskManager.getTasks('asc', true, [ + 'refreshBucket', + ])) { + startingDelay.add(task.delay); + } + expect(startingDelay.size).not.toBe(1); + // Updating delays should have spread + for ( + let bucketIndex = 0; + bucketIndex < nodeGraph.nodeIdBits; + bucketIndex++ + ) { + await nodeManager.updateRefreshBucketDelay( + bucketIndex, + undefined, + true, + ); + } + const updatedDelay = new Set(); + for await (const task of taskManager.getTasks('asc', true, [ + 'refreshBucket', + ])) { + updatedDelay.add(task.delay); + } + expect(updatedDelay.size).not.toBe(1); + } finally { + mockRefreshBucket.mockRestore(); + await nodeManager.stop(); + } + }); });