Skip to content

Commit

Permalink
fix: NodeManager.setNode Properly handles adding new node when buck…
Browse files Browse the repository at this point in the history
…et is full

Logic of adding nodes has been split between `NodeManager` and `NodeGraph`. The `NodeGraph.setNode` just handles adding a node to the bucket where the `NodeManager.setNode` contains the logic of when to add the node

Relates #359
  • Loading branch information
tegefaulkes committed Jun 1, 2022
1 parent e93e9a1 commit 2ea69ce
Show file tree
Hide file tree
Showing 3 changed files with 257 additions and 33 deletions.
66 changes: 35 additions & 31 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,12 @@ class NodeGraph {
}
}

/**
* Will add a node to the node graph and increment the bucket count.
* If the node already existed it will be updated.
* @param nodeId NodeId to add to the NodeGraph
* @param nodeAddress Address information to add
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async setNode(
nodeId: NodeId,
Expand All @@ -269,56 +275,54 @@ class NodeGraph {
bucketDomain,
nodesUtils.bucketDbKey(nodeId),
);
// If this is a new entry, check the bucket limit
if (nodeData == null) {
const count = await this.getBucketMetaProp(bucketIndex, 'count');
if (count < this.nodeBucketLimit) {
// Increment the bucket count
this.setBucketMetaProp(bucketIndex, 'count', count + 1);
} else {
// Remove the oldest entry in the bucket
const lastUpdatedBucketDb = await this.db.level(
bucketKey,
this.nodeGraphLastUpdatedDb,
);
let oldestLastUpdatedKey: Buffer;
let oldestNodeId: NodeId;
for await (const key of lastUpdatedBucketDb.createKeyStream({
limit: 1,
})) {
oldestLastUpdatedKey = key as Buffer;
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(
key as Buffer,
));
}
await this.db.del(bucketDomain, oldestNodeId!.toBuffer());
await this.db.del(lastUpdatedDomain, oldestLastUpdatedKey!);
}
} else {
// This is an existing entry, so the index entry must be reset
if (nodeData != null) {
// If the node already exists we want to remove the old `lastUpdated`
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
nodeData.lastUpdated,
nodeId,
);
await this.db.del(lastUpdatedDomain, lastUpdatedKey);
} else {
// It didn't exist so we want to increment the bucket count
const count = await this.getBucketMetaProp(bucketIndex, 'count');
await this.setBucketMetaProp(bucketIndex, 'count', count + 1);
}
const lastUpdated = getUnixtime();
await this.db.put(bucketDomain, nodesUtils.bucketDbKey(nodeId), {
address: nodeAddress,
lastUpdated,
});
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
const newLastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
lastUpdated,
nodeId,
);
await this.db.put(
lastUpdatedDomain,
lastUpdatedKey,
newLastUpdatedKey,
nodesUtils.bucketDbKey(nodeId),
true,
);
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getOldestNode(bucketIndex: number): Promise<NodeId | undefined> {
const bucketKey = nodesUtils.bucketKey(bucketIndex);
// Remove the oldest entry in the bucket
const lastUpdatedBucketDb = await this.db.level(
bucketKey,
this.nodeGraphLastUpdatedDb,
);
let oldestLastUpdatedKey: Buffer;
let oldestNodeId: NodeId | undefined;
for await (const key of lastUpdatedBucketDb.createKeyStream({ limit: 1 })) {
oldestLastUpdatedKey = key as Buffer;
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(
key as Buffer,
));
}
return oldestNodeId;
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async unsetNode(nodeId: NodeId): Promise<void> {
const [bucketIndex, bucketKey] = this.bucketIndex(nodeId);
Expand All @@ -330,7 +334,7 @@ class NodeGraph {
);
if (nodeData != null) {
const count = await this.getBucketMetaProp(bucketIndex, 'count');
this.setBucketMetaProp(bucketIndex, 'count', count - 1);
await this.setBucketMetaProp(bucketIndex, 'count', count - 1);
await this.db.del(bucketDomain, nodesUtils.bucketDbKey(nodeId));
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(
nodeData.lastUpdated,
Expand Down Expand Up @@ -790,7 +794,7 @@ class NodeGraph {
* The bucket key is the string encoded version of bucket index
* that preserves lexicographic order
*/
protected bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] {
public bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] {
const nodeIdOwn = this.keyManager.getNodeId();
if (nodeId.equals(nodeIdOwn)) {
throw new nodesErrors.ErrorNodeGraphSameNodeId();
Expand Down
43 changes: 41 additions & 2 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { ChainData, ChainDataEncoded } from '../sigchain/types';
import type { NodeId, NodeAddress, NodeBucket } from '../nodes/types';
import type { ClaimEncoded } from '../claims/types';
import Logger from '@matrixai/logger';
import { getUnixtime } from '@/utils';
import * as nodesErrors from './errors';
import * as nodesUtils from './utils';
import { utils as validationUtils } from '../validation';
Expand All @@ -17,6 +18,7 @@ import * as networkErrors from '../network/errors';
import * as networkUtils from '../network/utils';
import * as sigchainUtils from '../sigchain/utils';
import * as claimsUtils from '../claims/utils';
import { NodeData } from '../nodes/types';

class NodeManager {
protected db: DB;
Expand Down Expand Up @@ -53,6 +55,10 @@ class NodeManager {
* Determines whether a node in the Polykey network is online.
* @return true if online, false if offline
*/
// FIXME: We shouldn't be trying to find the node just to ping it
// since we are usually pinging it during the find procedure anyway.
// I think we should be providing the address of what we're trying to ping,
// possibly make it an optional parameter?
public async pingNode(targetNodeId: NodeId): Promise<boolean> {
const targetAddress: NodeAddress =
await this.nodeConnectionManager.findNode(targetNodeId);
Expand Down Expand Up @@ -326,13 +332,46 @@ class NodeManager {
}

/**
* Sets a node in the NodeGraph
* Adds a node to the node graph.
* Updates the node if the node already exists.
*
*/
public async setNode(
nodeId: NodeId,
nodeAddress: NodeAddress,
force = false,
): Promise<void> {
return await this.nodeGraph.setNode(nodeId, nodeAddress);
// When adding a node we need to handle 3 cases
// 1. The node already exists. We need to update it's last updated field
// 2. The node doesn't exist and bucket has room.
// We need to add the node to the bucket
// 3. The node doesn't exist and the bucket is full.
// We need to ping the oldest node. If the ping succeeds we need to update
// the lastUpdated of the oldest node and drop the new one. If the ping
// fails we delete the old node and add in the new one.
const nodeData = await this.nodeGraph.getNode(nodeId);
// If this is a new entry, check the bucket limit
const [bucketIndex] = this.nodeGraph.bucketIndex(nodeId);
const count = await this.nodeGraph.getBucketMetaProp(bucketIndex, 'count');
if (nodeData != null || count < this.nodeGraph.nodeBucketLimit) {
// Either already exists or has room in the bucket
// We want to add or update the node
await this.nodeGraph.setNode(nodeId, nodeAddress);
} else {
// We want to add a node but the bucket is full
// We need to ping the oldest node
const oldestNodeId = (await this.nodeGraph.getOldestNode(bucketIndex))!;
if ((await this.pingNode(oldestNodeId)) && !force) {
// The node responded, we need to update it's info and drop the new node
const oldestNode = (await this.nodeGraph.getNode(oldestNodeId))!;
await this.nodeGraph.setNode(oldestNodeId, oldestNode.address);
} else {
// The node could not be contacted or force was set,
// we drop it in favor of the new node
await this.nodeGraph.unsetNode(oldestNodeId);
await this.nodeGraph.setNode(nodeId, nodeAddress);
}
}
}

// FIXME
Expand Down
181 changes: 181 additions & 0 deletions tests/nodes/NodeManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Sigchain from '@/sigchain/Sigchain';
import * as claimsUtils from '@/claims/utils';
import { promisify, sleep } from '@/utils';
import * as nodesUtils from '@/nodes/utils';
import * as nodesTestUtils from './utils';

describe(`${NodeManager.name} test`, () => {
const password = 'password';
Expand Down Expand Up @@ -425,4 +426,184 @@ describe(`${NodeManager.name} test`, () => {
expect(chainData).toContain(nodesUtils.encodeNodeId(yNodeId));
});
});
test('should add a node when bucket has room', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
const nodeId = nodesTestUtils.generateNodeIdForBucket(
localNodeId,
bucketIndex,
);
await nodeManager.setNode(nodeId, {} as NodeAddress);

// Checking bucket
const bucket = await nodeManager.getBucket(bucketIndex);
expect(bucket).toHaveLength(1);
});
test('should update a node if node exists', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
const nodeId = nodesTestUtils.generateNodeIdForBucket(
localNodeId,
bucketIndex,
);
await nodeManager.setNode(nodeId, {
host: '' as Host,
port: 11111 as Port,
});

const nodeData = (await nodeGraph.getNode(nodeId))!;
await sleep(1100);

// Should update the node
await nodeManager.setNode(nodeId, {
host: '' as Host,
port: 22222 as Port,
});

const newNodeData = (await nodeGraph.getNode(nodeId))!;
expect(newNodeData.address.port).not.toEqual(nodeData.address.port);
expect(newNodeData.lastUpdated).not.toEqual(nodeData.lastUpdated);
});
test('should not add node if bucket is full and old node is alive', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
// Creating 20 nodes in bucket
for (let i = 1; i <= 20; i++) {
const nodeId = nodesTestUtils.generateNodeIdForBucket(
localNodeId,
bucketIndex,
i,
);
await nodeManager.setNode(nodeId, { port: i } as NodeAddress);
}
const nodeId = nodesTestUtils.generateNodeIdForBucket(
localNodeId,
bucketIndex,
);
// Mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(true);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
const oldestNode = await nodeGraph.getNode(oldestNodeId!);
// Waiting for a second to tick over
await sleep(1100);
// Adding a new node with bucket full
await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress);
// Bucket still contains max nodes
const bucket = await nodeManager.getBucket(bucketIndex);
expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit);
// New node was not added
const node = await nodeGraph.getNode(nodeId);
expect(node).toBeUndefined();
// Oldest node was updated
const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!);
expect(oldestNodeNew!.lastUpdated).not.toEqual(oldestNode!.lastUpdated);
nodeManagerPingMock.mockRestore();
});
test('should add node if bucket is full, old node is alive and force is set', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
// Creating 20 nodes in bucket
for (let i = 1; i <= 20; i++) {
const nodeId = nodesTestUtils.generateNodeIdForBucket(
localNodeId,
bucketIndex,
i,
);
await nodeManager.setNode(nodeId, { port: i } as NodeAddress);
}
const nodeId = nodesTestUtils.generateNodeIdForBucket(
localNodeId,
bucketIndex,
);
// Mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(true);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
// Adding a new node with bucket full
await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress, true);
// Bucket still contains max nodes
const bucket = await nodeManager.getBucket(bucketIndex);
expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit);
// New node was added
const node = await nodeGraph.getNode(nodeId);
expect(node).toBeDefined();
// Oldest node was removed
const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!);
expect(oldestNodeNew).toBeUndefined();
nodeManagerPingMock.mockRestore();
});
test('should add node if bucket is full and old node is dead', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
// Creating 20 nodes in bucket
for (let i = 1; i <= 20; i++) {
const nodeId = nodesTestUtils.generateNodeIdForBucket(
localNodeId,
bucketIndex,
i,
);
await nodeManager.setNode(nodeId, { port: i } as NodeAddress);
}
const nodeId = nodesTestUtils.generateNodeIdForBucket(
localNodeId,
bucketIndex,
);
// Mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(false);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
// Adding a new node with bucket full
await nodeManager.setNode(nodeId, { port: 55555 } as NodeAddress, true);
// Bucket still contains max nodes
const bucket = await nodeManager.getBucket(bucketIndex);
expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit);
// New node was added
const node = await nodeGraph.getNode(nodeId);
expect(node).toBeDefined();
// Oldest node was removed
const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!);
expect(oldestNodeNew).toBeUndefined();
nodeManagerPingMock.mockRestore();
});
});

0 comments on commit 2ea69ce

Please sign in to comment.