Skip to content

Commit

Permalink
feat: added NodeGraph.getClosestNodes()
Browse files Browse the repository at this point in the history
Implemented `getClosestNodes()` and relevant tests in `NodeGraph.test.ts`.

Relates to #212
  • Loading branch information
tegefaulkes committed Jun 1, 2022
1 parent c6d01f9 commit e93e9a1
Show file tree
Hide file tree
Showing 5 changed files with 459 additions and 185 deletions.
20 changes: 7 additions & 13 deletions src/agent/service/nodesClosestLocalNodesGet.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type * as grpc from '@grpc/grpc-js';
import type { NodeConnectionManager } from '../../nodes';
import type { NodeGraph } from '../../nodes';
import type { NodeId } from '../../nodes/types';
import { utils as grpcUtils } from '../../grpc';
import { utils as nodesUtils } from '../../nodes';
Expand All @@ -11,11 +11,7 @@ import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
* Retrieves the local nodes (i.e. from the current node) that are closest
* to some provided node ID.
*/
function nodesClosestLocalNodesGet({
nodeConnectionManager,
}: {
nodeConnectionManager: NodeConnectionManager;
}) {
function nodesClosestLocalNodesGet({ nodeGraph }: { nodeGraph: NodeGraph }) {
return async (
call: grpc.ServerUnaryCall<nodesPB.Node, nodesPB.NodeTable>,
callback: grpc.sendUnaryData<nodesPB.NodeTable>,
Expand All @@ -38,17 +34,15 @@ function nodesClosestLocalNodesGet({
},
);
// Get all local nodes that are closest to the target node from the request
const closestNodes = await nodeConnectionManager.getClosestLocalNodes(
nodeId,
);
for (const node of closestNodes) {
const closestNodes = await nodeGraph.getClosestNodes(nodeId);
for (const [nodeId, nodeData] of closestNodes) {
const addressMessage = new nodesPB.Address();
addressMessage.setHost(node.address.host);
addressMessage.setPort(node.address.port);
addressMessage.setHost(nodeData.address.host);
addressMessage.setPort(nodeData.address.port);
// Add the node to the response's map (mapping of node ID -> node address)
response
.getNodeTableMap()
.set(nodesUtils.encodeNodeId(node.id), addressMessage);
.set(nodesUtils.encodeNodeId(nodeId), addressMessage);
}
callback(null, response);
return;
Expand Down
54 changes: 7 additions & 47 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -437,47 +437,6 @@ class NodeConnectionManager {
return address;
}

/**
* Finds the set of nodes (of size k) known by the current node (i.e. in its
* buckets database) that have the smallest distance to the target node (i.e.
* are closest to the target node).
* i.e. FIND_NODE RPC from Kademlia spec
*
* Used by the RPC service.
*
* @param targetNodeId the node ID to find other nodes closest to it
* @param numClosest the number of closest nodes to return (by default, returns
* according to the maximum number of nodes per bucket)
* @returns a mapping containing exactly k nodeIds -> nodeAddresses (unless the
* current node has less than k nodes in all of its buckets, in which case it
* returns all nodes it has knowledge of)
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async getClosestLocalNodes(
targetNodeId: NodeId,
numClosest: number = this.nodeGraph.maxNodesPerBucket,
): Promise<Array<NodeData>> {
// Retrieve all nodes from buckets in database
const buckets = await this.nodeGraph.getAllBuckets();
// Iterate over all of the nodes in each bucket
const distanceToNodes: Array<NodeData> = [];
buckets.forEach(function (bucket) {
for (const nodeIdString of Object.keys(bucket)) {
// Compute the distance from the node, and add it to the array
const nodeId = IdInternal.fromString<NodeId>(nodeIdString);
distanceToNodes.push({
id: nodeId,
address: bucket[nodeId].address,
distance: nodesUtils.calculateDistance(nodeId, targetNodeId),
});
}
});
// Sort the array (based on the distance at index 1)
distanceToNodes.sort(nodesUtils.sortByDistance);
// Return the closest k nodes (i.e. the first k), or all nodes if < k in array
return distanceToNodes.slice(0, numClosest);
}

/**
* Attempts to locate a target node in the network (using Kademlia).
* Adds all discovered, active nodes to the current node's database (up to k
Expand All @@ -499,7 +458,7 @@ class NodeConnectionManager {
// Let foundTarget: boolean = false;
let foundAddress: NodeAddress | undefined = undefined;
// Get the closest alpha nodes to the target node (set as shortlist)
const shortlist: Array<NodeData> = await this.getClosestLocalNodes(
const shortlist = await this.nodeGraph.getClosestNodes(
targetNodeId,
this.initialClosestNodes,
);
Expand All @@ -522,25 +481,26 @@ class NodeConnectionManager {
if (nextNode == null) {
break;
}
const [nextNodeId, nextNodeAddress] = nextNode;
// Skip if the node has already been contacted
if (contacted[nextNode.id]) {
if (contacted[nextNodeId]) {
continue;
}
// Connect to the node (check if pre-existing connection exists, otherwise
// create a new one)
try {
// Add the node to the database so that we can find its address in
// call to getConnectionToNode
await this.nodeGraph.setNode(nextNode.id, nextNode.address);
await this.getConnection(nextNode.id);
await this.nodeGraph.setNode(nextNodeId, nextNodeAddress.address);
await this.getConnection(nextNodeId);
} catch (e) {
// If we can't connect to the node, then skip it
continue;
}
contacted[nextNode.id] = true;
contacted[nextNodeId] = true;
// Ask the node to get their own closest nodes to the target
const foundClosest = await this.getRemoteNodeClosestNodes(
nextNode.id,
nextNodeId,
targetNodeId,
);
// Check to see if any of these are the target node. At the same time, add
Expand Down
105 changes: 105 additions & 0 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,111 @@ class NodeGraph {
return value;
}

/**
* Finds the set of nodes (of size k) known by the current node (i.e. in its
* buckets database) that have the smallest distance to the target node (i.e.
* are closest to the target node).
* i.e. FIND_NODE RPC from Kademlia spec
*
* Used by the RPC service.
*
* @param nodeId the node ID to find other nodes closest to it
* @param limit the number of closest nodes to return (by default, returns
* according to the maximum number of nodes per bucket)
* @returns a mapping containing exactly k nodeIds -> nodeAddresses (unless the
* current node has less than k nodes in all of its buckets, in which case it
* returns all nodes it has knowledge of)
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getClosestNodes(
nodeId: NodeId,
limit: number = this.nodeBucketLimit,
): Promise<NodeBucket> {
// Buckets map to the target node in the following way;
// 1. 0, 1, ..., T-1 -> T
// 2. T -> 0, 1, ..., T-1
// 3. T+1, T+2, ..., 255 are unchanged
// We need to obtain nodes in the following bucket order
// 1. T
// 2. iterate over 0 ---> T-1
// 3. iterate over T+1 ---> K
// Need to work out the relevant bucket to start from
const startingBucket = nodesUtils.bucketIndex(
this.keyManager.getNodeId(),
nodeId,
);
// Getting the whole target's bucket first
const nodeIds: NodeBucket = await this.getBucket(startingBucket);
// We need to iterate over the key stream
// When streaming we want all nodes in the starting bucket
// The keys takes the form `!(lexpack bucketId)!(nodeId)`
// We can just use `!(lexpack bucketId)` to start from
// Less than `!(bucketId 101)!` gets us buckets 100 and lower
// greater than `!(bucketId 99)!` gets up buckets 100 and greater
const prefix = Buffer.from([33]); // Code for `!` prefix
if (nodeIds.length < limit) {
// Just before target bucket
const bucketId = Buffer.from(nodesUtils.bucketKey(startingBucket));
const endKeyLower = Buffer.concat([prefix, bucketId, prefix]);
const remainingLimit = limit - nodeIds.length;
// Iterate over lower buckets
for await (const o of this.nodeGraphBucketsDb.createReadStream({
lt: endKeyLower,
limit: remainingLimit,
})) {
const element = o as any as { key: Buffer; value: Buffer };
const info = nodesUtils.parseBucketsDbKey(element.key);
const nodeData = await this.db.deserializeDecrypt<NodeData>(
element.value,
false,
);
nodeIds.push([info.nodeId, nodeData]);
}
}
if (nodeIds.length < limit) {
// Just after target bucket
const bucketId = Buffer.from(nodesUtils.bucketKey(startingBucket + 1));
const startKeyUpper = Buffer.concat([prefix, bucketId, prefix]);
const remainingLimit = limit - nodeIds.length;
// Iterate over ids further away
for await (const o of this.nodeGraphBucketsDb.createReadStream({
gt: startKeyUpper,
limit: remainingLimit,
})) {
const element = o as any as { key: Buffer; value: Buffer };
const info = nodesUtils.parseBucketsDbKey(element.key);
const nodeData = await this.db.deserializeDecrypt<NodeData>(
element.value,
false,
);
nodeIds.push([info.nodeId, nodeData]);
}
}
// If no nodes were found, return nothing
if (nodeIds.length === 0) return [];
// Need to get the whole of the last bucket
const lastBucketIndex = nodesUtils.bucketIndex(
this.keyManager.getNodeId(),
nodeIds[nodeIds.length - 1][0],
);
const lastBucket = await this.getBucket(lastBucketIndex);
// Pop off elements of the same bucket to avoid duplicates
let element = nodeIds.pop();
while (
element != null &&
nodesUtils.bucketIndex(this.keyManager.getNodeId(), element[0]) ===
lastBucketIndex
) {
element = nodeIds.pop();
}
if (element != null) nodeIds.push(element);
// Adding last bucket to the list
nodeIds.push(...lastBucket);

nodesUtils.bucketSortByDistance(nodeIds, nodeId, 'asc');
return nodeIds.slice(0, limit);
}

/**
* Sets a bucket meta property
* This is protected because users cannot directly manipulate bucket meta
Expand Down
124 changes: 0 additions & 124 deletions tests/nodes/NodeConnectionManager.general.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ describe(`${NodeConnectionManager.name} general test`, () => {
let keyManager: KeyManager;
let db: DB;
let proxy: Proxy;

let nodeGraph: NodeGraph;

let remoteNode1: PolykeyAgent;
Expand Down Expand Up @@ -336,129 +335,6 @@ describe(`${NodeConnectionManager.name} general test`, () => {
},
global.failedConnectionTimeout * 2,
);
test('finds a single closest node', async () => {
// NodeConnectionManager under test
const nodeConnectionManager = new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
logger: nodeConnectionManagerLogger,
});
await nodeConnectionManager.start();
try {
// New node added
const newNode2Id = nodeId1;
const newNode2Address = { host: '227.1.1.1', port: 4567 } as NodeAddress;
await nodeGraph.setNode(newNode2Id, newNode2Address);

// Find the closest nodes to some node, NODEID3
const closest = await nodeConnectionManager.getClosestLocalNodes(nodeId3);
expect(closest).toContainEqual({
id: newNode2Id,
distance: 121n,
address: { host: '227.1.1.1', port: 4567 },
});
} finally {
await nodeConnectionManager.stop();
}
});
test('finds 3 closest nodes', async () => {
const nodeConnectionManager = new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
logger: nodeConnectionManagerLogger,
});
await nodeConnectionManager.start();
try {
// Add 3 nodes
await nodeGraph.setNode(nodeId1, {
host: '2.2.2.2',
port: 2222,
} as NodeAddress);
await nodeGraph.setNode(nodeId2, {
host: '3.3.3.3',
port: 3333,
} as NodeAddress);
await nodeGraph.setNode(nodeId3, {
host: '4.4.4.4',
port: 4444,
} as NodeAddress);

// Find the closest nodes to some node, NODEID4
const closest = await nodeConnectionManager.getClosestLocalNodes(nodeId3);
expect(closest.length).toBe(5);
expect(closest).toContainEqual({
id: nodeId3,
distance: 0n,
address: { host: '4.4.4.4', port: 4444 },
});
expect(closest).toContainEqual({
id: nodeId2,
distance: 116n,
address: { host: '3.3.3.3', port: 3333 },
});
expect(closest).toContainEqual({
id: nodeId1,
distance: 121n,
address: { host: '2.2.2.2', port: 2222 },
});
} finally {
await nodeConnectionManager.stop();
}
});
test('finds the 20 closest nodes', async () => {
const nodeConnectionManager = new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
logger: nodeConnectionManagerLogger,
});
await nodeConnectionManager.start();
try {
// Generate the node ID to find the closest nodes to (in bucket 100)
const nodeId = keyManager.getNodeId();
const nodeIdToFind = testNodesUtils.generateNodeIdForBucket(nodeId, 100);
// Now generate and add 20 nodes that will be close to this node ID
const addedClosestNodes: NodeData[] = [];
for (let i = 1; i < 101; i += 5) {
const closeNodeId = testNodesUtils.generateNodeIdForBucket(
nodeIdToFind,
i,
);
const nodeAddress = {
host: (i + '.' + i + '.' + i + '.' + i) as Host,
port: i as Port,
};
await nodeGraph.setNode(closeNodeId, nodeAddress);
addedClosestNodes.push({
id: closeNodeId,
address: nodeAddress,
distance: nodesUtils.calculateDistance(nodeIdToFind, closeNodeId),
});
}
// Now create and add 10 more nodes that are far away from this node
for (let i = 1; i <= 10; i++) {
const farNodeId = nodeIdGenerator(i);
const nodeAddress = {
host: `${i}.${i}.${i}.${i}` as Host,
port: i as Port,
};
await nodeGraph.setNode(farNodeId, nodeAddress);
}

// Find the closest nodes to the original generated node ID
const closest = await nodeConnectionManager.getClosestLocalNodes(
nodeIdToFind,
);
// We should always only receive k nodes
expect(closest.length).toBe(nodeGraph.maxNodesPerBucket);
// Retrieved closest nodes should be exactly the same as the ones we added
expect(closest).toEqual(addedClosestNodes);
} finally {
await nodeConnectionManager.stop();
}
});
test('receives 20 closest local nodes from connected target', async () => {
let serverPKAgent: PolykeyAgent | undefined;
let nodeConnectionManager: NodeConnectionManager | undefined;
Expand Down
Loading

0 comments on commit e93e9a1

Please sign in to comment.