Skip to content

Commit

Permalink
fix: NodeManager.setNode Properly handles adding new node when buck…
Browse files Browse the repository at this point in the history
…et is full

Logic of adding nodes has been split between `NodeManager` and `NodeGraph`. The `NodeGraph.setNode` just handles adding a node to the bucket where the `NodeManager.setNode` contains the logic of when to add the node

Relates #359
  • Loading branch information
tegefaulkes committed Jun 2, 2022
1 parent 51e2e5a commit 1bc8d57
Show file tree
Hide file tree
Showing 10 changed files with 375 additions and 75 deletions.
3 changes: 2 additions & 1 deletion src/agent/service/nodesClosestLocalNodesGet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ function nodesClosestLocalNodesGet({
);
// Get all local nodes that are closest to the target node from the request
const closestNodes = await db.withTransactionF(
async (tran) => await nodeGraph.getClosestNodes(nodeId, tran),
async (tran) =>
await nodeGraph.getClosestNodes(nodeId, undefined, tran),
);
for (const [nodeId, nodeData] of closestNodes) {
const addressMessage = new nodesPB.Address();
Expand Down
11 changes: 8 additions & 3 deletions src/client/GRPCClientClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import type { ClientReadableStream } from '@grpc/grpc-js/build/src/call';
import type { AsyncGeneratorReadableStreamClient } from '../grpc/types';
import type { Session } from '../sessions';
import type { NodeId } from '../nodes/types';
import type { Host, Port, TLSConfig, ProxyConfig } from '../network/types';
import type { Host, Port, ProxyConfig, TLSConfig } from '../network/types';
import type * as utilsPB from '../proto/js/polykey/v1/utils/utils_pb';
import type * as agentPB from '../proto/js/polykey/v1/agent/agent_pb';
import type * as vaultsPB from '../proto/js/polykey/v1/vaults/vaults_pb';
Expand Down Expand Up @@ -68,7 +68,7 @@ class GRPCClientClient extends GRPCClient<ClientServiceClient> {
interceptors,
logger,
});
const grpcClientClient = new GRPCClientClient({
return new GRPCClientClient({
client,
nodeId,
host,
Expand All @@ -80,7 +80,6 @@ class GRPCClientClient extends GRPCClient<ClientServiceClient> {
destroyCallback,
logger,
});
return grpcClientClient;
}

public async destroy() {
Expand Down Expand Up @@ -905,6 +904,12 @@ class GRPCClientClient extends GRPCClient<ClientServiceClient> {
public nodesGetAll(...args) {
return grpcUtils.promisifyUnaryCall<nodesPB.NodeBuckets>(
this.client,
{
nodeId: this.nodeId,
host: this.host,
port: this.port,
command: this.identitiesAuthenticate.name,
},
this.client.nodesGetAll,
)(...args);
}
Expand Down
1 change: 1 addition & 0 deletions src/client/service/nodesAdd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ function nodesAdd({
host,
port,
} as NodeAddress,
undefined,
tran,
),
);
Expand Down
10 changes: 7 additions & 3 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,7 @@ import type {
NodeId,
NodeIdString,
SeedNodes,
NodeEntry,
} from './types';
import type { DBTransaction } from '@matrixai/db';
import { withF } from '@matrixai/resources';
import Logger from '@matrixai/logger';
import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
Expand Down Expand Up @@ -103,7 +101,7 @@ class NodeConnectionManager {
this.logger.info(`Starting ${this.constructor.name}`);
for (const nodeIdEncoded in this.seedNodes) {
const nodeId = nodesUtils.decodeNodeId(nodeIdEncoded)!;
await this.nodeGraph.setNode(nodeId, this.seedNodes[nodeIdEncoded]);
await this.nodeGraph.setNode(nodeId, this.seedNodes[nodeIdEncoded]); // FIXME: also fine implicit transactions
}
this.logger.info(`Started ${this.constructor.name}`);
}
Expand Down Expand Up @@ -243,6 +241,7 @@ class NodeConnectionManager {
)}`,
);
// Creating the connection and set in map
// FIXME: this is fine, just use the implicit tran. fix this when adding optional transactions
const targetAddress = await this.findNode(targetNodeId);
// If the stored host is not a valid host (IP address),
// then we assume it to be a hostname
Expand Down Expand Up @@ -363,6 +362,7 @@ class NodeConnectionManager {
* Retrieves the node address. If an entry doesn't exist in the db, then
* proceeds to locate it using Kademlia.
* @param targetNodeId Id of the node we are tying to find
* @param tran
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async findNode(targetNodeId: NodeId): Promise<NodeAddress> {
Expand Down Expand Up @@ -405,6 +405,7 @@ class NodeConnectionManager {
// Let foundTarget: boolean = false;
let foundAddress: NodeAddress | undefined = undefined;
// Get the closest alpha nodes to the target node (set as shortlist)
// FIXME: no tran
const shortlist = await this.nodeGraph.getClosestNodes(
targetNodeId,
this.initialClosestNodes,
Expand Down Expand Up @@ -438,6 +439,7 @@ class NodeConnectionManager {
try {
// Add the node to the database so that we can find its address in
// call to getConnectionToNode
// FIXME: no tran
await this.nodeGraph.setNode(nextNodeId, nextNodeAddress.address);
await this.getConnection(nextNodeId);
} catch (e) {
Expand All @@ -458,6 +460,7 @@ class NodeConnectionManager {
continue;
}
if (nodeId.equals(targetNodeId)) {
// FIXME: no tran
await this.nodeGraph.setNode(nodeId, nodeData.address);
foundAddress = nodeData.address;
// We have found the target node, so we can stop trying to look for it
Expand Down Expand Up @@ -556,6 +559,7 @@ class NodeConnectionManager {
);
for (const [nodeId, nodeData] of nodes) {
// FIXME: this should be the `nodeManager.setNode`
// FIXME: no tran needed
await this.nodeGraph.setNode(nodeId, nodeData.address);
}
}
Expand Down
153 changes: 113 additions & 40 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,14 @@ class NodeGraph {
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getNode(
nodeId: NodeId,
tran: DBTransaction,
tran?: DBTransaction,
): Promise<NodeData | undefined> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.getNode(nodeId, tran),
);
}

const [bucketIndex] = this.bucketIndex(nodeId);
const bucketDomain = [
...this.nodeGraphBucketsDbPath,
Expand All @@ -176,8 +182,15 @@ class NodeGraph {
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async *getNodes(
order: 'asc' | 'desc' = 'asc',
tran: DBTransaction,
tran?: DBTransaction,
): AsyncGenerator<[NodeId, NodeData]> {
if (tran == null) {
const getNodes = (tran) => this.getNodes(order, tran);
return yield* this.db.withTransactionG(async function* (tran) {
return yield* getNodes(tran);
});
}

for await (const [key, nodeData] of tran.iterator<NodeData>(
{
reverse: order !== 'asc',
Expand All @@ -190,70 +203,93 @@ class NodeGraph {
}
}

/**
* Will add a node to the node graph and increment the bucket count.
* If the node already existed it will be updated.
* @param nodeId NodeId to add to the NodeGraph
* @param nodeAddress Address information to add
* @param tran
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async setNode(
nodeId: NodeId,
nodeAddress: NodeAddress,
tran: DBTransaction,
tran?: DBTransaction,
): Promise<void> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.setNode(nodeId, nodeAddress, tran),
);
}

const [bucketIndex, bucketKey] = this.bucketIndex(nodeId);
const lastUpdatedPath = [...this.nodeGraphLastUpdatedDbPath, bucketKey];
const bucketPath = [...this.nodeGraphBucketsDbPath, bucketKey];
const nodeData = await tran.get<NodeData>([
...bucketPath,
nodesUtils.bucketDbKey(nodeId),
]);
// If this is a new entry, check the bucket limit
if (nodeData == null) {
const count = await this.getBucketMetaProp(bucketIndex, 'count', tran);
if (count < this.nodeBucketLimit) {
// Increment the bucket count
await this.setBucketMetaProp(bucketIndex, 'count', count + 1, tran);
} else {
// Remove the oldest entry in the bucket
let oldestLastUpdatedKey: Buffer;
let oldestNodeId: NodeId;
for await (const [key] of tran.iterator(
{
limit: 1,
values: false,
},
this.nodeGraphLastUpdatedDbPath,
)) {
oldestLastUpdatedKey = key as unknown as Buffer;
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(
key as unknown as Buffer,
));
}
await tran.del([...bucketPath, oldestNodeId!.toBuffer()]);
await tran.del([...lastUpdatedPath, oldestLastUpdatedKey!]);
}
} else {
// This is an existing entry, so the index entry must be reset
if (nodeData != null) {
// If the node already exists we want to remove the old `lastUpdated`
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
nodeData.lastUpdated,
nodeId,
);
await tran.del([...lastUpdatedPath, lastUpdatedKey]);
} else {
// It didn't exist so we want to increment the bucket count
const count = await this.getBucketMetaProp(bucketIndex, 'count', tran);
await this.setBucketMetaProp(bucketIndex, 'count', count + 1, tran);
}
const lastUpdated = getUnixtime();
await tran.put([...bucketPath, nodesUtils.bucketDbKey(nodeId)], {
address: nodeAddress,
lastUpdated,
});
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
const newLastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
lastUpdated,
nodeId,
);
await tran.put(
[...lastUpdatedPath, lastUpdatedKey],
[...lastUpdatedPath, newLastUpdatedKey],
nodesUtils.bucketDbKey(nodeId),
true,
);
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async unsetNode(nodeId: NodeId, tran: DBTransaction): Promise<void> {
public async getOldestNode(
bucketIndex: number,
tran?: DBTransaction,
): Promise<NodeId | undefined> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.getOldestNode(bucketIndex, tran),
);
}

const bucketKey = nodesUtils.bucketKey(bucketIndex);
// Remove the oldest entry in the bucket
let oldestNodeId: NodeId | undefined;
for await (const [key] of tran.iterator({ limit: 1 }, [
...this.nodeGraphLastUpdatedDbPath,
bucketKey,
])) {
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(
key as unknown as Buffer,
));
}
return oldestNodeId;
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async unsetNode(nodeId: NodeId, tran?: DBTransaction): Promise<void> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.unsetNode(nodeId, tran),
);
}

const [bucketIndex, bucketKey] = this.bucketIndex(nodeId);
const bucketPath = [...this.nodeGraphBucketsDbPath, bucketKey];
const lastUpdatedPath = [...this.nodeGraphLastUpdatedDbPath, bucketKey];
Expand Down Expand Up @@ -284,8 +320,14 @@ class NodeGraph {
bucketIndex: NodeBucketIndex,
sort: 'nodeId' | 'distance' | 'lastUpdated' = 'nodeId',
order: 'asc' | 'desc' = 'asc',
tran: DBTransaction,
tran?: DBTransaction,
): Promise<NodeBucket> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.getBucket(bucketIndex, sort, order, tran),
);
}

if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) {
throw new nodesErrors.ErrorNodeGraphBucketIndex(
`bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`,
Expand Down Expand Up @@ -355,8 +397,15 @@ class NodeGraph {
public async *getBuckets(
sort: 'nodeId' | 'distance' | 'lastUpdated' = 'nodeId',
order: 'asc' | 'desc' = 'asc',
tran: DBTransaction,
tran?: DBTransaction,
): AsyncGenerator<[NodeBucketIndex, NodeBucket]> {
if (tran == null) {
const getBuckets = (tran) => this.getBuckets(sort, order, tran);
return yield* this.db.withTransactionG(async function* (tran) {
return yield* getBuckets(tran);
});
}

let bucketIndex: NodeBucketIndex | undefined = undefined;
let bucket: NodeBucket = [];
if (sort === 'nodeId' || sort === 'distance') {
Expand Down Expand Up @@ -448,8 +497,14 @@ class NodeGraph {
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async resetBuckets(
nodeIdOwn: NodeId,
tran: DBTransaction,
tran?: DBTransaction,
): Promise<void> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.resetBuckets(nodeIdOwn, tran),
);
}

// Setup new space
const spaceNew = this.space === '0' ? '1' : '0';
const nodeGraphMetaDbPathNew = [...this.nodeGraphDbPath, 'meta' + spaceNew];
Expand Down Expand Up @@ -536,8 +591,14 @@ class NodeGraph {
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getBucketMeta(
bucketIndex: NodeBucketIndex,
tran: DBTransaction,
tran?: DBTransaction,
): Promise<NodeBucketMeta> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.getBucketMeta(bucketIndex, tran),
);
}

if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) {
throw new nodesErrors.ErrorNodeGraphBucketIndex(
`bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`,
Expand All @@ -561,8 +622,14 @@ class NodeGraph {
public async getBucketMetaProp<Key extends keyof NodeBucketMeta>(
bucketIndex: NodeBucketIndex,
key: Key,
tran: DBTransaction,
tran?: DBTransaction,
): Promise<NodeBucketMeta[Key]> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.getBucketMetaProp(bucketIndex, key, tran),
);
}

if (bucketIndex < 0 || bucketIndex >= this.nodeIdBits) {
throw new nodesErrors.ErrorNodeGraphBucketIndex(
`bucketIndex must be between 0 and ${this.nodeIdBits - 1} inclusive`,
Expand Down Expand Up @@ -602,8 +669,14 @@ class NodeGraph {
public async getClosestNodes(
nodeId: NodeId,
limit: number = this.nodeBucketLimit,
tran: DBTransaction,
tran?: DBTransaction,
): Promise<NodeBucket> {
if (tran == null) {
return this.db.withTransactionF(async (tran) =>
this.getClosestNodes(nodeId, limit, tran),
);
}

// Buckets map to the target node in the following way;
// 1. 0, 1, ..., T-1 -> T
// 2. T -> 0, 1, ..., T-1
Expand Down Expand Up @@ -736,7 +809,7 @@ class NodeGraph {
* The bucket key is the string encoded version of bucket index
* that preserves lexicographic order
*/
protected bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] {
public bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] {
const nodeIdOwn = this.keyManager.getNodeId();
if (nodeId.equals(nodeIdOwn)) {
throw new nodesErrors.ErrorNodeGraphSameNodeId();
Expand Down
Loading

0 comments on commit 1bc8d57

Please sign in to comment.