Skip to content

Commit

Permalink
fix: NodeManager.setNode Properly handles adding new node when buck…
Browse files Browse the repository at this point in the history
…et is full

Logic of adding nodes has been split between `NodeManager` and `NodeGraph`. The `NodeGraph.setNode` just handles adding a node to the bucket where the `NodeManager.setNode` contains the logic of when to add the node

Relates #359
  • Loading branch information
tegefaulkes committed Mar 23, 2022
1 parent 8b421ac commit b6d701f
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 30 deletions.
61 changes: 33 additions & 28 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,12 @@ class NodeGraph {
}
}


/**
* Will add a node to the node graph and increment the bucket count.
* If the node already existed it will be updated.
* @param nodeId NodeId to add to the NodeGraph
* @param nodeAddress Address information to add
*/
@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async setNode(
nodeId: NodeId,
Expand All @@ -258,34 +263,17 @@ class NodeGraph {
bucketDomain,
nodesUtils.bucketDbKey(nodeId),
);
// If this is a new entry, check the bucket limit
if (nodeData == null) {
const count = await this.getBucketMetaProp(bucketIndex, 'count');
if (count < this.nodeBucketLimit) {
// Increment the bucket count
this.setBucketMetaProp(bucketIndex, 'count', count + 1);
} else {
// Remove the oldest entry in the bucket
const lastUpdatedBucketDb = await this.db.level(
bucketKey,
this.nodeGraphLastUpdatedDb
);
let oldestLastUpdatedKey: Buffer;
let oldestNodeId: NodeId;
for await (const key of lastUpdatedBucketDb.createKeyStream({ limit: 1 })) {
oldestLastUpdatedKey = key as Buffer;
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(key as Buffer));
}
await this.db.del(bucketDomain, oldestNodeId!.toBuffer());
await this.db.del(lastUpdatedDomain, oldestLastUpdatedKey!);
}
} else {
// This is an existing entry, so the index entry must be reset
if(nodeData != null){
// If the node already exists we want to remove the old `lastUpdated`
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(nodeData.lastUpdated, nodeId)
await this.db.del(
lastUpdatedDomain,
lastUpdatedKey
);
} else {
// It didn't exist so we want to increment the bucket count
const count = await this.getBucketMetaProp(bucketIndex, 'count');
await this.setBucketMetaProp(bucketIndex, 'count', count + 1);
}
const lastUpdated = getUnixtime();
await this.db.put(
Expand All @@ -296,15 +284,32 @@ class NodeGraph {
lastUpdated,
}
);
const lastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(lastUpdated, nodeId)
const newLastUpdatedKey = nodesUtils.lastUpdatedBucketDbKey(lastUpdated, nodeId)
await this.db.put(
lastUpdatedDomain,
lastUpdatedKey,
newLastUpdatedKey,
nodesUtils.bucketDbKey(nodeId),
true
);
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async getOldestNode(bucketIndex: number ): Promise<NodeId | undefined> {
const bucketKey = nodesUtils.bucketKey(bucketIndex);
// Remove the oldest entry in the bucket
const lastUpdatedBucketDb = await this.db.level(
bucketKey,
this.nodeGraphLastUpdatedDb
);
let oldestLastUpdatedKey: Buffer;
let oldestNodeId: NodeId | undefined;
for await (const key of lastUpdatedBucketDb.createKeyStream({ limit: 1 })) {
oldestLastUpdatedKey = key as Buffer;
({ nodeId: oldestNodeId } = nodesUtils.parseLastUpdatedBucketDbKey(key as Buffer));
}
return oldestNodeId;
}

@ready(new nodesErrors.ErrorNodeGraphNotRunning())
public async unsetNode(nodeId: NodeId): Promise<void> {
const [bucketIndex, bucketKey] = this.bucketIndex(nodeId);
Expand All @@ -316,7 +321,7 @@ class NodeGraph {
);
if (nodeData != null) {
const count = await this.getBucketMetaProp(bucketIndex, 'count');
this.setBucketMetaProp(bucketIndex, 'count', count - 1);
await this.setBucketMetaProp(bucketIndex, 'count', count - 1);
await this.db.del(
bucketDomain,
nodesUtils.bucketDbKey(nodeId)
Expand Down Expand Up @@ -750,7 +755,7 @@ class NodeGraph {
* The bucket key is the string encoded version of bucket index
* that preserves lexicographic order
*/
protected bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] {
public bucketIndex(nodeId: NodeId): [NodeBucketIndex, string] {
const nodeIdOwn = this.keyManager.getNodeId();
if (nodeId.equals(nodeIdOwn)) {
throw new nodesErrors.ErrorNodeGraphSameNodeId();
Expand Down
43 changes: 41 additions & 2 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import * as networkErrors from '../network/errors';
import * as networkUtils from '../network/utils';
import * as sigchainUtils from '../sigchain/utils';
import * as claimsUtils from '../claims/utils';
import { NodeData } from '../nodes/types';
import { getUnixtime } from '@/utils';

class NodeManager {
protected db: DB;
Expand Down Expand Up @@ -53,6 +55,10 @@ class NodeManager {
* Determines whether a node in the Polykey network is online.
* @return true if online, false if offline
*/
// FIXME: We shouldn't be trying to find the node just to ping it
// since we are usually pinging it during the find procedure anyway.
// I think we should be providing the address of what we're trying to ping,
// possibly make it an optional parameter?
public async pingNode(targetNodeId: NodeId): Promise<boolean> {
const targetAddress: NodeAddress =
await this.nodeConnectionManager.findNode(targetNodeId);
Expand Down Expand Up @@ -326,13 +332,46 @@ class NodeManager {
}

/**
* Sets a node in the NodeGraph
* Adds a node to the node graph.
* Updates the node if the node already exists.
*
*/
public async setNode(
nodeId: NodeId,
nodeAddress: NodeAddress,
force = false,
): Promise<void> {
return await this.nodeGraph.setNode(nodeId, nodeAddress);
// When adding a node we need to handle 3 cases
// 1. The node already exists. We need to update it's last updated field
// 2. The node doesn't exist and bucket has room.
// We need to add the node to the bucket
// 3. The node doesn't exist and the bucket is full.
// We need to ping the oldest node. If the ping succeeds we need to update
// the lastUpdated of the oldest node and drop the new one. If the ping
// fails we delete the old node and add in the new one.
const nodeData = await this.nodeGraph.getNode(nodeId);
// If this is a new entry, check the bucket limit
const [bucketIndex, ] = this.nodeGraph.bucketIndex(nodeId)
const count = await this.nodeGraph.getBucketMetaProp(bucketIndex, 'count');
if (nodeData == null && count < this.nodeGraph.nodeBucketLimit) {
// We want to add a node but the bucket is full
// We need to ping the oldest node
const oldestNodeId = (await this.nodeGraph.getOldestNode(bucketIndex))!;
if (await this.pingNode(oldestNodeId) && !force){
// The node responded, we need to update it's info and drop the new node
const oldestNode = (await this.nodeGraph.getNode(oldestNodeId))!;
await this.nodeGraph.setNode(oldestNodeId, oldestNode.address);
} else {
// The node could not be contacted or force was set,
// we drop it in favor of the new node
await this.nodeGraph.unsetNode(oldestNodeId);
await this.nodeGraph.setNode(nodeId, nodeAddress);
}
} else {
// Either already exists or has room in the bucket
// We want to add or update the node
await this.nodeGraph.setNode(nodeId, nodeAddress);
}
}

/**
Expand Down
153 changes: 153 additions & 0 deletions tests/nodes/NodeManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Sigchain from '@/sigchain/Sigchain';
import * as claimsUtils from '@/claims/utils';
import { sleep } from '@/utils';
import * as nodesUtils from '@/nodes/utils';
import * as nodesTestUtils from './utils';

describe(`${NodeManager.name} test`, () => {
const password = 'password';
Expand Down Expand Up @@ -412,4 +413,156 @@ describe(`${NodeManager.name} test`, () => {
expect(chainData).toContain(nodesUtils.encodeNodeId(yNodeId));
});
});
test('should add a node when bucket has room', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex);
await nodeManager.setNode(nodeId, {} as NodeAddress);

// checking bucket
const bucket = await nodeManager.getBucket(bucketIndex);
expect(bucket).toHaveLength(1);
});
test('should update a node if node exists', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex);
await nodeManager.setNode(nodeId, {
host: '' as Host,
port: 11111 as Port
});

const nodeData = (await nodeGraph.getNode(nodeId))!;
await sleep(100);

// should update the node
await nodeManager.setNode(nodeId, {
host: '' as Host,
port: 22222 as Port
});

const newNodeData = (await nodeGraph.getNode(nodeId))!;
expect(newNodeData.address.port).not.toEqual(nodeData.address.port)
expect(newNodeData.lastUpdated).not.toEqual(nodeData.lastUpdated)

});
test('should not add node if bucket is full and old node is alive', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
// creating 20 nodes in bucket
for (let i = 1; i <= 20; i++) {
const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex, i);
await nodeManager.setNode(nodeId, {port: i} as NodeAddress);
}
const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex);
//mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(true);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
const oldestNode = await nodeGraph.getNode(oldestNodeId!);
// adding a new node with bucket full
await nodeManager.setNode(nodeId, {port: 55555} as NodeAddress);
// bucket still contains max nodes
const bucket = await nodeManager.getBucket(bucketIndex);
expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit);
// new node was not added
const node = await nodeGraph.getNode(nodeId);
expect(node).toBeUndefined();
// oldest node was updated
const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!);
expect(oldestNodeNew!.lastUpdated).not.toEqual(oldestNode!.lastUpdated);
nodeManagerPingMock.mockRestore();
});
test('should add node if bucket is full, old node is alive and force is set', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
// creating 20 nodes in bucket
for (let i = 1; i <= 20; i++) {
const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex, i);
await nodeManager.setNode(nodeId, {port: i} as NodeAddress);
}
const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex);
//mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(true);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
// adding a new node with bucket full
await nodeManager.setNode(nodeId, {port: 55555} as NodeAddress, true);
// bucket still contains max nodes
const bucket = await nodeManager.getBucket(bucketIndex);
expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit);
// new node was added
const node = await nodeGraph.getNode(nodeId);
expect(node).toBeDefined();
// oldest node was removed
const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!);
expect(oldestNodeNew).toBeUndefined();
nodeManagerPingMock.mockRestore();
});
test('should add node if bucket is full and old node is dead', async () => {
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: {} as NodeConnectionManager,
logger,
});
const localNodeId = keyManager.getNodeId();
const bucketIndex = 100;
// creating 20 nodes in bucket
for (let i = 1; i <= 20; i++) {
const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex, i);
await nodeManager.setNode(nodeId, {port: i} as NodeAddress);
}
const nodeId = nodesTestUtils.generateNodeIdForBucket(localNodeId, bucketIndex);
//mocking ping
const nodeManagerPingMock = jest.spyOn(NodeManager.prototype, 'pingNode');
nodeManagerPingMock.mockResolvedValue(false);
const oldestNodeId = await nodeGraph.getOldestNode(bucketIndex);
// adding a new node with bucket full
await nodeManager.setNode(nodeId, {port: 55555} as NodeAddress, true);
// bucket still contains max nodes
const bucket = await nodeManager.getBucket(bucketIndex);
expect(bucket).toHaveLength(nodeGraph.nodeBucketLimit);
// new node was added
const node = await nodeGraph.getNode(nodeId);
expect(node).toBeDefined();
// oldest node was removed
const oldestNodeNew = await nodeGraph.getNode(oldestNodeId!);
expect(oldestNodeNew).toBeUndefined();
nodeManagerPingMock.mockRestore();
});
});

0 comments on commit b6d701f

Please sign in to comment.