diff --git a/munkey/command.ts b/munkey/command.ts index 9392614..b2693ec 100644 --- a/munkey/command.ts +++ b/munkey/command.ts @@ -105,7 +105,6 @@ abstract class CommandServer { return this.onVaultLink(hostname, portNum, vaultName); }, - "sync": this.onVaultSync.bind(this), }, "link": { "up": this.onLinkUp.bind(this), @@ -229,7 +228,6 @@ abstract class CommandServer { abstract onGetVaultEntry(entryKey: string): Promise; abstract onListVaults(): Promise; abstract onVaultLink(hostname: string, portNum: number, vaultName: string, vaultNickname?: string): Promise; - abstract onVaultSync(): Promise; abstract onLinkUp(): Promise; abstract onLinkDown(): Promise; @@ -257,9 +255,13 @@ class ShellCommandServer extends CommandServer { return console.error(`Cannot create vault ${vaultName} (already exists)`); } - const vaultId: string | null = await this.services.vault.createVault(vaultName); - vaultId && this.services.vault.subscribeVaultById(vaultId, () => - this.services.vault.syncActiveVaults(vaultId, this.services.connection)); + try { + const vaultId: string | null = await this.services.vault.createVault(vaultName); + console.info(`Vault created with ID ${vaultId}`); + } + catch (err) { + console.error(err); + } } async onUseVault(vaultName: string): Promise { @@ -289,7 +291,7 @@ class ShellCommandServer extends CommandServer { console.info(":: :: Remote Vaults :: ::"); for (let [name, url] of this.services.connection.getAllConnections()) { - console.info(` ${url.name} = RemoteVault[${name}]`); + console.info(` ${url} = RemoteVault[${name}]`); } } @@ -319,7 +321,6 @@ class ShellCommandServer extends CommandServer { _rev, entries: { ...entries, [entryKey]: data }, }).catch(err => console.error(err)); - await this.services.vault.syncActiveVaults(vaultId, this.services.connection); } async onGetVaultEntry(entryKey: string): Promise { @@ -367,15 +368,15 @@ class ShellCommandServer extends CommandServer { // Query the APL to find the vault ID with that nickname. let { vaultId = null } = activeDevice?.vaults.find(vault => vault.nickname === vaultName) ?? {}; if (vaultId) { - this.services.connection.publishDatabaseConnection({ hostname, portNum }, vaultName, vaultId); - let localVault: PouchDB.Database | null = this.services.vault.getVaultById(vaultId); - // TODO: use the queried vault information to validate the vault. - if (!localVault) { - // TODO: allow the user to specify their own local nickname. - // Relying on the remote database's vault name is subject to collisions. + try { vaultId = await this.services.vault.createVault(vaultNickname, vaultId); - vaultId && this.services.vault.subscribeVaultById(vaultId, () => - this.services.vault.syncActiveVaults(vaultId, this.services.connection)); + let localVault = this.services.vault.getVaultById(vaultId); + let remoteConn = this.services.connection + .publishDatabaseConnection({ hostname, portNum }, vaultName, vaultId, localVault); + remoteConn.catch(err => console.error(err)); + } + catch (err) { + console.error("Failed to create local vault: ", err.message); } } else { @@ -383,13 +384,6 @@ class ShellCommandServer extends CommandServer { } } - async onVaultSync(): Promise { - const vaultId: string = this.services.vault.getActiveVaultId(); - if (await this.services.vault.syncActiveVaults(vaultId, this.services.connection) <= 0) { - console.error("No remote vaults found"); - } - } - async onLinkUp(): Promise { await this.services.web.listen() .catch(() => console.error("Failed to open server")); @@ -418,7 +412,7 @@ class ShellCommandServer extends CommandServer { } async onPeerList(): Promise { - for (let [hostname, portNum, identity] of this.services.activity.getAllDevices()) { + for (let [{ hostname, portNum }, identity] of this.services.activity.getAllDevices()) { console.info(` Peer[${identity.uniqueId}]@${hostname}:${portNum}`); for (let vault of identity.vaults) { console.info(`\t* "${vault.nickname}": Vault[${vault.vaultId}]`); diff --git a/munkey/discovery.ts b/munkey/discovery.ts index dccda70..32bc8f1 100644 --- a/munkey/discovery.ts +++ b/munkey/discovery.ts @@ -22,6 +22,10 @@ interface DeviceDiscoveryDecl { portNum: number; } +interface PeerLinkResponse extends PeerIdentityDecl { + activePeerList: DeviceDiscoveryDecl[]; +} + function isPeerIdentityDecl(decl: Object): decl is PeerIdentityDecl { return decl && ("uniqueId" in decl) && @@ -33,11 +37,27 @@ function isPeerIdentityDecl(decl: Object): decl is PeerIdentityDecl { )); } +function isDeviceDiscoveryDecl(decl: Object): decl is DeviceDiscoveryDecl { + return decl && + ("hostname" in decl) && + ("portNum" in decl); +} + +function isPeerLinkResponse(decl: Object): decl is PeerLinkResponse { + return isPeerIdentityDecl(decl) && + ("activePeerList" in decl) && + (Array.isArray((decl as PeerLinkResponse).activePeerList)) && + ((decl as PeerLinkResponse).activePeerList.every(peer => isDeviceDiscoveryDecl(peer))); +} + export { PeerVaultDecl, PeerIdentityDecl, DeviceDiscoveryDecl, + PeerLinkResponse, /* Validation Functions */ isPeerIdentityDecl, + isDeviceDiscoveryDecl, + isPeerLinkResponse, }; diff --git a/munkey/munkey.ts b/munkey/munkey.ts index 9aac2ca..bcd3b4c 100644 --- a/munkey/munkey.ts +++ b/munkey/munkey.ts @@ -39,16 +39,17 @@ import { const uniformPrint = winston.format.printf(function( info: winston.Logform.TransformableInfo & { label: string, timestamp: string }): string { - return `[${info.level}::${info.label}] ${info.message}`; + let { level, label, message } = info; + return `[${level}::${label}] ${message}`; }); + const addUniformLogger = function(serviceName: string): winston.Logger { winston.loggers.add(serviceName, { format: winston.format.combine( winston.format.splat(), winston.format.colorize(), winston.format.label({ label: serviceName }), - winston.format.timestamp(), uniformPrint, ), transports: [ diff --git a/munkey/services.ts b/munkey/services.ts index 2b2efc6..8282dcc 100644 --- a/munkey/services.ts +++ b/munkey/services.ts @@ -7,8 +7,8 @@ import { DeviceDiscoveryDecl, - isPeerIdentityDecl, - PeerIdentityDecl, + isPeerLinkResponse, + PeerIdentityDecl, PeerLinkResponse, PeerVaultDecl, } from "./discovery"; @@ -36,6 +36,9 @@ interface DatabaseDocument { entries?: { [entry: string]: string }; } +type VaultDB = PouchDB.Database; +type VaultSyncToken = PouchDB.Replication.Sync; + /** * @name generateNewIdentity * @summary Create a brand-new identity object (as of v0.0.1, just a string) from random. @@ -74,9 +77,10 @@ function configureRoutes( request, response: express.Response) { - const identityResponse: PeerIdentityDecl = { + const identityResponse: PeerIdentityDecl & { activePeerList: DeviceDiscoveryDecl[] } = { uniqueId: services.identity.getId(), vaults: await services.vault.getActiveVaultList(), + activePeerList: services.activity.getDeviceList(), }; response.json(identityResponse).end(); @@ -139,7 +143,10 @@ class VaultService extends Service { vaultId ??= (this.vaultIdMap.get(vaultName) || null); let vault: PouchDB.Database | null = vaultId && this.vaultMap.get(vaultId) || null; - if (!vault) { + if (this.vaultIdMap.get(vaultName) && this.vaultIdMap.get(vaultName) !== vaultId) { + throw new Error(`Name conflict; local nickname ${vaultName} already exists`); + } + else if (!vault) { // Vault not found; create it and initialize its schema. this.vaultIdMap.set(vaultName, vaultId ??= randomUUID()); this.vaultMap.set(vaultId, vault = new MemoryDB(vaultName)); @@ -261,43 +268,6 @@ class VaultService extends Service { } return vaultList; } - - /** - * @name syncActiveVaults - * @public - * @function - * - * @description Synchronize the contents of the specified vault with all active remote connections. - * - * @param vaultId {string} UUID of the local vault to replicate from. - * @param connections {ConnectionService} Service container to pull replication targets from. - * All remote vaults with the same UUID as the provided vaultId will be replicated. - * - * @returns {number} Number of vaults synchronized. - */ - public async syncActiveVaults(vaultId: string, connections: ConnectionService): Promise { - const vault: PouchDB.Database = this.getVaultById(vaultId); - - let syncCount: number = 0; - for (let connection of connections.getActiveConnections(vaultId)) { - this.logger.info("Syncing with peer %s", connection.name); - await vault.sync(connection); - syncCount++; - } - - return syncCount; - } - - public subscribeVaultById(vaultId: string, - callback: (value: PouchDB.Core.ChangesResponseChange) => void) - { - const vault: PouchDB.Database = this.getVaultById(vaultId); - if (vault) { - this.logger.info("Change subscription created for vault %s", vaultId); - vault?.changes({ live: true }) - .on("change", callback); - } - } } /** @@ -338,13 +308,16 @@ class IdentityService extends Service { * Note that active connections are not handled by this container, only locational entries. */ class ActivityService extends Service { + public static readonly peerListId = "apl"; private readonly activePeerList: Map; private readonly discoveryPool: Map; + private readonly _activePeerList: PouchDB.Database; constructor() { super(); this.activePeerList = new Map(); this.discoveryPool = new Map(); + this._activePeerList = new MemoryDB(ActivityService.peerListId); } /** @@ -367,7 +340,7 @@ class ActivityService extends Service { */ private async sendLinkRequest( hostname: string, - portNum: number): Promise + portNum: number): Promise { const logger = this.logger; const peerResponse: string|null = await new Promise(function(resolve, reject) { @@ -385,20 +358,13 @@ class ActivityService extends Service { .on("error", (err: NodeJS.ErrnoException) => { if (err.code === "ECONNREFUSED") { logger.error("Connection Refused %s:%d", hostname, portNum); - resolve(null); - } - else { - reject(err); } + reject(err); }); - }) - .catch(err => { - logger.error(err); - return null; }); const parsedResponse = peerResponse && JSON.parse(peerResponse); - return isPeerIdentityDecl(parsedResponse) ? parsedResponse : null; + return isPeerLinkResponse(parsedResponse) ? parsedResponse : null; } /** @@ -421,17 +387,36 @@ class ActivityService extends Service { * It is discouraged to "block" execution (i.e. `await`) based on the result. * * @param device {DeviceDiscoveryDecl} Device discovery record to publish to the APL. + * @param {Set} deviceMask Optional set of already-discovered devices. + * Used to limit the depth of recursively-published devices and prevent infinite loops. + * Any hosts listed in the given set are skipped during the recursive publish search. * @returns {Promise} Promise which resolves to an APL entry, * or null if the device endpoint is deemed invalid. */ - public publishDevice(device: DeviceDiscoveryDecl): Promise + public publishDevice(device: DeviceDiscoveryDecl, deviceMask?: Set): Promise { + deviceMask ??= new Set(); + + this.logger.info("Attempting to publish peer device %s:%d", device.hostname, device.portNum); return this.sendLinkRequest(device.hostname, device.portNum) - .then(decl => { + .then(async decl => { + this.logger.info("Published peer device %s:%d", device.hostname, device.portNum); this.activePeerList.set(`${device.hostname}:${device.portNum}`, decl); + let { activePeerList = [] } = decl ?? {}; + activePeerList = activePeerList.filter( + ({ hostname, portNum }) => !deviceMask.has(`${hostname}:${portNum}`) + ); + activePeerList.forEach(({ hostname, portNum }) => deviceMask.add(`${hostname}:${portNum}`)); + + for (let peerDevice of activePeerList) { + this.logger.info("Discovered peer device %s:%d", peerDevice.hostname, peerDevice.portNum); + await this.publishDevice(peerDevice, deviceMask); + } + return decl as PeerIdentityDecl; }) .catch(err => { + this.activePeerList.delete(`${device.hostname}:${device.portNum}`); this.logger.error(err); return null; }); @@ -482,12 +467,18 @@ class ActivityService extends Service { * @returns Iterator over tuple: (hostname, portNum, identityDocument). * Each tuple represents a single entry in the APL. */ - public *getAllDevices(): Generator<[string, number, PeerIdentityDecl]> { + public *getAllDevices(): Generator<[DeviceDiscoveryDecl, PeerIdentityDecl]> { for (let [location, identity] of this.activePeerList) { const [hostname, portNum]: string[] = location.split(":", 2); - yield [hostname, parseInt(portNum), identity]; + yield [{ hostname, portNum: parseInt(portNum) }, identity]; } } + + public getDeviceList(): DeviceDiscoveryDecl[] { + return Array + .from(this.getAllDevices()) + .map( ([device]) => device ); + } } /** @@ -509,19 +500,67 @@ class ConnectionService extends Service { * @summary Map containing active database connection objects. * Map keys are the UUID of the remote vault, values are the connections themselves. */ - private readonly connections: Map[]>; + private readonly connections: Map>; constructor() { super(); - this.connections = new Map[]>(); + this.connections = new Map>(); + } + + public publishDatabaseConnection( + device: DeviceDiscoveryDecl, + vaultName: string, + vaultId: string, + localVault: VaultDB): VaultSyncToken + { + let connectionMap = this.getOrCreateMap(vaultId); + let connectionKey = `${device.hostname}:${device.portNum}`; + let connectionUrl = `http://${connectionKey}/db/${vaultName}` + + if (!connectionMap.get(connectionKey)) { + this.logger.info("Adding remote connection to %s", connectionKey); + + localVault.replicate.from(connectionUrl); + let connection = localVault.sync(connectionUrl, { live: true, }); + connection + .on("change", info => this.logger.info("Changes received", info)) + .on("error", err => { + this.logger.error("Error in Sync", err); + this.removeRemoteConnection(vaultId, device); + }) + .on("paused", err => this.logger.error("Sync Paused", err)) + .on("complete", err => this.logger.error("Sync Finished", err)) + .catch(err => { + this.logger.error("Rejected Promise in Sync", err); + }); + + return connectionMap.set(connectionKey, connection).get(connectionKey); + } + else { + this.logger.warn("Cannot add remote connection to %s, already exists", connectionKey); + return connectionMap.get(connectionKey); + } } - public publishDatabaseConnection(device: DeviceDiscoveryDecl, vaultName: string, vaultId: string) { - let connectionList: PouchDB.Database[] | null = this.connections.get(vaultId) || null; - if (!connectionList) { - connectionList = this.connections.set(vaultId, []).get(vaultId); + public removeRemoteConnection(vaultId: string, device: DeviceDiscoveryDecl): boolean { + let connectionMap = this.connections.get(vaultId) || null; + let connectionKey = `${device.hostname}:${device.portNum}`; + + if (connectionMap) { + connectionMap.get(connectionKey).cancel(); + return connectionMap + .delete(connectionKey); } - connectionList.push(new PouchDB(`http://${device.hostname}:${device.portNum}/db/${vaultName}`)); + return false; + } + + private getOrCreateMap(vaultId: string): Map { + let connectionMap; + return (connectionMap = this.connections.get(vaultId) || null) + ? connectionMap + : this.connections + .set(vaultId, new Map()) + .get(vaultId); } /** @@ -529,28 +568,18 @@ class ConnectionService extends Service { * @public * @function * - * @summary Iterate over (id, database) pairs of active database connections. + * @summary Iterate over (id, remoteUrl) pairs of active database connections. */ - public *getAllConnections(): Generator<[string, PouchDB.Database]> { - for (let [connection, databaseList] of this.connections) { - for (let database of databaseList) { + public *getAllConnections(): Generator<[string, string]> { + for (let [vaultId, connectionList] of this.connections) { + for (let [connectionKey] of connectionList) { yield [ - connection, - database + vaultId, + connectionKey ]; } } } - - public *getActiveConnections(vaultId: string): Generator> { - for (let [connId, connectionList] of this.connections) { - if (connId === vaultId) { - for (let database of connectionList) { - yield database; - } - } - } - } } class WebService extends Service {