diff --git a/README.md b/README.md index 247228a..89d70f3 100644 --- a/README.md +++ b/README.md @@ -125,20 +125,40 @@ With [npm](https://npmjs.org/) installed, run $ npm install multifeed ``` -## Hacks - -1. `hypercore-protocol` requires the first feed exchanged to be common between - replicating peers. This prevents two strangers from exchanging sets of - hypercores. A "fake" hypercore with a hardcoded public key is included in the - code to bootstrap the replication process. I discarded the private key, but - even if I didn't, it doesn't let me do anything nefarious. You could patch - this with your own key of choice. -2. `hypercore-protocol` requires all feed keys be known upfront: only discovery - keys are exchanged (`discoveryKey = hash(key)`), so this module wraps the - hypercore replication duplex stream in a secondary duplex stream that - exchanges feed public keys upfront before moving on to the hypercore - replication mechanism. +## Replication Policies +You can control which feeds get replicated by providing a policy object: + +```js + +var randomReplication = { + init: function(multifeed) { + // called on multifeed.ready + // use it for initalization. + }, + have: function(local, share) { + // called on peer connection + // select which feeds to share + share(local.keys, { + random: local.keys.map(function() { return Math.random() - 0.5 }) + }) + }, + want: function(remote, request) { + // called when remote peer's + // share list is available. + // remote.keys is always available + // remote contains also all custom props sent by remote. + var keys = remote.keys.filter(function(k, i){ + return remote.random[i] > 0.5 + }) + request(keys) + } +} + +var multi = multifeed(hypercore, './db', { valueEncoding: 'json' }) +multi.use(randomReplication) + +``` ## See Also - [multifeed-index](https://github.com/noffle/multifeed-index) diff --git a/index.js b/index.js index 2a799d5..70f4194 100644 --- a/index.js +++ b/index.js @@ -15,9 +15,14 @@ function Multifeed (hypercore, storage, opts) { this._feedKeyToFeed = {} this._hypercore = hypercore - this._opts = opts - + this._opts = opts || {} + this._middleware = null this.writerLock = mutexify() + this._middleware = [] + + this.key = new Buffer('bee80ff3a4ee5e727dc44197cb9d25bf8f19d50b0f3ad2984cfe5b7d14e75de7', 'hex') + if (this._opts.key) this.key = Buffer.from(this._opts.key) + else debug('Warning, running multifeed with unsecure default key') this.closed = false @@ -34,20 +39,9 @@ function Multifeed (hypercore, storage, opts) { var self = this this._ready = readyify(function (done) { - // Private key-less constant hypercore to bootstrap hypercore-protocol - // replication. - var protocolEncryptionKey = new Buffer('bee80ff3a4ee5e727dc44197cb9d25bf8f19d50b0f3ad2984cfe5b7d14e75de7', 'hex') - if (self._opts.key) protocolEncryptionKey = Buffer.from(self._opts.key) - else debug('Warning, running multifeed with unsecure default key') - - var feed = hypercore(self._storage('_fake'), protocolEncryptionKey) - - feed.ready(function () { - self._fake = feed - self._loadFeeds(function () { - debug('[INIT] finished loading feeds') - done() - }) + self._loadFeeds(function () { + debug('[INIT] finished loading feeds') + done() }) }) } @@ -76,7 +70,7 @@ Multifeed.prototype.close = function (cb) { }) } - var feeds = values(self._feeds).concat(self._fake) + var feeds = values(self._feeds) function next (n) { if (n >= feeds.length) { @@ -161,7 +155,7 @@ Multifeed.prototype.writer = function (name, cb) { feed.ready(function () { self._addFeed(feed, String(idx)) release(function () { - if (err) cb(err) + if (err) return cb(err) else cb(null, feed, idx) }) }) @@ -183,11 +177,31 @@ Multifeed.prototype.feed = function (key) { Multifeed.prototype.replicate = function (opts) { if (!opts) opts = {} var self = this - var mux = multiplexer(self._fake.key, opts) + var mux = multiplexer(self.key, opts) // Add key exchange listener mux.once('manifest', function(m) { - mux.wantFeeds(m.keys) + if (self._middleware.length) { + function callPlug(i, ctx) { + if (self._middleware.length === i) return mux.wantFeeds(ctx.keys) + var plug = self._middleware[i] + + // Reliquish control to next if plug does not implement callback + if (typeof plug.want !== 'function') return callPlug(i + 1, ctx) + + // give each plug a fresh reference to avoid peeking/postmodifications + plug.want.bind(self)(clone(ctx), function(keys) { + let n = clone(m) + n.keys = keys + callPlug(i + 1, n) + }) + } + // Start loop + callPlug(0, m) + } else { + // default behaviour "want all" + mux.wantFeeds(m.keys) + } }) // Add replication listener @@ -195,7 +209,7 @@ Multifeed.prototype.replicate = function (opts) { addMissingKeys(keys, function(err){ if(err) return mux.destroy(err) - var key2feed = values(self._feeds).reduce(function(h,feed){ + var key2feed = values(self._feeds).reduce(function(h, feed){ h[feed.key.toString('hex')] = feed return h },{}) @@ -210,8 +224,28 @@ Multifeed.prototype.replicate = function (opts) { if (err) return mux.stream.destroy(err) if (mux.stream.destroyed) return mux.ready(function(){ - var keys = values(self._feeds).map(function (feed) { return feed.key.toString('hex') }) - mux.haveFeeds(keys) + var available = values(self._feeds).map(function (feed) { return feed.key.toString('hex') }) + if (self._middleware.length) { + // Orderly iterate through all plugs + function callPlug(i, ctx) { + if (i === self._middleware.length) return mux.haveFeeds(ctx.keys, ctx) + let plug = self._middleware[i] + + // Reliquish control to next if plug does not implement callback + if (typeof plug.have !== 'function') return callPlug(i + 1, ctx) + + // give each plug a fresh reference to avoid peeking/postmodifications + plug.have.bind(self)(clone(ctx), function(keys, extras){ + extras = extras || {} + extras.keys = keys + callPlug(i + 1, extras) + }) + } + callPlug(0, {keys: available}) + } else { + // Default behaviour 'share all' + mux.haveFeeds(available) + } }) }) @@ -236,12 +270,16 @@ Multifeed.prototype.replicate = function (opts) { var filtered = keys.filter(function (key) { return !Number.isNaN(parseInt(key, 16)) && key.length === 64 }) - filtered.forEach(function (key) { - var feeds = values(self._feeds).filter(function (feed) { - return feed.key.toString('hex') === key - }) - if (!feeds.length) { - pending++ + + var existingKeys = values(self._feeds).map(function(feed) { return feed.key.toString('hex') }) + + var missingFeeds = filtered.filter(function (key) { + return existingKeys.indexOf(key) === -1 + }) + + function initFeed(i) { + if (i >= missingFeeds.length) return cb() + var key = missingFeeds[i] var numFeeds = Object.keys(self._feeds).length var storage = self._storage(''+numFeeds) var feed @@ -250,20 +288,27 @@ Multifeed.prototype.replicate = function (opts) { feed = self._hypercore(storage, Buffer.from(key, 'hex'), self._opts) } catch (e) { debug('[REPLICATION] failed to create new local hypercore, key=' + key.toString('hex')) - if (!--pending) cb() - return + return initFeed(i + 1) } - debug('[REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex')) - self._addFeed(feed, String(numFeeds)) feed.ready(function () { - if (!--pending) cb() + debug('[REPLICATION] succeeded in creating new local hypercore, key=' + key.toString('hex')) + self._addFeed(feed, String(numFeeds)) + initFeed(i + 1) }) - } - }) - if (!pending) cb() + } + initFeed(0) } } +Multifeed.prototype.use = function(plug) { + if(this._middleware === null) this._middleware = [] + this._middleware.push(plug) + var self = this + if (typeof plug.init === 'function') this.ready(function(){ + plug.init(self) + }) +} + // TODO: what if the new data is shorter than the old data? things will break! function writeStringToStorage (string, storage, cb) { var buf = Buffer.from(string, 'utf8') @@ -292,3 +337,8 @@ function readStringFromStorage (storage, cb) { function values (obj) { return Object.keys(obj).map(function (k) { return obj[k] }) } + +// Deep clone +function clone(obj) { + return JSON.parse(JSON.stringify(obj)) +} diff --git a/package.json b/package.json index 1823948..936ad97 100644 --- a/package.json +++ b/package.json @@ -31,5 +31,8 @@ "through2": "^3.0.0", "tmp": "0.0.33" }, - "license": "ISC" + "license": "ISC", + "directories": { + "test": "test" + } } diff --git a/test/replication-api.js b/test/replication-api.js new file mode 100644 index 0000000..c9668e0 --- /dev/null +++ b/test/replication-api.js @@ -0,0 +1,115 @@ +var test = require('tape') +var hypercore = require('hypercore') +var ram = require('random-access-memory') +var multifeed = require('../index') + +var bindStorage = function(subpath) { + return function(p) { + return ram(subpath + '/' + p) + } +} + +test('Key exchange API', function(t){ + t.plan(26) + var multi = multifeed(hypercore, bindStorage("first/"), { valueEncoding: 'json' }) + var m2 = multifeed(hypercore, bindStorage("second/"), { valueEncoding: 'json' }) + // A policy that only replicates feeds relevant to cats + // This policy shares everything but accepts only feeds tagged as 'cat' + var CatPolicy = { + have: function(local, share) { + t.ok(local.keys instanceof Array, "Param 'local' should be an array of localy available feeds") + t.ok(typeof share === 'function', "Param 'share' should be a function") + + // 'Marketing phase' extract zeroth entry for all feeds an attach it + // to the feed-exchange + extractEntries(this.feeds(), function(entries) { + // Share all feeds including the sounds they make. + share(local.keys, { + says: entries.map(function(e) { return e.says }) + }) + }) + }, + want: function(remote, request) { + t.ok(remote.keys instanceof Array, "Param 'remote' should be an array of remotely available feeds") + t.ok(typeof request === 'function', "Param 'request' should be a function") + // Ok let's request only keys relevant to our logic. + var keys = remote.keys.filter(function(k, i){ + return remote.says[i] === 'meow' + }) + request(keys) + } + } + + // let both multifeeds use the same policy + multi.use(CatPolicy) + m2.use(CatPolicy) + + var replicateAndVerify = function(err) { + t.error(err) + t.equal(multi.feeds().length, 3) + t.equal(m2.feeds().length, 3) + // Verify that both catfeeds and the original dog-feed is present. + extractEntries(multi.feeds(), function(ent1) { + t.equal(ent1.filter(function(i) { return i.says === 'meow'}).length, 2, 'Should contain two cats') + t.equal(ent1.filter(function(i) { return i.says === 'squeek'}).length, 0, 'Should not contain any mice') + t.equal(ent1.filter(function(i) { return i.says === 'woof'}).length, 1, 'Should contain a dog') + + // Verify that both catfeeds and the original rat-feed is present. + extractEntries(m2.feeds(), function(ent2) { + t.equal(ent2.filter(function(i) { return i.says === 'meow'}).length, 2, 'Should contain two cats') + t.equal(ent2.filter(function(i) { return i.says === 'woof'}).length, 0, 'Should not contain any dogs') + t.equal(ent2.filter(function(i) { return i.says === 'squeek'}).length, 1, 'Should contain a rat') + t.end() + }) + }) + } + + // Initialize a cat feed on the first multifeed + multi.writer('a1',function(err, catFeed){ + t.error(err) + // Append a cat-log + catFeed.append({says: 'meow', name: 'billy'}, function(err){ + t.error(err) + // Initialize a dog feed on the first multifeed + multi.writer('a2', function(err, dogFeed) { + t.error(err) + // Append a dog-log + dogFeed.append({says: 'woof', name: 'rupert'}, function(err) { + t.error(err) + // Initialize another cat feed on the second multifeed + m2.writer('b1', function(err, cat2Feed) { + t.error(err) + // Append cat-log + cat2Feed.append({says: 'meow', name: 'amy'}, function(err) { + t.error(err) + // And lastly a rat feed on the second multifeed. + m2.writer('b2', function(err, ratFeed) { + t.error(err) + ratFeed.append({says: 'squeek', name: 'engelbrecht'}, function(err){ + t.error(err) + t.ok(true, "Test data setup ok") + // Replicating + var r = multi.replicate() + r.pipe(m2.replicate()).pipe(r) + .once('end', replicateAndVerify) + }) + }) + }) + }) + }) + }) + }) + }) +}) + +function extractEntries(feeds, cb) { + var entries = [] + var f = function (i) { + if (typeof feeds[i] === 'undefined') return cb(entries) + feeds[i].get(0, function(err, entry){ + entries.push(entry) + f(i+1) + }) + } + f(0) +}