Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/replication api #26

Closed
wants to merge 12 commits into from
46 changes: 33 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,20 +125,40 @@ With [npm](https://npmjs.org/) installed, run
$ npm install multifeed
```

## Hacks

1. `hypercore-protocol` requires the first feed exchanged to be common between
replicating peers. This prevents two strangers from exchanging sets of
hypercores. A "fake" hypercore with a hardcoded public key is included in the
code to bootstrap the replication process. I discarded the private key, but
even if I didn't, it doesn't let me do anything nefarious. You could patch
this with your own key of choice.
2. `hypercore-protocol` requires all feed keys be known upfront: only discovery
keys are exchanged (`discoveryKey = hash(key)`), so this module wraps the
hypercore replication duplex stream in a secondary duplex stream that
exchanges feed public keys upfront before moving on to the hypercore
replication mechanism.
## Replication Policies

You can control which feeds get replicated by providing a policy object:

```js

var randomReplication = {
init: function(multifeed) {
// called on multifeed.ready
// use it for initalization.
},
have: function(local, share) {
// called on peer connection
// select which feeds to share
share(local.keys, {
random: local.keys.map(function() { return Math.random() - 0.5 })
})
},
want: function(remote, request) {
// called when remote peer's
// share list is available.
// remote.keys is always available
// remote contains also all custom props sent by remote.
var keys = remote.keys.filter(function(k, i){
return remote.random[i] > 0.5
})
request(keys)
}
}

var multi = multifeed(hypercore, './db', { valueEncoding: 'json' })
multi.use(randomReplication)

```
## See Also

- [multifeed-index](https://github.com/noffle/multifeed-index)
Expand Down
124 changes: 87 additions & 37 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ function Multifeed (hypercore, storage, opts) {
this._feedKeyToFeed = {}

this._hypercore = hypercore
this._opts = opts

this._opts = opts || {}
this._middleware = null
this.writerLock = mutexify()
this._middleware = []

this.key = new Buffer('bee80ff3a4ee5e727dc44197cb9d25bf8f19d50b0f3ad2984cfe5b7d14e75de7', 'hex')
if (this._opts.key) this.key = Buffer.from(this._opts.key)
else debug('Warning, running multifeed with unsecure default key')

this.closed = false

Expand All @@ -34,20 +39,9 @@ function Multifeed (hypercore, storage, opts) {

var self = this
this._ready = readyify(function (done) {
// Private key-less constant hypercore to bootstrap hypercore-protocol
// replication.
var protocolEncryptionKey = new Buffer('bee80ff3a4ee5e727dc44197cb9d25bf8f19d50b0f3ad2984cfe5b7d14e75de7', 'hex')
if (self._opts.key) protocolEncryptionKey = Buffer.from(self._opts.key)
else debug('Warning, running multifeed with unsecure default key')

var feed = hypercore(self._storage('_fake'), protocolEncryptionKey)

feed.ready(function () {
self._fake = feed
self._loadFeeds(function () {
debug('[INIT] finished loading feeds')
done()
})
self._loadFeeds(function () {
debug('[INIT] finished loading feeds')
done()
})
})
}
Expand Down Expand Up @@ -76,7 +70,7 @@ Multifeed.prototype.close = function (cb) {
})
}

var feeds = values(self._feeds).concat(self._fake)
var feeds = values(self._feeds)

function next (n) {
if (n >= feeds.length) {
Expand Down Expand Up @@ -161,7 +155,7 @@ Multifeed.prototype.writer = function (name, cb) {
feed.ready(function () {
self._addFeed(feed, String(idx))
release(function () {
if (err) cb(err)
if (err) return cb(err)
else cb(null, feed, idx)
})
})
Expand All @@ -183,19 +177,39 @@ Multifeed.prototype.feed = function (key) {
Multifeed.prototype.replicate = function (opts) {
if (!opts) opts = {}
var self = this
var mux = multiplexer(self._fake.key, opts)
var mux = multiplexer(self.key, opts)

// Add key exchange listener
mux.once('manifest', function(m) {
mux.wantFeeds(m.keys)
if (self._middleware.length) {
function callPlug(i, ctx) {
if (self._middleware.length === i) return mux.wantFeeds(ctx.keys)
var plug = self._middleware[i]

// Reliquish control to next if plug does not implement callback
if (typeof plug.want !== 'function') return callPlug(i + 1, ctx)

// give each plug a fresh reference to avoid peeking/postmodifications
plug.want.bind(self)(clone(ctx), function(keys) {
let n = clone(m)
n.keys = keys
callPlug(i + 1, n)
})
}
// Start loop
callPlug(0, m)
} else {
// default behaviour "want all"
mux.wantFeeds(m.keys)
}
})

// Add replication listener
mux.once('replicate', function(keys, repl) {
addMissingKeys(keys, function(err){
if(err) return mux.destroy(err)

var key2feed = values(self._feeds).reduce(function(h,feed){
var key2feed = values(self._feeds).reduce(function(h, feed){
h[feed.key.toString('hex')] = feed
return h
},{})
Expand All @@ -210,8 +224,28 @@ Multifeed.prototype.replicate = function (opts) {
if (err) return mux.stream.destroy(err)
if (mux.stream.destroyed) return
mux.ready(function(){
var keys = values(self._feeds).map(function (feed) { return feed.key.toString('hex') })
mux.haveFeeds(keys)
var available = values(self._feeds).map(function (feed) { return feed.key.toString('hex') })
if (self._middleware.length) {
// Orderly iterate through all plugs
function callPlug(i, ctx) {
if (i === self._middleware.length) return mux.haveFeeds(ctx.keys, ctx)
let plug = self._middleware[i]

// Reliquish control to next if plug does not implement callback
if (typeof plug.have !== 'function') return callPlug(i + 1, ctx)

// give each plug a fresh reference to avoid peeking/postmodifications
plug.have.bind(self)(clone(ctx), function(keys, extras){
extras = extras || {}
extras.keys = keys
callPlug(i + 1, extras)
})
}
callPlug(0, {keys: available})
} else {
// Default behaviour 'share all'
mux.haveFeeds(available)
}
})
})

Expand All @@ -236,12 +270,16 @@ Multifeed.prototype.replicate = function (opts) {
var filtered = keys.filter(function (key) {
return !Number.isNaN(parseInt(key, 16)) && key.length === 64
})
filtered.forEach(function (key) {
var feeds = values(self._feeds).filter(function (feed) {
return feed.key.toString('hex') === key
})
if (!feeds.length) {
pending++

var existingKeys = values(self._feeds).map(function(feed) { return feed.key.toString('hex') })

var missingFeeds = filtered.filter(function (key) {
return existingKeys.indexOf(key) === -1
})

function initFeed(i) {
if (i >= missingFeeds.length) return cb()
var key = missingFeeds[i]
var numFeeds = Object.keys(self._feeds).length
var storage = self._storage(''+numFeeds)
var feed
Expand All @@ -250,20 +288,27 @@ Multifeed.prototype.replicate = function (opts) {
feed = self._hypercore(storage, Buffer.from(key, 'hex'), self._opts)
} catch (e) {
debug('[REPLICATION] failed to create new local hypercore, key=' + key.toString('hex'))
if (!--pending) cb()
return
return initFeed(i + 1)
}
debug('[REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex'))
self._addFeed(feed, String(numFeeds))
feed.ready(function () {
if (!--pending) cb()
debug('[REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex'))
self._addFeed(feed, String(numFeeds))
initFeed(i + 1)
})
}
})
if (!pending) cb()
}
initFeed(0)
}
}

Multifeed.prototype.use = function(plug) {
if(this._middleware === null) this._middleware = []
this._middleware.push(plug)
var self = this
if (typeof plug.init === 'function') this.ready(function(){
plug.init(self)
})
}

// TODO: what if the new data is shorter than the old data? things will break!
function writeStringToStorage (string, storage, cb) {
var buf = Buffer.from(string, 'utf8')
Expand Down Expand Up @@ -292,3 +337,8 @@ function readStringFromStorage (storage, cb) {
function values (obj) {
return Object.keys(obj).map(function (k) { return obj[k] })
}

// Deep clone
function clone(obj) {
return JSON.parse(JSON.stringify(obj))
}
11 changes: 8 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
"author": "Stephen Whitmore <sww@eight.net>",
"version": "3.0.3",
"repository": {
"url": "git://github.com/noffle/multifeed.git"
telamon marked this conversation as resolved.
Show resolved Hide resolved
"url": "git://github.com/telamon/multifeed.git"
},
"homepage": "https://github.com/noffle/multifeed",
"bugs": "https://github.com/noffle/multifeed/issues",
"bugs": {
"url": "https://github.com/telamon/multifeed/issues"
},
"main": "index.js",
"scripts": {
"test": "tape test/*.js",
Expand All @@ -31,5 +33,8 @@
"through2": "^3.0.0",
"tmp": "0.0.33"
},
"license": "ISC"
"license": "ISC",
"directories": {
"test": "test"
}
}
115 changes: 115 additions & 0 deletions test/replication-api.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
var test = require('tape')
var hypercore = require('hypercore')
var ram = require('random-access-memory')
var multifeed = require('../index')

var bindStorage = function(subpath) {
return function(p) {
return ram(subpath + '/' + p)
}
}

test('Key exchange API', function(t){
t.plan(26)
var multi = multifeed(hypercore, bindStorage("first/"), { valueEncoding: 'json' })
var m2 = multifeed(hypercore, bindStorage("second/"), { valueEncoding: 'json' })
// A policy that only replicates feeds relevant to cats
// This policy shares everything but accepts only feeds tagged as 'cat'
var CatPolicy = {
have: function(local, share) {
t.ok(local.keys instanceof Array, "Param 'local' should be an array of localy available feeds")
t.ok(typeof share === 'function', "Param 'share' should be a function")

// 'Marketing phase' extract zeroth entry for all feeds an attach it
// to the feed-exchange
extractEntries(this.feeds(), function(entries) {
// Share all feeds including the sounds they make.
share(local.keys, {
says: entries.map(function(e) { return e.says })
})
})
},
want: function(remote, request) {
t.ok(remote.keys instanceof Array, "Param 'remote' should be an array of remotely available feeds")
t.ok(typeof request === 'function', "Param 'request' should be a function")
// Ok let's request only keys relevant to our logic.
var keys = remote.keys.filter(function(k, i){
return remote.says[i] === 'meow'
})
request(keys)
}
}

// let both multifeeds use the same policy
multi.use(CatPolicy)
m2.use(CatPolicy)

var replicateAndVerify = function(err) {
t.error(err)
t.equal(multi.feeds().length, 3)
t.equal(m2.feeds().length, 3)
// Verify that both catfeeds and the original dog-feed is present.
extractEntries(multi.feeds(), function(ent1) {
t.equal(ent1.filter(function(i) { return i.says === 'meow'}).length, 2, 'Should contain two cats')
t.equal(ent1.filter(function(i) { return i.says === 'squeek'}).length, 0, 'Should not contain any mice')
t.equal(ent1.filter(function(i) { return i.says === 'woof'}).length, 1, 'Should contain a dog')

// Verify that both catfeeds and the original rat-feed is present.
extractEntries(m2.feeds(), function(ent2) {
t.equal(ent2.filter(function(i) { return i.says === 'meow'}).length, 2, 'Should contain two cats')
t.equal(ent2.filter(function(i) { return i.says === 'woof'}).length, 0, 'Should not contain any dogs')
t.equal(ent2.filter(function(i) { return i.says === 'squeek'}).length, 1, 'Should contain a rat')
t.end()
})
})
}

// Initialize a cat feed on the first multifeed
multi.writer('a1',function(err, catFeed){
t.error(err)
// Append a cat-log
catFeed.append({says: 'meow', name: 'billy'}, function(err){
t.error(err)
// Initialize a dog feed on the first multifeed
multi.writer('a2', function(err, dogFeed) {
t.error(err)
// Append a dog-log
dogFeed.append({says: 'woof', name: 'rupert'}, function(err) {
t.error(err)
// Initialize another cat feed on the second multifeed
m2.writer('b1', function(err, cat2Feed) {
t.error(err)
// Append cat-log
cat2Feed.append({says: 'meow', name: 'amy'}, function(err) {
t.error(err)
// And lastly a rat feed on the second multifeed.
m2.writer('b2', function(err, ratFeed) {
t.error(err)
ratFeed.append({says: 'squeek', name: 'engelbrecht'}, function(err){
t.error(err)
t.ok(true, "Test data setup ok")
// Replicating
var r = multi.replicate()
r.pipe(m2.replicate()).pipe(r)
.once('end', replicateAndVerify)
})
})
})
})
})
})
})
})
})

function extractEntries(feeds, cb) {
var entries = []
var f = function (i) {
if (typeof feeds[i] === 'undefined') return cb(entries)
feeds[i].get(0, function(err, entry){
entries.push(entry)
f(i+1)
})
}
f(0)
}