Skip to content

Commit

Permalink
Merge pull request #310 from MatrixAI/nodeconnectionmanager
Browse files Browse the repository at this point in the history
Extracting Node Connection Management out of `NodeManager` to  `NodeConnectionManager`
  • Loading branch information
CMCDragonkai authored Feb 14, 2022
2 parents ae21266 + b09e000 commit 4aacfb8
Show file tree
Hide file tree
Showing 100 changed files with 6,083 additions and 2,597 deletions.
5 changes: 5 additions & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@
"ignoreConsecutiveComments": true
}
],
"curly": [
"error",
"multi-line",
"consistent"
],
"import/order": [
"error",
{
Expand Down
84 changes: 66 additions & 18 deletions src/PolykeyAgent.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { FileSystem } from './types';
import type { PolykeyWorkerManagerInterface } from './workers/types';
import type { Host, Port } from './network/types';
import type { NodeMapping } from './nodes/types';
import type { SeedNodes } from './nodes/types';

import type { RootKeyPairChangeData } from './keys/types';
import path from 'path';
Expand All @@ -14,15 +14,16 @@ import { Status } from './status';
import { Schema } from './schema';
import { VaultManager } from './vaults';
import { ACL } from './acl';
import { NodeManager } from './nodes';
import { NodeConnectionManager, NodeGraph, NodeManager } from './nodes';
import { NotificationsManager } from './notifications';
import { GestaltGraph } from './gestalts';
import { Sigchain } from './sigchain';
import { Discovery } from './discovery';
import { SessionManager } from './sessions';
import { GRPCServer } from './grpc';
import { IdentitiesManager, providers } from './identities';
import { ForwardProxy, ReverseProxy } from './network';
import ForwardProxy from './network/ForwardProxy';
import ReverseProxy from './network/ReverseProxy';
import { EventBus, captureRejectionSymbol } from './events';
import { createAgentService, AgentServiceService } from './agent';
import { createClientService, ClientServiceService } from './client';
Expand Down Expand Up @@ -61,6 +62,7 @@ class PolykeyAgent {
networkConfig = {},
forwardProxyConfig = {},
reverseProxyConfig = {},
nodeConnectionManagerConfig = {},
seedNodes = {},
// Optional dependencies
status,
Expand All @@ -73,6 +75,8 @@ class PolykeyAgent {
gestaltGraph,
fwdProxy,
revProxy,
nodeGraph,
nodeConnectionManager,
nodeManager,
discovery,
vaultManager,
Expand Down Expand Up @@ -102,8 +106,13 @@ class PolykeyAgent {
connConnectTime?: number;
connTimeoutTime?: number;
};
nodeConnectionManagerConfig?: {
connConnectTime?: number;
connTimeoutTime?: number;
initialClosestNodes?: number;
};
networkConfig?: NetworkConfig;
seedNodes?: NodeMapping;
seedNodes?: SeedNodes;
status?: Status;
schema?: Schema;
keyManager?: KeyManager;
Expand All @@ -114,6 +123,8 @@ class PolykeyAgent {
gestaltGraph?: GestaltGraph;
fwdProxy?: ForwardProxy;
revProxy?: ReverseProxy;
nodeGraph?: NodeGraph;
nodeConnectionManager?: NodeConnectionManager;
nodeManager?: NodeManager;
discovery?: Discovery;
vaultManager?: VaultManager;
Expand Down Expand Up @@ -146,6 +157,10 @@ class PolykeyAgent {
...config.defaults.reverseProxyConfig,
...utils.filterEmptyObject(reverseProxyConfig),
};
const nodeConnectionManagerConfig_ = {
...config.defaults.nodeConnectionManagerConfig,
...utils.filterEmptyObject(nodeConnectionManagerConfig),
};
await utils.mkdirExists(fs, nodePath);
const statusPath = path.join(nodePath, config.defaults.statusBase);
const statusLockPath = path.join(nodePath, config.defaults.statusLockBase);
Expand Down Expand Up @@ -254,26 +269,45 @@ class PolykeyAgent {
...reverseProxyConfig_,
logger: logger.getChild(ReverseProxy.name),
});
nodeGraph =
nodeGraph ??
(await NodeGraph.createNodeGraph({
db,
fresh,
keyManager,
logger: logger.getChild(NodeGraph.name),
}));
nodeConnectionManager =
nodeConnectionManager ??
new NodeConnectionManager({
keyManager,
nodeGraph,
fwdProxy,
revProxy,
seedNodes,
...nodeConnectionManagerConfig_,
logger: logger.getChild(NodeConnectionManager.name),
});
nodeManager =
nodeManager ??
(await NodeManager.createNodeManager({
new NodeManager({
db,
seedNodes,
sigchain,
keyManager,
fwdProxy,
revProxy,
nodeGraph,
nodeConnectionManager,
logger: logger.getChild(NodeManager.name),
fresh,
}));
});
// Discovery uses in-memory CreateDestroy pattern
// Therefore it should be destroyed during stop
discovery =
discovery ??
(await Discovery.createDiscovery({
keyManager,
gestaltGraph,
identitiesManager,
nodeManager,
sigchain,
logger: logger.getChild(Discovery.name),
}));
vaultManager =
Expand All @@ -282,7 +316,7 @@ class PolykeyAgent {
vaultsKey: keyManager.vaultKey,
vaultsPath,
keyManager,
nodeManager,
nodeConnectionManager,
gestaltGraph,
acl,
db,
Expand All @@ -295,6 +329,7 @@ class PolykeyAgent {
(await NotificationsManager.createNotificationsManager({
acl,
db,
nodeConnectionManager,
nodeManager,
keyManager,
logger: logger.getChild(NotificationsManager.name),
Expand Down Expand Up @@ -324,7 +359,6 @@ class PolykeyAgent {
await notificationsManager?.stop();
await vaultManager?.stop();
await discovery?.destroy();
await nodeManager?.stop();
await revProxy?.stop();
await fwdProxy?.stop();
await gestaltGraph?.stop();
Expand All @@ -349,6 +383,8 @@ class PolykeyAgent {
gestaltGraph,
fwdProxy,
revProxy,
nodeGraph,
nodeConnectionManager,
nodeManager,
discovery,
vaultManager,
Expand Down Expand Up @@ -380,6 +416,8 @@ class PolykeyAgent {
public readonly gestaltGraph: GestaltGraph;
public readonly fwdProxy: ForwardProxy;
public readonly revProxy: ReverseProxy;
public readonly nodeGraph: NodeGraph;
public readonly nodeConnectionManager: NodeConnectionManager;
public readonly nodeManager: NodeManager;
public readonly discovery: Discovery;
public readonly vaultManager: VaultManager;
Expand All @@ -404,6 +442,8 @@ class PolykeyAgent {
gestaltGraph,
fwdProxy,
revProxy,
nodeGraph,
nodeConnectionManager,
nodeManager,
discovery,
vaultManager,
Expand All @@ -426,6 +466,8 @@ class PolykeyAgent {
gestaltGraph: GestaltGraph;
fwdProxy: ForwardProxy;
revProxy: ReverseProxy;
nodeGraph: NodeGraph;
nodeConnectionManager: NodeConnectionManager;
nodeManager: NodeManager;
discovery: Discovery;
vaultManager: VaultManager;
Expand All @@ -449,6 +491,8 @@ class PolykeyAgent {
this.gestaltGraph = gestaltGraph;
this.fwdProxy = fwdProxy;
this.revProxy = revProxy;
this.nodeGraph = nodeGraph;
this.nodeConnectionManager = nodeConnectionManager;
this.nodeManager = nodeManager;
this.discovery = discovery;
this.vaultManager = vaultManager;
Expand Down Expand Up @@ -513,7 +557,9 @@ class PolykeyAgent {
keyManager: this.keyManager,
vaultManager: this.vaultManager,
nodeManager: this.nodeManager,
nodeGraph: this.nodeGraph,
sigchain: this.sigchain,
nodeConnectionManager: this.nodeConnectionManager,
notificationsManager: this.notificationsManager,
});
const clientService = createClientService({
Expand All @@ -522,6 +568,8 @@ class PolykeyAgent {
gestaltGraph: this.gestaltGraph,
identitiesManager: this.identitiesManager,
keyManager: this.keyManager,
nodeGraph: this.nodeGraph,
nodeConnectionManager: this.nodeConnectionManager,
nodeManager: this.nodeManager,
notificationsManager: this.notificationsManager,
sessionManager: this.sessionManager,
Expand Down Expand Up @@ -575,9 +623,9 @@ class PolykeyAgent {
ingressPort: networkConfig_.ingressPort,
tlsConfig,
});
await this.nodeManager.start({ fresh });
await this.nodeManager.getConnectionsToSeedNodes();
await this.nodeManager.syncNodeGraph();
await this.nodeConnectionManager.start();
await this.nodeGraph.start({ fresh });
await this.nodeConnectionManager.syncNodeGraph();
await this.vaultManager.start({ fresh });
await this.notificationsManager.start({ fresh });
await this.sessionManager.start({ fresh });
Expand All @@ -597,7 +645,6 @@ class PolykeyAgent {
await this.notificationsManager?.stop();
await this.vaultManager?.stop();
await this.discovery?.destroy();
await this.nodeManager?.stop();
await this.revProxy?.stop();
await this.fwdProxy?.stop();
await this.grpcServerAgent?.stop();
Expand Down Expand Up @@ -625,7 +672,8 @@ class PolykeyAgent {
await this.notificationsManager.stop();
await this.vaultManager.stop();
await this.discovery.destroy();
await this.nodeManager.stop();
await this.nodeConnectionManager.stop();
await this.nodeGraph.stop();
await this.revProxy.stop();
await this.fwdProxy.stop();
await this.grpcServerAgent.stop();
Expand All @@ -649,7 +697,7 @@ class PolykeyAgent {
await this.sessionManager.destroy();
await this.notificationsManager.destroy();
await this.vaultManager.destroy();
await this.nodeManager.destroy();
await this.nodeGraph.destroy();
await this.gestaltGraph.destroy();
await this.acl.destroy();
await this.sigchain.destroy();
Expand Down
5 changes: 4 additions & 1 deletion src/agent/GRPCClientAgent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ class GRPCClientAgent extends GRPCClient<AgentServiceClient> {
tlsConfig,
proxyConfig,
timeout = Infinity,
destroyCallback = async () => {},
logger = new Logger(this.name),
}: {
nodeId: NodeId;
host: Host;
port: Port;
proxyConfig?: ProxyConfig;
tlsConfig?: Partial<TLSConfig>;
proxyConfig?: ProxyConfig;
timeout?: number;
destroyCallback?: () => Promise<void>;
logger?: Logger;
}): Promise<GRPCClientAgent> {
const { client, serverCertChain, flowCountInterceptor } =
Expand All @@ -63,6 +65,7 @@ class GRPCClientAgent extends GRPCClient<AgentServiceClient> {
proxyConfig,
serverCertChain,
flowCountInterceptor,
destroyCallback,
logger,
});
return grpcClientAgent;
Expand Down
8 changes: 7 additions & 1 deletion src/agent/service/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import type { KeyManager } from '../../keys';
import type { VaultManager } from '../../vaults';
import type { NodeManager } from '../../nodes';
import type {
NodeGraph,
NodeManager,
NodeConnectionManager,
} from '../../nodes';
import type { NotificationsManager } from '../../notifications';
import type { Sigchain } from '../../sigchain';
import type { IAgentServiceServer } from '../../proto/js/polykey/v1/agent_service_grpc_pb';
Expand All @@ -20,7 +24,9 @@ import { AgentServiceService } from '../../proto/js/polykey/v1/agent_service_grp
function createService(container: {
keyManager: KeyManager;
vaultManager: VaultManager;
nodeConnectionManager: NodeConnectionManager;
nodeManager: NodeManager;
nodeGraph: NodeGraph;
notificationsManager: NotificationsManager;
sigchain: Sigchain;
}) {
Expand Down
16 changes: 8 additions & 8 deletions src/agent/service/nodesChainDataGet.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import type * as grpc from '@grpc/grpc-js';
import type { ClaimIdEncoded } from '../../claims/types';
import type { NodeManager } from '../../nodes';
import type { Sigchain } from '../../sigchain';
import type * as utilsPB from '../../proto/js/polykey/v1/utils/utils_pb';
import type { ClaimIdEncoded } from '../../claims/types';
import { utils as grpcUtils } from '../../grpc';
import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';

/**
* Retrieves the ChainDataEncoded of this node.
*/
function nodesChainDataGet({ nodeManager }: { nodeManager: NodeManager }) {
function nodesChainDataGet({ sigchain }: { sigchain: Sigchain }) {
return async (
call: grpc.ServerUnaryCall<utilsPB.EmptyMessage, nodesPB.ChainData>,
callback: grpc.sendUnaryData<nodesPB.ChainData>,
): Promise<void> => {
try {
const response = new nodesPB.ChainData();
const chainData = await nodeManager.getChainData();
const chainData = await sigchain.getChainData();
// Iterate through each claim in the chain, and serialize for transport
for (const c in chainData) {
const claimId = c as ClaimIdEncoded;
const claim = chainData[claimId];
let claimIdEncoded: ClaimIdEncoded;
for (claimIdEncoded in chainData) {
const claim = chainData[claimIdEncoded];
const claimMessage = new nodesPB.AgentClaim();
// Will always have a payload (never undefined) so cast as string
claimMessage.setPayload(claim.payload as string);
Expand All @@ -32,7 +32,7 @@ function nodesChainDataGet({ nodeManager }: { nodeManager: NodeManager }) {
claimMessage.getSignaturesList().push(signature);
}
// Add the serialized claim
response.getChainDataMap().set(claimId, claimMessage);
response.getChainDataMap().set(claimIdEncoded, claimMessage);
}
callback(null, response);
return;
Expand Down
10 changes: 6 additions & 4 deletions src/agent/service/nodesClosestLocalNodesGet.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type * as grpc from '@grpc/grpc-js';
import type { NodeManager } from '../../nodes';
import type { NodeConnectionManager } from '../../nodes';
import type { NodeId } from '../../nodes/types';
import { utils as grpcUtils } from '../../grpc';
import { utils as nodesUtils } from '../../nodes';
Expand All @@ -12,9 +12,9 @@ import * as nodesPB from '../../proto/js/polykey/v1/nodes/nodes_pb';
* to some provided node ID.
*/
function nodesClosestLocalNodesGet({
nodeManager,
nodeConnectionManager,
}: {
nodeManager: NodeManager;
nodeConnectionManager: NodeConnectionManager;
}) {
return async (
call: grpc.ServerUnaryCall<nodesPB.Node, nodesPB.NodeTable>,
Expand All @@ -38,7 +38,9 @@ function nodesClosestLocalNodesGet({
},
);
// Get all local nodes that are closest to the target node from the request
const closestNodes = await nodeManager.getClosestLocalNodes(nodeId);
const closestNodes = await nodeConnectionManager.getClosestLocalNodes(
nodeId,
);
for (const node of closestNodes) {
const addressMessage = new nodesPB.Address();
addressMessage.setHost(node.address.host);
Expand Down
Loading

0 comments on commit 4aacfb8

Please sign in to comment.