Skip to content

Commit

Permalink
Modularize storage - Use cookie instead of timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
mkg20001 committed Apr 15, 2018
1 parent 6ca7f6f commit 42dd587
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 57 deletions.
14 changes: 7 additions & 7 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,24 @@ class RendezvousDiscovery {
this._rpc('register', ns, peer, 0, cb) // TODO: interface does not expose ttl option?!
}

discover (ns, limit, since, cb) {
if (typeof since === 'function') {
cb = since
since = 0
discover (ns, limit, cookie, cb) {
if (typeof cookie === 'function') {
cb = cookie
cookie = Buffer.from('')
}
if (typeof limit === 'function') {
since = 0
cookie = Buffer.from('')
cb = limit
limit = 0
}
if (typeof ns === 'function') {
since = 0
cookie = Buffer.from('')
limit = 0
cb = ns
ns = null
}

this._rpc('discover', ns, limit, since, cb)
this._rpc('discover', ns, limit, cookie, cb)
}

unregister (ns, id) {
Expand Down
4 changes: 2 additions & 2 deletions src/proto.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ module.exports = protons(`
message Discover {
optional string ns = 1;
optional int64 limit = 2;
optional int64 since = 3;
optional bytes cookie = 3;
}
message DiscoverResponse {
repeated Register registrations = 1;
optional int64 timestamp = 2;
optional bytes cookie = 2;
}
message Message {
Expand Down
5 changes: 3 additions & 2 deletions src/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class RPC {
sink (read) {
const next = (end, msg, doend) => {
if (doend) {
log('crash@%s: %s', this.id, doend)
return read(doend, next)
}
if (end) {
Expand Down Expand Up @@ -131,13 +132,13 @@ class RPC {
this.cbs.register.push(wrap(cb, TIMEOUT))
}

discover (ns, limit, since, cb) {
discover (ns, limit, cookie, cb) {
this.source.push({
type: MessageType.DISCOVER,
discover: {
ns,
limit,
since
cookie
}
})
this.cbs.discover.push(wrap(cb, TIMEOUT))
Expand Down
50 changes: 6 additions & 44 deletions src/server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,47 +5,7 @@ const RPC = require('./rpc')
const debug = require('debug')
const log = debug('libp2p:rendezvous:server')
const AsyncQueue = require('./queue')
const MAX_LIMIT = 1000 // TODO: spec this

class NS {
constructor (name, que) { // name is a utf8 string
this.name = name
this.hexName = Buffer.from(name).toString('hex') // needed to prevent queue-dos attacks
this.que = que
this.id = {}
this.sorted = []
}
addPeer (pi, ts, ttl, isOnline) { // isOnline returns a bool if the rpc connection still exists
const id = pi.id.toB58String()
this.id[id] = {pi, ts, ttl}
if (ttl) {
let expireAt = ts + ttl * 1000
this.id[id].online = () => Date.now() >= expireAt
} else {
this.id[id].online = isOnline
}
this.update()
}
removePeer (pid) {
delete this.id[pid]
this.update()
}
update () {
this.que.add('sort@' + this.hexName, () => {
this.sorted = Object.keys(this.id).map(id => { return {id, ts: this.id[id].ts} }).sort((a, b) => a.ts - b.ts)
})
}
getPeers (since, limit, ownId) {
if (limit <= 0 || limit > MAX_LIMIT) limit = MAX_LIMIT
return this.sorted.filter(p => p.ts > since && p.id !== ownId).slice(0, limit).map(p => this.id[p.id])
}
gc () {
return Object.keys(this.id).filter(k => !this.id[k].online()).map(k => delete this.id[k]).length
}
get useless () {
return !Object.keys(this.id).length
}
}
const BasicStore = require('./store/basic')

class Server {
constructor (opt) {
Expand All @@ -57,7 +17,9 @@ class Server {
NS: {},
RPC: {}
}
this._stubNS = new NS('', this.que)
const Store = opt.store || BasicStore
this.store = new Store(this)
this._stubNS = this.store.create(Buffer.alloc(256, '0').toString())
}

start () {
Expand All @@ -83,10 +45,10 @@ class Server {
// TODO: remove on disconnect
}

getNS (name, create) { // TODO: remove NSs that get empty
getNS (name, create) {
if (!this.table.NS[name]) {
if (create) {
return (this.table.NS[name] = new NS(name, this.que))
return (this.table.NS[name] = this.store.create(name))
} else {
return this._stubNS
}
Expand Down
7 changes: 5 additions & 2 deletions src/server/rpc.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const Peer = require('peer-info')
const Id = require('peer-id')

const MAX_NS_LENGTH = 255 // TODO: spec this
const MAX_LIMIT = 1000 // TODO: spec this

class RPC {
constructor (main) {
Expand All @@ -19,6 +20,7 @@ class RPC {
sink (read) {
const next = (end, msg, doend) => {
if (doend) {
log('crash@%s: %s', this.id, doend)
return read(doend, next)
}
if (end) {
Expand Down Expand Up @@ -87,7 +89,8 @@ class RPC {
case MessageType.DISCOVER:
try {
log('discover@%s: discover on %s', this.id, msg.discover.ns)
const peers = this.main.getNS(msg.discover.ns).getPeers(msg.discover.since || 0, msg.discover.limit, this.id) // TODO: add a max-limit to avoid dos?
if (msg.discover.limit <= 0 || msg.discover.limit > MAX_LIMIT) msg.discover.limit = MAX_LIMIT
const {peers, cookie} = this.main.getNS(msg.discover.ns).getPeers(msg.discover.cookie || Buffer.from(''), msg.discover.limit, this.id)
log('discover@%s: got %s peers', this.id, peers.length)
this.source.push({
type: MessageType.DISCOVER_RESPONSE,
Expand All @@ -102,7 +105,7 @@ class RPC {
ttl: p.ttl
}
}),
timestamp: peers.length ? peers.pop().ts : null
cookie
}
})
} catch (e) {
Expand Down
60 changes: 60 additions & 0 deletions src/server/store/basic/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
'use strict'

class NS {
constructor (name, que) { // name is a utf8 string
this.name = name
this.hexName = Buffer.from(name).toString('hex') // needed to prevent queue-dos attacks
this.que = que
this.id = {}
this.sorted = []
}
addPeer (pi, ts, ttl, isOnline) { // isOnline returns a bool if the rpc connection still exists
const id = pi.id.toB58String()
this.id[id] = {pi, ts, ttl}
if (ttl) {
let expireAt = ts + ttl * 1000
this.id[id].online = () => Date.now() >= expireAt
} else {
this.id[id].online = isOnline
}
this.update()
}
removePeer (pid) {
delete this.id[pid]
this.update()
}
update () {
this.que.add('sort@' + this.hexName, () => {
this.sorted = Object.keys(this.id).map(id => { return {id, ts: this.id[id].ts} }).sort((a, b) => a.ts - b.ts)
})
}
getPeers (cookie, limit, ownId) {
cookie = cookie.length ? parseInt(cookie.toString(), 10) : 0
let p = this.sorted.filter(p => p.ts > cookie && p.id !== ownId).slice(0, limit).map(p => this.id[p.id])
let newCookie
if (p.length) {
newCookie = Buffer.from(p[p.length - 1].ts.toString())
} else {
newCookie = Buffer.from('')
}
return {cookie: newCookie, peers: p}
}
gc () {
return Object.keys(this.id).filter(k => !this.id[k].online()).map(k => delete this.id[k]).length
}
get useless () {
return !Object.keys(this.id).length
}
}

class BasicStore {
constructor (main) {
this.main = main
}
create (name) {
return new NS(name, this.main.que)
}
}

module.exports = BasicStore
module.exports.NS = NS

0 comments on commit 42dd587

Please sign in to comment.