diff --git a/src/nodes/NodeGraph.ts b/src/nodes/NodeGraph.ts index 717eb39d95..601af91a75 100644 --- a/src/nodes/NodeGraph.ts +++ b/src/nodes/NodeGraph.ts @@ -245,7 +245,12 @@ class NodeGraph { } } - + /** + * Will add a node to the node graph and increment the bucket count. + * If the node already existed it will be updated. + * @param nodeId NodeId to add to the NodeGraph + * @param nodeAddress Address information to add + */ @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async setNode( nodeId: NodeId, @@ -258,34 +263,17 @@ class NodeGraph { bucketDomain, nodesUtils.bucketDbKey(nodeId), ); - // If this is a new entry, check the bucket limit - if (nodeData == null) { - const count = await this.getBucketMetaProp(bucketIndex, 'count'); - if (count < this.nodeBucketLimit) { - // Increment the bucket count - this.setBucketMetaProp(bucketIndex, 'count', count + 1); - } else { - // Remove the oldest entry in the bucket - const lastUpdatedBucketDb = await this.db.level( - bucketKey, - this.nodeGraphLastUpdatedDb - ); - let oldestLastUpdatedKey: Buffer; - let oldestNodeId: NodeId; - for await (const key of lastUpdatedBucketDb.createKeyStream({ limit: 1 })) { - oldestLastUpdatedKey = key as Buffer; - ({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(key as Buffer)); - } - await this.db.del(bucketDomain, oldestNodeId!.toBuffer()); - await this.db.del(lastUpdatedDomain, oldestLastUpdatedKey!); - } - } else { - // This is an existing entry, so the index entry must be reset + if(nodeData != null){ + // If the node already exists we want to remove the old `lastUpdated` const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(nodeData.lastUpdated, nodeId) await this.db.del( lastUpdatedDomain, lastUpdatedKey ); + } else { + // It didn't exist so we want to increment the bucket count + const count = await this.getBucketMetaProp(bucketIndex, 'count'); + await this.setBucketMetaProp(bucketIndex, 'count', count + 1); } const lastUpdated = getUnixtime(); await this.db.put( @@ -296,15 +284,32 @@ class NodeGraph { lastUpdated, } ); - const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(lastUpdated, nodeId) + const newLastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(lastUpdated, nodeId) await this.db.put( lastUpdatedDomain, - lastUpdatedKey, + newLastUpdatedKey, nodesUtils.bucketDbKey(nodeId), true ); } + @ready(new nodesErrors.ErrorNodeGraphNotRunning()) + public async getOldestNode(bucketIndex: number ): Promise { + const bucketKey = nodesUtils.bucketKey(bucketIndex); + // Remove the oldest entry in the bucket + const lastUpdatedBucketDb = await this.db.level( + bucketKey, + this.nodeGraphLastUpdatedDb + ); + let oldestLastUpdatedKey: Buffer; + let oldestNodeId: NodeId | undefined; + for await (const key of lastUpdatedBucketDb.createKeyStream({ limit: 1 })) { + oldestLastUpdatedKey = key as Buffer; + ({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(key as Buffer)); + } + return oldestNodeId; + } + @ready(new nodesErrors.ErrorNodeGraphNotRunning()) public async unsetNode(nodeId: NodeId): Promise { const [bucketIndex, bucketKey] = this.bucketIndex(nodeId); @@ -316,7 +321,7 @@ class NodeGraph { ); if (nodeData != null) { const count = await this.getBucketMetaProp(bucketIndex, 'count'); - this.setBucketMetaProp(bucketIndex, 'count', count - 1); + await this.setBucketMetaProp(bucketIndex, 'count', count - 1); await this.db.del( bucketDomain, nodesUtils.bucketDbKey(nodeId) @@ -750,7 +755,7 @@ class NodeGraph { * The bucket key is the string encoded version of bucket index * that preserves lexicographic order */ - protected bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] { + public bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] { const nodeIdOwn = this.keyManager.getNodeId(); if (nodeId.equals(nodeIdOwn)) { throw new nodesErrors.ErrorNodeGraphSameNodeId(); diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index b283436676..10c5ef87e6 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -17,6 +17,8 @@ import * as networkErrors from '../network/errors'; import * as networkUtils from '../network/utils'; import * as sigchainUtils from '../sigchain/utils'; import * as claimsUtils from '../claims/utils'; +import { NodeData } from '../nodes/types'; +import { getUnixtime } from '@/utils'; class NodeManager { protected db: DB; @@ -53,6 +55,10 @@ class NodeManager { * Determines whether a node in the Polykey network is online. * @return true if online, false if offline */ + // FIXME: We shouldn't be trying to find the node just to ping it + // since we are usually pinging it during the find procedure anyway. + // I think we should be providing the address of what we're trying to ping, + // possibly make it an optional parameter? public async pingNode(targetNodeId: NodeId): Promise { const targetAddress: NodeAddress = await this.nodeConnectionManager.findNode(targetNodeId); @@ -326,13 +332,46 @@ class NodeManager { } /** - * Sets a node in the NodeGraph + * Adds a node to the node graph. + * Updates the node if the node already exists. + * */ public async setNode( nodeId: NodeId, nodeAddress: NodeAddress, + force = false, ): Promise { - return await this.nodeGraph.setNode(nodeId, nodeAddress); + // When adding a node we need to handle 3 cases + // 1. The node already exists. We need to update it's last updated field + // 2. The node doesn't exist and bucket has room. + // We need to add the node to the bucket + // 3. The node doesn't exist and the bucket is full. + // We need to ping the oldest node. If the ping succeeds we need to update + // the lastUpdated of the oldest node and drop the new one. If the ping + // fails we delete the old node and add in the new one. + const nodeData = await this.nodeGraph.getNode(nodeId); + // If this is a new entry, check the bucket limit + const [bucketIndex, ] = this.nodeGraph.bucketIndex(nodeId) + const count = await this.nodeGraph.getBucketMetaProp(bucketIndex, 'count'); + if (nodeData == null && count < this.nodeGraph.nodeBucketLimit) { + // We want to add a node but the bucket is full + // We need to ping the oldest node + const oldestNodeId = (await this.nodeGraph.getOldestNode(bucketIndex))!; + if (await this.pingNode(oldestNodeId) && !force){ + // The node responded, we need to update it's info and drop the new node + const oldestNode = (await this.nodeGraph.getNode(oldestNodeId))!; + await this.nodeGraph.setNode(oldestNodeId, oldestNode.address); + } else { + // The node could not be contacted or force was set, + // we drop it in favor of the new node + await this.nodeGraph.unsetNode(oldestNodeId); + await this.nodeGraph.setNode(nodeId, nodeAddress); + } + } else { + // Either already exists or has room in the bucket + // We want to add or update the node + await this.nodeGraph.setNode(nodeId, nodeAddress); + } } /** diff --git a/tests/nodes/NodeManager.test.ts b/tests/nodes/NodeManager.test.ts index a50fe96373..3cdfc47922 100644 --- a/tests/nodes/NodeManager.test.ts +++ b/tests/nodes/NodeManager.test.ts @@ -18,6 +18,7 @@ import Sigchain from '@/sigchain/Sigchain'; import * as claimsUtils from '@/claims/utils'; import { promisify, sleep } from '@/utils'; import * as nodesUtils from '@/nodes/utils'; +import * as nodesTestUtils from './utils'; describe(`${NodeManager.name} test`, () => { const password = 'password'; @@ -408,4 +409,156 @@ describe(`${NodeManager.name} test`, () => { expect(chainData).toContain(nodesUtils.encodeNodeId(yNodeId)); }); }); + test('should add a node when bucket has room', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex); + await nodeManager.setNode(nodeId, {} as NodeAddress); + + // checking bucket + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(1); + }); + test('should update a node if node exists', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex); + await nodeManager.setNode(nodeId, { + host: '' as Host, + port: 11111 as Port + }); + + const nodeData = (await nodeGraph.getNode(nodeId))!; + await sleep(100); + + // should update the node + await nodeManager.setNode(nodeId, { + host: '' as Host, + port: 22222 as Port + }); + + const newNodeData = (await nodeGraph.getNode(nodeId))!; + expect(newNodeData.address.port).not.toEqual(nodeData.address.port) + expect(newNodeData.lastUpdated).not.toEqual(nodeData.lastUpdated) + + }); + test('should not add node if bucket is full and old node is alive', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex, i); + await nodeManager.setNode(nodeId, {port: i} as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex); + //mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(true); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + const oldestNode = await nodeGraph.getNode(oldestNodeId!); + // adding a new node with bucket full + await nodeManager.setNode(nodeId, {port: 55555} as NodeAddress); + // bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // new node was not added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeUndefined(); + // oldest node was updated + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew!.lastUpdated).not.toEqual(oldestNode!.lastUpdated); + nodeManagerPingMock.mockRestore(); + }); + test('should add node if bucket is full, old node is alive and force is set', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex, i); + await nodeManager.setNode(nodeId, {port: i} as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex); + //mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(true); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + // adding a new node with bucket full + await nodeManager.setNode(nodeId, {port: 55555} as NodeAddress, true); + // bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // new node was added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeDefined(); + // oldest node was removed + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew).toBeUndefined(); + nodeManagerPingMock.mockRestore(); + }); + test('should add node if bucket is full and old node is dead', async () => { + const nodeManager = new NodeManager({ + db, + sigchain: {} as Sigchain, + keyManager, + nodeGraph, + nodeConnectionManager: {} as NodeConnectionManager, + logger, + }); + const localNodeId = keyManager.getNodeId(); + const bucketIndex = 100; + // creating 20 nodes in bucket + for (let i = 1; i <= 20; i++) { + const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex, i); + await nodeManager.setNode(nodeId, {port: i} as NodeAddress); + } + const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex); + //mocking ping + const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode'); + nodeManagerPingMock.mockResolvedValue(false); + const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex); + // adding a new node with bucket full + await nodeManager.setNode(nodeId, {port: 55555} as NodeAddress, true); + // bucket still contains max nodes + const bucket = await nodeManager.getBucket(bucketIndex); + expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit); + // new node was added + const node = await nodeGraph.getNode(nodeId); + expect(node).toBeDefined(); + // oldest node was removed + const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!); + expect(oldestNodeNew).toBeUndefined(); + nodeManagerPingMock.mockRestore(); + }); });