diff --git a/src/nodes/NodeGraph.ts b/src/nodes/NodeGraph.ts index f1efe255ab..ceab0a7685 100644 --- a/src/nodes/NodeGraph.ts +++ b/src/nodes/NodeGraph.ts @@ -257,6 +257,12 @@ class NodeGraph { } } + /** + * Will add a node to the node graph and increment the bucket count. + * If the node already existed it will be updated. + * @param nodeId NodeId to add to the NodeGraph + * @param nodeAddress Address information to add + */ @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async setNode( nodeId: NodeId, @@ -269,56 +275,54 @@ class NodeGraph { bucketDomain, nodesUtils.bucketDbKey(nodeId), ); - // If this is a new entry, check the bucket limit - if (nodeData == null) { - const count = await this.getBucketMetaProp(bucketIndex, 'count'); - if (count < this.nodeBucketLimit) { - // Increment the bucket count - this.setBucketMetaProp(bucketIndex, 'count', count + 1); - } else { - // Remove the oldest entry in the bucket - const lastUpdatedBucketDb = await this.db.level( - bucketKey, - this.nodeGraphLastUpdatedDb, - ); - let oldestLastUpdatedKey: Buffer; - let oldestNodeId: NodeId; - for await (const key of lastUpdatedBucketDb.createKeyStream({ - limit: 1, - })) { - oldestLastUpdatedKey = key as Buffer; - ({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey( - key as Buffer, - )); - } - await this.db.del(bucketDomain, oldestNodeId!.toBuffer()); - await this.db.del(lastUpdatedDomain, oldestLastUpdatedKey!); - } - } else { - // This is an existing entry, so the index entry must be reset + if (nodeData != null) { + // If the node already exists we want to remove the old `lastUpdated` const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey( nodeData.lastUpdated, nodeId, ); await this.db.del(lastUpdatedDomain, lastUpdatedKey); + } else { + // It didn't exist so we want to increment the bucket count + const count = await this.getBucketMetaProp(bucketIndex, 'count'); + await this.setBucketMetaProp(bucketIndex, 'count', count + 1); } const lastUpdated = getUnixtime(); await this.db.put(bucketDomain, nodesUtils.bucketDbKey(nodeId), { address: nodeAddress, lastUpdated, }); - const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey( + const newLastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey( lastUpdated, nodeId, ); await this.db.put( lastUpdatedDomain, - lastUpdatedKey, + newLastUpdatedKey, nodesUtils.bucketDbKey(nodeId), true, ); } + @ready(new nodesErrors.ErrorNodeGraphNotRunning()) + public async getOldestNode(bucketIndex: number): Promise { + const bucketKey = nodesUtils.bucketKey(bucketIndex); + // Remove the oldest entry in the bucket + const lastUpdatedBucketDb = await this.db.level( + bucketKey, + this.nodeGraphLastUpdatedDb, + ); + let oldestLastUpdatedKey: Buffer; + let oldestNodeId: NodeId | undefined; + for await (const key of lastUpdatedBucketDb.createKeyStream({ limit: 1 })) { + oldestLastUpdatedKey = key as Buffer; + ({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey( + key as Buffer, + )); + } + return oldestNodeId; + } + @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async unsetNode(nodeId: NodeId): Promise { const [bucketIndex, bucketKey] = this.bucketIndex(nodeId); @@ -330,7 +334,7 @@ class NodeGraph { ); if (nodeData != null) { const count = await this.getBucketMetaProp(bucketIndex, 'count'); - this.setBucketMetaProp(bucketIndex, 'count', count - 1); + await this.setBucketMetaProp(bucketIndex, 'count', count - 1); await this.db.del(bucketDomain, nodesUtils.bucketDbKey(nodeId)); const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey( nodeData.lastUpdated, @@ -790,7 +794,7 @@ class NodeGraph { * The bucket key is the string encoded version of bucket index * that preserves lexicographic order */ - protected bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] { + public bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] { const nodeIdOwn = this.keyManager.getNodeId(); if (nodeId.equals(nodeIdOwn)) { throw new nodesErrors.ErrorNodeGraphSameNodeId(); diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index d0f3bc47b6..adcd422ac3 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -8,6 +8,7 @@ import type { ChainData, ChainDataEncoded } from '../sigchain/types'; import type { NodeId, NodeAddress, NodeBucket } from '../nodes/types'; import type { ClaimEncoded } from '../claims/types'; import Logger from '@matrixai/logger'; +import { getUnixtime } from '@/utils'; import * as nodesErrors from './errors'; import * as nodesUtils from './utils'; import { utils as validationUtils } from '../validation'; @@ -17,6 +18,7 @@ import * as networkErrors from '../network/errors'; import * as networkUtils from '../network/utils'; import * as sigchainUtils from '../sigchain/utils'; import * as claimsUtils from '../claims/utils'; +import { NodeData } from '../nodes/types'; class NodeManager { protected db: DB; @@ -53,6 +55,10 @@ class NodeManager { * Determines whether a node in the Polykey network is online. * @return true if online, false if offline */ + // FIXME: We shouldn't be trying to find the node just to ping it + // since we are usually pinging it during the find procedure anyway. + // I think we should be providing the address of what we're trying to ping, + // possibly make it an optional parameter? public async pingNode(targetNodeId: NodeId): Promise { const targetAddress: NodeAddress = await this.nodeConnectionManager.findNode(targetNodeId); @@ -326,13 +332,46 @@ class NodeManager { } /** - * Sets a node in the NodeGraph + * Adds a node to the node graph. + * Updates the node if the node already exists. + * */ public async setNode( nodeId: NodeId, nodeAddress: NodeAddress, + force = false, ): Promise { - return await this.nodeGraph.setNode(nodeId, nodeAddress); + // When adding a node we need to handle 3 cases + // 1. The node already exists. We need to update it's last updated field + // 2. The node doesn't exist and bucket has room. + // We need to add the node to the bucket + // 3. The node doesn't exist and the bucket is full. + // We need to ping the oldest node. If the ping succeeds we need to update + // the lastUpdated of the oldest node and drop the new one. If the ping + // fails we delete the old node and add in the new one. + const nodeData = await this.nodeGraph.getNode(nodeId); + // If this is a new entry, check the bucket limit + const [bucketIndex] = this.nodeGraph.bucketIndex(nodeId); + const count = await this.nodeGraph.getBucketMetaProp(bucketIndex, 'count'); + if (nodeData != null || count < this.nodeGraph.nodeBucketLimit) { + // Either already exists or has room in the bucket + // We want to add or update the node + await this.nodeGraph.setNode(nodeId, nodeAddress); + } else { + // We want to add a node but the bucket is full + // We need to ping the oldest node + const oldestNodeId = (await this.nodeGraph.getOldestNode(bucketIndex))!; + if ((await this.pingNode(oldestNodeId)) && !force) { + // The node responded, we need to update it's info and drop the new node + const oldestNode = (await this.nodeGraph.getNode(oldestNodeId))!; + await this.nodeGraph.setNode(oldestNodeId, oldestNode.address); + } else { + // The node could not be contacted or force was set, + // we drop it in favor of the new node + await this.nodeGraph.unsetNode(oldestNodeId); + await this.nodeGraph.setNode(nodeId, nodeAddress); + } + } } // FIXME diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index f3de57cd8f..78a8f26b0a 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -18,6 +18,7 @@ import Sigchain from '@/sigchain/Sigchain'; import * as claimsUtils from '@/claims/utils'; import { promisify, sleep } from '@/utils'; import * as nodesUtils from '@/nodes/utils'; +import * as nodesTestUtils from './utils'; describe(`${NodeManager.name} test`, () => { const password = 'password'; @@ -425,4 +426,184 @@ describe(`${NodeManager.name} test`, () => { expect(chainData).toContain(nodesUtils.encodeNodeId(yNodeId)); }); }); + test('should add a node when bucket has room', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + await nodeManager.setNode(nodeId, {} as NodeAddress); + + // Checking bucket + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(1); + }); + test('should update a node if node exists', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + await nodeManager.setNode(nodeId, { + host: '' as Host, + port: 11111 as Port, + }); + + const nodeData = (await nodeGraph.getNode(nodeId))!; + await sleep(1100); + + // Should update the node + await nodeManager.setNode(nodeId, { + host: '' as Host, + port: 22222 as Port, + }); + + const newNodeData = (await nodeGraph.getNode(nodeId))!; + expect(newNodeData.address.port).not.toEqual(nodeData.address.port); + expect(newNodeData.lastUpdated).not.toEqual(nodeData.lastUpdated); + }); + test('should not add node if bucket is full and old node is alive', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // Creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + i, + ); + await nodeManager.setNode(nodeId, { port: i } as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + // Mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(true); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + const oldestNode = await nodeGraph.getNode(oldestNodeId!); + // Waiting for a second to tick over + await sleep(1100); + // Adding a new node with bucket full + await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress); + // Bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // New node was not added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeUndefined(); + // Oldest node was updated + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew!.lastUpdated).not.toEqual(oldestNode!.lastUpdated); + nodeManagerPingMock.mockRestore(); + }); + test('should add node if bucket is full, old node is alive and force is set', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // Creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + i, + ); + await nodeManager.setNode(nodeId, { port: i } as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + // Mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(true); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + // Adding a new node with bucket full + await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress, true); + // Bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // New node was added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeDefined(); + // Oldest node was removed + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew).toBeUndefined(); + nodeManagerPingMock.mockRestore(); + }); + test('should add node if bucket is full and old node is dead', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // Creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + i, + ); + await nodeManager.setNode(nodeId, { port: i } as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket( + localNodeId, + bucketIndex, + ); + // Mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(false); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + // Adding a new node with bucket full + await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress, true); + // Bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // New node was added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeDefined(); + // Oldest node was removed + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew).toBeUndefined(); + nodeManagerPingMock.mockRestore(); + }); });