Skip to content

Commit

Permalink
feat: garbage collector
Browse files Browse the repository at this point in the history
  • Loading branch information
vasco-santos committed Jul 13, 2020
1 parent d7290df commit b080670
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

exports.PROTOCOL_MULTICODEC = '/rendezvous/1.0.0'
exports.MAX_NS_LENGTH = 255 // TODO: spec this
exports.MAX_LIMIT = 1000 // TODO: spec this
exports.MAX_LIMIT = 1000
11 changes: 9 additions & 2 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const { Message } = require('./proto')
const MESSAGE_TYPE = Message.MessageType

const defaultServerOptions = {
enabled: true
enabled: true,
gcInterval: 3e5
}

/**
Expand All @@ -40,6 +41,7 @@ class Rendezvous {
* @param {number} [params.discovery.interval = 5000]
* @param {object} [params.server]
* @param {boolean} [params.server.enabled = true]
* @param {number} [params.server.gcInterval = 3e5]
*/
constructor ({ libp2p, options = {} }) {
this._libp2p = libp2p
Expand Down Expand Up @@ -79,7 +81,8 @@ class Rendezvous {

// Create Rendezvous point if enabled
if (this._serverOptions.enabled) {
this._server = new Server({ registrar: this._registrar })
this._server = new Server(this._registrar, this._serverOptions)
this._server.start()
}

// register protocol with topology
Expand Down Expand Up @@ -109,8 +112,12 @@ class Rendezvous {
log('stopping')

clearInterval(this._interval)

// unregister protocol and handlers
await this._registrar.unregister(this._registrarId)
if (this._serverOptions.enabled) {
this._server.stop()
}

this._registrarId = undefined
log('stopped')
Expand Down
55 changes: 50 additions & 5 deletions src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,69 @@ const rpc = require('./rpc')
class RendezvousServer {
/**
* @constructor
* @param {object} params
* @param {Registrar} params.registrar
* @param {Registrar} registrar
* @param {object} options
* @param {number} options.gcInterval
*/
constructor ({ registrar }) {
constructor (registrar, { gcInterval = 3e5 } = {}) {
this._registrar = registrar
this._gcInterval = gcInterval

/**
* Registrations per namespace.
* @type {Map<string, Map<string, Registration>>}
*/
this.registrations = new Map()
}

/**
* Start rendezvous server for handling rendezvous streams and gc.
* @returns {void}
*/
start () {
if (this._interval) {
return
}

log('starting')

// Garbage collection
this._interval = setInterval(this._gc, this._gcInterval)

// Incoming streams handling
this._registrar.handle(PROTOCOL_MULTICODEC, rpc(this))

log('started')
}

/**
* Stops rendezvous server gc and clears registrations
*/
stop () {
clearInterval(this._interval)
this._interval = undefined
this.registrations.clear()

log('stopped')
}

// TODO: Should we have a start method to gv the expired registrations?
// I am removing them on discover, but it should be useful to have a gc too
/**
* Garbage collector to removed outdated registrations.
* @returns {void}
*/
_gc () {
const now = Date.now()

// Iterate namespaces
this.registrations.forEach((nsRegistrations) => {
// Iterate registrations for namespaces
nsRegistrations.forEach((reg, idStr) => {
if (now >= reg.expiration) {
nsRegistrations.delete(idStr)
}
})
})
}

/**
* Add a peer registration to a namespace.
Expand Down
167 changes: 167 additions & 0 deletions test/server.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
'use strict'
/* eslint-env mocha */

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-as-promised'))
const { expect } = chai

const delay = require('delay')
const sinon = require('sinon')
const multiaddr = require('multiaddr')

const RendezvousServer = require('../src/server')

const { createPeerId } = require('./utils')

const registrar = {
handle: () => { }
}
const testNamespace = 'test-namespace'
const multiaddrs = [multiaddr('/ip4/127.0.0.1/tcp/0')].map((m) => m.buffer)

describe('rendezvous server', () => {
let rServer
let peerIds

before(async () => {
peerIds = await createPeerId({ number: 3 })
})

afterEach(() => {
rServer && rServer.stop()
})

it('calls registrar handle on start once', () => {
rServer = new RendezvousServer(registrar)

// Spy for handle
const spyHandle = sinon.spy(registrar, 'handle')

rServer.start()
expect(spyHandle).to.have.property('callCount', 1)

rServer.start()
expect(spyHandle).to.have.property('callCount', 1)
})

it('can add registrations to multiple namespaces', () => {
const otherNamespace = 'other-namespace'
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)
// Add registration for peer 1 in a different namespace
rServer.addRegistration(otherNamespace, peerIds[0], multiaddrs, 1000)

// Add registration for peer 2 in test namespace
rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000)

const testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(2)

const otherNsRegistrations = rServer.getRegistrations(otherNamespace)
expect(otherNsRegistrations).to.have.lengthOf(1)
})

it('should be able to limit registrations to get', () => {
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)
// Add registration for peer 2 in test namespace
rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace, 1)
expect(testNsRegistrations).to.have.lengthOf(1)

testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(2)
})

it('can remove registrations from a peer in a given namespace', () => {
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)
// Add registration for peer 2 in test namespace
rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(2)

// Remove registration for peer0
rServer.removeRegistration(testNamespace, peerIds[0])

testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)
})

it('can remove all registrations from a peer', () => {
const otherNamespace = 'other-namespace'
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)
// Add registration for peer 1 in a different namespace
rServer.addRegistration(otherNamespace, peerIds[0], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)

let otherNsRegistrations = rServer.getRegistrations(otherNamespace)
expect(otherNsRegistrations).to.have.lengthOf(1)

// Remove all registrations for peer0
rServer.removePeerRegistrations(peerIds[0])

testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(0)

otherNsRegistrations = rServer.getRegistrations(otherNamespace)
expect(otherNsRegistrations).to.have.lengthOf(0)
})

it('can attempt to remove a registration for a non existent namespace', () => {
const otherNamespace = 'other-namespace'
rServer = new RendezvousServer(registrar)

rServer.removeRegistration(otherNamespace, peerIds[0])
})

it('can attempt to remove a registration for a non existent peer', () => {
rServer = new RendezvousServer(registrar)

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)

// Remove registration for peer0
rServer.removeRegistration(testNamespace, peerIds[1])

testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)
})

it('gc expired records', async () => {
rServer = new RendezvousServer(registrar, { gcInterval: 300 })

// Add registration for peer 1 in test namespace
rServer.addRegistration(testNamespace, peerIds[0], multiaddrs, 500)
rServer.addRegistration(testNamespace, peerIds[1], multiaddrs, 1000)

let testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(2)

// wait for firt record to be removed
await delay(650)
testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(1)

await delay(400)
testNsRegistrations = rServer.getRegistrations(testNamespace)
expect(testNsRegistrations).to.have.lengthOf(0)
})
})
14 changes: 14 additions & 0 deletions test/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,20 @@ const defaultConfig = {
}
}

/**
* Create Perr Id.
* @param {Object} [properties]
* @param {number} [properties.number] number of peers (default: 1).
* @return {Promise<Array<PeerId>>}
*/
async function createPeerId ({ number = 1 }) {
const peerIds = await pTimes(number, (i) => PeerId.createFromJSON(Peers[i]))

return peerIds
}

module.exports.createPeerId = createPeerId

/**
* Create libp2p nodes.
* @param {Object} [properties]
Expand Down

0 comments on commit b080670

Please sign in to comment.