diff --git a/src/constants.js b/src/constants.js index 3f063b7..7d509d8 100644 --- a/src/constants.js +++ b/src/constants.js @@ -2,4 +2,4 @@ exports.PROTOCOL_MULTICODEC = '/rendezvous/1.0.0' exports.MAX_NS_LENGTH = 255 // TODO: spec this -exports.MAX_LIMIT = 1000 // TODO: spec this +exports.MAX_LIMIT = 1000 diff --git a/src/index.js b/src/index.js index b6e1017..12ae4c1 100644 --- a/src/index.js +++ b/src/index.js @@ -22,7 +22,8 @@ const { Message } = require('./proto') const MESSAGE_TYPE = Message.MessageType const defaultServerOptions = { - enabled: true + enabled: true, + gcInterval: 3e5 } /** @@ -40,6 +41,7 @@ class Rendezvous { * @param {number} [params.discovery.interval = 5000] * @param {object} [params.server] * @param {boolean} [params.server.enabled = true] + * @param {number} [params.server.gcInterval = 3e5] */ constructor ({ libp2p, options = {} }) { this._libp2p = libp2p @@ -79,7 +81,8 @@ class Rendezvous { // Create Rendezvous point if enabled if (this._serverOptions.enabled) { - this._server = new Server({ registrar: this._registrar }) + this._server = new Server(this._registrar, this._serverOptions) + this._server.start() } // register protocol with topology @@ -109,8 +112,12 @@ class Rendezvous { log('stopping') clearInterval(this._interval) + // unregister protocol and handlers await this._registrar.unregister(this._registrarId) + if (this._serverOptions.enabled) { + this._server.stop() + } this._registrarId = undefined log('stopped') diff --git a/src/server/index.js b/src/server/index.js index 0786ccb..0d4f88e 100644 --- a/src/server/index.js +++ b/src/server/index.js @@ -21,24 +21,69 @@ const rpc = require('./rpc') class RendezvousServer { /** * @constructor - * @param {object} params - * @param {Registrar} params.registrar + * @param {Registrar} registrar + * @param {object} options + * @param {number} options.gcInterval */ - constructor ({ registrar }) { + constructor (registrar, { gcInterval = 3e5 } = {}) { this._registrar = registrar + this._gcInterval = gcInterval /** * Registrations per namespace. * @type {Map>} */ this.registrations = new Map() + } + + /** + * Start rendezvous server for handling rendezvous streams and gc. + * @returns {void} + */ + start () { + if (this._interval) { + return + } + + log('starting') + + // Garbage collection + this._interval = setInterval(this._gc, this._gcInterval) // Incoming streams handling this._registrar.handle(PROTOCOL_MULTICODEC, rpc(this)) + + log('started') + } + + /** + * Stops rendezvous server gc and clears registrations + */ + stop () { + clearInterval(this._interval) + this._interval = undefined + this.registrations.clear() + + log('stopped') } - // TODO: Should we have a start method to gv the expired registrations? - // I am removing them on discover, but it should be useful to have a gc too + /** + * Garbage collector to removed outdated registrations. + * @returns {void} + */ + _gc () { + const now = Date.now() + + // Iterate namespaces + this.registrations.forEach((nsRegistrations) => { + // Iterate registrations for namespaces + nsRegistrations.forEach((reg, idStr) => { + if (now >= reg.expiration) { + nsRegistrations.delete(idStr) + } + }) + }) + } /** * Add a peer registration to a namespace. diff --git a/test/server.spec.js b/test/server.spec.js new file mode 100644 index 0000000..da3128a --- /dev/null +++ b/test/server.spec.js @@ -0,0 +1,167 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +chai.use(require('chai-as-promised')) +const { expect } = chai + +const delay = require('delay') +const sinon = require('sinon') +const multiaddr = require('multiaddr') + +const RendezvousServer = require('../src/server') + +const { createPeerId } = require('./utils') + +const registrar = { + handle: () => { } +} +const testNamespace = 'test-namespace' +const multiaddrs = [multiaddr('/ip4/127.0.0.1/tcp/0')].map((m) => m.buffer) + +describe('rendezvous server', () => { + let rServer + let peerIds + + before(async () => { + peerIds = await createPeerId({ number: 3 }) + }) + + afterEach(() => { + rServer && rServer.stop() + }) + + it('calls registrar handle on start once', () => { + rServer = new RendezvousServer(registrar) + + // Spy for handle + const spyHandle = sinon.spy(registrar, 'handle') + + rServer.start() + expect(spyHandle).to.have.property('callCount', 1) + + rServer.start() + expect(spyHandle).to.have.property('callCount', 1) + }) + + it('can add registrations to multiple namespaces', () => { + const otherNamespace = 'other-namespace' + rServer = new RendezvousServer(registrar) + + // Add registration for peer 1 in test namespace + rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000) + // Add registration for peer 1 in a different namespace + rServer.addRegistration(otherNamespace, peerIds[0], multiaddrs, 1000) + + // Add registration for peer 2 in test namespace + rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000) + + const testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(2) + + const otherNsRegistrations = rServer.getRegistrations(otherNamespace) + expect(otherNsRegistrations).to.have.lengthOf(1) + }) + + it('should be able to limit registrations to get', () => { + rServer = new RendezvousServer(registrar) + + // Add registration for peer 1 in test namespace + rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000) + // Add registration for peer 2 in test namespace + rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000) + + let testNsRegistrations = rServer.getRegistrations(testNamespace, 1) + expect(testNsRegistrations).to.have.lengthOf(1) + + testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(2) + }) + + it('can remove registrations from a peer in a given namespace', () => { + rServer = new RendezvousServer(registrar) + + // Add registration for peer 1 in test namespace + rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000) + // Add registration for peer 2 in test namespace + rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000) + + let testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(2) + + // Remove registration for peer0 + rServer.removeRegistration(testNamespace, peerIds[0]) + + testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(1) + }) + + it('can remove all registrations from a peer', () => { + const otherNamespace = 'other-namespace' + rServer = new RendezvousServer(registrar) + + // Add registration for peer 1 in test namespace + rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000) + // Add registration for peer 1 in a different namespace + rServer.addRegistration(otherNamespace, peerIds[0], multiaddrs, 1000) + + let testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(1) + + let otherNsRegistrations = rServer.getRegistrations(otherNamespace) + expect(otherNsRegistrations).to.have.lengthOf(1) + + // Remove all registrations for peer0 + rServer.removePeerRegistrations(peerIds[0]) + + testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(0) + + otherNsRegistrations = rServer.getRegistrations(otherNamespace) + expect(otherNsRegistrations).to.have.lengthOf(0) + }) + + it('can attempt to remove a registration for a non existent namespace', () => { + const otherNamespace = 'other-namespace' + rServer = new RendezvousServer(registrar) + + rServer.removeRegistration(otherNamespace, peerIds[0]) + }) + + it('can attempt to remove a registration for a non existent peer', () => { + rServer = new RendezvousServer(registrar) + + // Add registration for peer 1 in test namespace + rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000) + + let testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(1) + + // Remove registration for peer0 + rServer.removeRegistration(testNamespace, peerIds[1]) + + testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(1) + }) + + it('gc expired records', async () => { + rServer = new RendezvousServer(registrar, { gcInterval: 300 }) + + // Add registration for peer 1 in test namespace + rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 500) + rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000) + + let testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(2) + + // wait for firt record to be removed + await delay(650) + testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(1) + + await delay(400) + testNsRegistrations = rServer.getRegistrations(testNamespace) + expect(testNsRegistrations).to.have.lengthOf(0) + }) +}) diff --git a/test/utils.js b/test/utils.js index 1504cd5..d6db54f 100644 --- a/test/utils.js +++ b/test/utils.js @@ -23,6 +23,20 @@ const defaultConfig = { } } +/** + * Create Perr Id. + * @param {Object} [properties] + * @param {number} [properties.number] number of peers (default: 1). + * @return {Promise>} + */ +async function createPeerId ({ number = 1 }) { + const peerIds = await pTimes(number, (i) => PeerId.createFromJSON(Peers[i])) + + return peerIds +} + +module.exports.createPeerId = createPeerId + /** * Create libp2p nodes. * @param {Object} [properties]