Skip to content

Commit

Permalink
feat: added NodeGraph.getClosestNodes()
Browse files Browse the repository at this point in the history
Implemented `getClosestNodes()` and relevant tests in `NodeGraph.test.ts`.

Relates to #212
  • Loading branch information
tegefaulkes committed Jun 7, 2022
1 parent fe18226 commit 072b8ca
Show file tree
Hide file tree
Showing 5 changed files with 489 additions and 190 deletions.
21 changes: 8 additions & 13 deletions src/agent/service/nodesClosestLocalNodesGet.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type * as grpc from '@grpc/grpc-js';
import type { NodeGraph } from '../../nodes';
import type { DB } from '@matrixai/db';
import type NodeConnectionManager from '../../nodes/NodeConnectionManager';
import type { NodeId } from '../../nodes/types';
import type Logger from '@matrixai/logger';
import * as grpcUtils from '../../grpc/utils';
Expand All @@ -16,11 +16,11 @@ import * as agentUtils from '../utils';
* to some provided node ID.
*/
function nodesClosestLocalNodesGet({
nodeConnectionManager,
nodeGraph,
db,
logger,
}: {
nodeConnectionManager: NodeConnectionManager;
nodeGraph: NodeGraph;
db: DB;
logger: Logger;
}) {
Expand All @@ -47,21 +47,16 @@ function nodesClosestLocalNodesGet({
);
// Get all local nodes that are closest to the target node from the request
const closestNodes = await db.withTransactionF(
async (tran) =>
await nodeConnectionManager.getClosestLocalNodes(
nodeId,
undefined,
tran,
),
async (tran) => await nodeGraph.getClosestNodes(nodeId, tran),
);
for (const node of closestNodes) {
for (const [nodeId, nodeData] of closestNodes) {
const addressMessage = new nodesPB.Address();
addressMessage.setHost(node.address.host);
addressMessage.setPort(node.address.port);
addressMessage.setHost(nodeData.address.host);
addressMessage.setPort(nodeData.address.port);
// Add the node to the response's map (mapping of node ID -> node address)
response
.getNodeTableMap()
.set(nodesUtils.encodeNodeId(node.id), addressMessage);
.set(nodesUtils.encodeNodeId(nodeId), addressMessage);
}
callback(null, response);
return;
Expand Down
57 changes: 8 additions & 49 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -383,49 +383,7 @@ class NodeConnectionManager {
return address;
}

/**
* Finds the set of nodes (of size k) known by the current node (i.e. in its
* bucket's database) that have the smallest distance to the target node (i.e.
* are closest to the target node).
* i.e. FIND_NODE RPC from Kademlia spec
*
* Used by the RPC service.
*
* @param targetNodeId the node ID to find other nodes closest to it
* @param numClosest the number of the closest nodes to return (by default, returns
* according to the maximum number of nodes per bucket)
* @param tran
* @returns a mapping containing exactly k nodeIds -> nodeAddresses (unless the
* current node has less than k nodes in all of its buckets, in which case it
* returns all nodes it has knowledge of)
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async getClosestLocalNodes(
targetNodeId: NodeId,
numClosest: number = this.nodeGraph.maxNodesPerBucket,
tran?: DBTransaction,
): Promise<Array<NodeData>> {
// Retrieve all nodes from buckets in database
const buckets = await this.nodeGraph.getAllBuckets(tran);
// Iterate over all the nodes in each bucket
const distanceToNodes: Array<NodeData> = [];
buckets.forEach(function (bucket) {
for (const nodeIdString of Object.keys(bucket)) {
// Compute the distance from the node, and add it to the array
const nodeId = IdInternal.fromString<NodeId>(nodeIdString);
distanceToNodes.push({
id: nodeId,
address: bucket[nodeId].address,
distance: nodesUtils.calculateDistance(nodeId, targetNodeId),
});
}
});
// Sort the array (based on the distance at index 1)
distanceToNodes.sort(nodesUtils.sortByDistance);
// Return the closest k nodes (i.e. the first k), or all nodes if < k in array
return distanceToNodes.slice(0, numClosest);
}

// FIXME: getClosestNodes was moved to NodeGraph? that needs to be updated.
/**
* Attempts to locate a target node in the network (using Kademlia).
* Adds all discovered, active nodes to the current node's database (up to k
Expand All @@ -447,7 +405,7 @@ class NodeConnectionManager {
// Let foundTarget: boolean = false;
let foundAddress: NodeAddress | undefined = undefined;
// Get the closest alpha nodes to the target node (set as shortlist)
const shortlist: Array<NodeData> = await this.getClosestLocalNodes(
const shortlist = await this.nodeGraph.getClosestNodes(
targetNodeId,
this.initialClosestNodes,
);
Expand All @@ -470,25 +428,26 @@ class NodeConnectionManager {
if (nextNode == null) {
break;
}
const [nextNodeId, nextNodeAddress] = nextNode;
// Skip if the node has already been contacted
if (contacted[nextNode.id]) {
if (contacted[nextNodeId]) {
continue;
}
// Connect to the node (check if pre-existing connection exists, otherwise
// create a new one)
try {
// Add the node to the database so that we can find its address in
// call to getConnectionToNode
await this.nodeGraph.setNode(nextNode.id, nextNode.address);
await this.getConnection(nextNode.id);
await this.nodeGraph.setNode(nextNodeId, nextNodeAddress.address);
await this.getConnection(nextNodeId);
} catch (e) {
// If we can't connect to the node, then skip it
continue;
}
contacted[nextNode.id] = true;
contacted[nextNodeId] = true;
// Ask the node to get their own closest nodes to the target
const foundClosest = await this.getRemoteNodeClosestNodes(
nextNode.id,
nextNodeId,
targetNodeId,
);
// Check to see if any of these are the target node. At the same time, add
Expand Down
136 changes: 133 additions & 3 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class NodeGraph {
// Bucket metadata sublevel: `!meta<space>!<lexi(NodeBucketIndex)>!<key> -> value`
this.nodeGraphMetaDbPath = [...this.nodeGraphDbPath, 'meta' + space];
// Bucket sublevel: `!buckets<space>!<lexi(NodeBucketIndex)>!<NodeId> -> NodeData`
// The BucketIndex can range from 0 to NodeId bitsize minus 1
// The BucketIndex can range from 0 to NodeId bit-size minus 1
// So 256 bits means 256 buckets of 0 to 255
this.nodeGraphBucketsDbPath = [...this.nodeGraphDbPath, 'buckets' + space];
// Last updated sublevel: `!lastUpdated<space>!<lexi(NodeBucketIndex)>!<lexi(lastUpdated)>-<NodeId> -> NodeId`
Expand Down Expand Up @@ -166,7 +166,7 @@ class NodeGraph {
}

/**
* Get all nodes
* Get all nodes.
* Nodes are always sorted by `NodeBucketIndex` first
* Then secondly by the node IDs
* The `order` parameter applies to both, for example possible sorts:
Expand Down Expand Up @@ -340,7 +340,7 @@ class NodeGraph {
}

/**
* Gets all buckets
* Gets all buckets.
* Buckets are always sorted by `NodeBucketIndex` first
* Then secondly by the `sort` parameter
* The `order` parameter applies to both, for example possible sorts:
Expand Down Expand Up @@ -582,6 +582,136 @@ class NodeGraph {
return value;
}

/**
* Finds the set of nodes (of size k) known by the current node (i.e. in its
* buckets' database) that have the smallest distance to the target node (i.e.
* are closest to the target node).
* i.e. FIND_NODE RPC from Kademlia spec
*
* Used by the RPC service.
*
* @param nodeId the node ID to find other nodes closest to it
* @param limit the number of the closest nodes to return (by default, returns
* according to the maximum number of nodes per bucket)
* @param tran
* @returns a mapping containing exactly k nodeIds -> nodeAddresses (unless the
* current node has less than k nodes in all of its buckets, in which case it
* returns all nodes it has knowledge of)
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getClosestNodes(
nodeId: NodeId,
limit: number = this.nodeBucketLimit,
tran: DBTransaction,
): Promise<NodeBucket> {
// Buckets map to the target node in the following way;
// 1. 0, 1, ..., T-1 -> T
// 2. T -> 0, 1, ..., T-1
// 3. T+1, T+2, ..., 255 are unchanged
// We need to obtain nodes in the following bucket order
// 1. T
// 2. iterate over 0 ---> T-1
// 3. iterate over T+1 ---> K
// Need to work out the relevant bucket to start from
const startingBucket = nodesUtils.bucketIndex(
this.keyManager.getNodeId(),
nodeId,
);
// Getting the whole target's bucket first
const nodeIds: NodeBucket = await this.getBucket(
startingBucket,
undefined,
undefined,
tran,
);
// We need to iterate over the key stream
// When streaming we want all nodes in the starting bucket
// The keys takes the form `!(lexpack bucketId)!(nodeId)`
// We can just use `!(lexpack bucketId)` to start from
// Less than `!(bucketId 101)!` gets us buckets 100 and lower
// greater than `!(bucketId 99)!` gets up buckets 100 and greater
const prefix = Buffer.from([33]); // Code for `!` prefix
if (nodeIds.length < limit) {
// Just before target bucket
const bucketId = Buffer.from(nodesUtils.bucketKey(startingBucket));
const endKeyLower = Buffer.concat([prefix, bucketId, prefix]);
const remainingLimit = limit - nodeIds.length;
// Iterate over lower buckets
tran.iterator<NodeData>(
{
lt: endKeyLower,
limit: remainingLimit,
valueAsBuffer: false,
},
this.nodeGraphBucketsDbPath,
);
for await (const [key, nodeData] of tran.iterator<NodeData>(
{
lt: endKeyLower,
limit: remainingLimit,
valueAsBuffer: false,
},
this.nodeGraphBucketsDbPath,
)) {
const info = nodesUtils.parseBucketsDbKey(key as unknown as Buffer);
nodeIds.push([info.nodeId, nodeData]);
}
}
if (nodeIds.length < limit) {
// Just after target bucket
const bucketId = Buffer.from(nodesUtils.bucketKey(startingBucket + 1));
const startKeyUpper = Buffer.concat([prefix, bucketId, prefix]);
const remainingLimit = limit - nodeIds.length;
// Iterate over ids further away
tran.iterator(
{
gt: startKeyUpper,
limit: remainingLimit,
},
this.nodeGraphBucketsDbPath,
);
for await (const [key, nodeData] of tran.iterator<NodeData>(
{
gt: startKeyUpper,
limit: remainingLimit,
valueAsBuffer: false,
},
this.nodeGraphBucketsDbPath,
)) {
const info = nodesUtils.parseBucketsDbKey(key as unknown as Buffer);
nodeIds.push([info.nodeId, nodeData]);
}
}
// If no nodes were found, return nothing
if (nodeIds.length === 0) return [];
// Need to get the whole of the last bucket
const lastBucketIndex = nodesUtils.bucketIndex(
this.keyManager.getNodeId(),
nodeIds[nodeIds.length - 1][0],
);
const lastBucket = await this.getBucket(
lastBucketIndex,
undefined,
undefined,
tran,
);
// Pop off elements of the same bucket to avoid duplicates
let element = nodeIds.pop();
while (
element != null &&
nodesUtils.bucketIndex(this.keyManager.getNodeId(), element[0]) ===
lastBucketIndex
) {
element = nodeIds.pop();
}
if (element != null) nodeIds.push(element);
// Adding last bucket to the list
nodeIds.push(...lastBucket);

nodesUtils.bucketSortByDistance(nodeIds, nodeId, 'asc');
return nodeIds.slice(0, limit);
}

/**
* Sets a bucket meta property
* This is protected because users cannot directly manipulate bucket meta
Expand Down
Loading

0 comments on commit 072b8ca

Please sign in to comment.