Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable encryption and feed-replication-control #18

Merged
merged 13 commits into from
Mar 24, 2019
Merged
1 change: 1 addition & 0 deletions .npmrc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package-lock=false
178 changes: 47 additions & 131 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
var raf = require('random-access-file')
var path = require('path')
var protocol = require('hypercore-protocol')
var through = require('through2')
var pumpify = require('pumpify')
var events = require('events')
var inherits = require('inherits')
var readyify = require('./ready')
var mutexify = require('mutexify')
var debug = require('debug')('multifeed')

var PROTOCOL_VERSION = '1.0.0'
var multiplexer = require('./mux')

module.exports = Multifeed

function Multifeed (hypercore, storage, opts) {
if (!(this instanceof Multifeed)) return new Multifeed(hypercore, storage, opts)

this._feeds = {}
this._feedKeyToFeed = {}

Expand All @@ -34,12 +29,17 @@ function Multifeed (hypercore, storage, opts) {
}
}


var self = this
this._ready = readyify(function (done) {
// Private key-less constant hypercore to bootstrap hypercore-protocol
// replication.
var publicKey = new Buffer('bee80ff3a4ee5e727dc44197cb9d25bf8f19d50b0f3ad2984cfe5b7d14e75de7', 'hex')
var feed = hypercore(self._storage('fake'), publicKey)
var protocolEncryptionKey = new Buffer('bee80ff3a4ee5e727dc44197cb9d25bf8f19d50b0f3ad2984cfe5b7d14e75de7', 'hex')
if (self._opts.key) protocolEncryptionKey = Buffer.from(self._opts.key)
else debug('Warning, running multifeed with unsecure default key')

var feed = hypercore(self._storage('_fake'), protocolEncryptionKey)

feed.ready(function () {
self._fake = feed
self._loadFeeds(function () {
Expand Down Expand Up @@ -120,11 +120,14 @@ Multifeed.prototype.writer = function (name, cb) {
})
return
}

var feed = self._hypercore(storage, self._opts)

feed.ready(function () {
self._addFeed(feed, String(idx))
release(function () {
cb(null, feed, idx)
if (err) cb(err)
else cb(null, feed, idx)
})
})
})
Expand All @@ -144,13 +147,42 @@ Multifeed.prototype.feed = function (key) {

Multifeed.prototype.replicate = function (opts) {
if (!opts) opts = {}

var self = this
opts.expectedFeeds = Object.keys(this._feeds).length + 1
var expectedFeeds = opts.expectedFeeds
var mux = multiplexer(self._fake.key, opts)

// Add key exchange listener
mux.once('manifest', function(m) {
mux.wantFeeds(m.keys)
})

// Add replication listener
mux.once('replicate', function(keys, repl) {
addMissingKeys(keys, function(err){
if(err) return mux.destroy(err)

opts.encrypt = false
opts.stream = protocol(opts)
var key2feed = values(self._feeds).reduce(function(h,feed){
h[feed.key.toString('hex')] = feed
return h
},{})

var feeds = keys.map(function(k){ return key2feed[k] })
repl(feeds)
})
})

// Start streaming
this.ready(function(err){
if (err) return mux.stream.destroy(err)
if (mux.stream.destroyed) return
mux.ready(function(){
var keys = values(self._feeds).map(function (feed) { return feed.key.toString('hex') })
mux.haveFeeds(keys)
})
})

return mux.stream

// Helper functions

function addMissingKeys (keys, cb) {
self.ready(function (err) {
Expand All @@ -162,7 +194,7 @@ Multifeed.prototype.replicate = function (opts) {
})
})
}

function addMissingKeysLocked (keys, cb) {
var pending = 0
debug('[REPLICATION] recv\'d ' + keys.length + ' keys')
Expand Down Expand Up @@ -195,116 +227,6 @@ Multifeed.prototype.replicate = function (opts) {
})
if (!pending) cb()
}

var firstWrite = true
var writeStream = through(function (buf, _, next) {
if (firstWrite) {
firstWrite = false
debug('[REPLICATION] able to share ' + values(self._feeds).length + ' keys')
var keys = values(self._feeds).map(function (feed) { return feed.key.toString('hex') })
var headerBuf = serializeHeader(PROTOCOL_VERSION, keys)
this.push(headerBuf)
}
this.push(buf)
next()
})

var readingHeader = true
var headerAccum = Buffer.alloc(0)
var readStream = through(function (buf, _, next) {
var self = this
if (readingHeader) {
headerAccum = Buffer.concat([headerAccum, buf])
if (headerAccum.length <= 0) return next()
var expectedLen = headerAccum.readUInt32LE(0)
if (headerAccum.length >= expectedLen + 4) {
readingHeader = false
try {
var header = deserializeHeader(headerAccum)
debug('[REPLICATION] recv\'d header: ' + JSON.stringify(header))
if (!compatibleVersions(header.version, PROTOCOL_VERSION)) {
debug('[REPLICATION] aborting; version mismatch (us='+PROTOCOL_VERSION+')')
self.emit('error', new Error('protocol version mismatch! us='+PROTOCOL_VERSION + ' them=' + header.version))
return
}
addMissingKeys(header.keys, function () {
// push remainder of buffer
var leftover = headerAccum.slice(expectedLen + 4)
self.unshift(leftover)
debug('[REPLICATION] starting hypercore replication')
process.nextTick(startSync)
next()
})
} catch (e) {
debug('[REPLICATION] aborting (bad header)')
self.emit('error', e)
return
}
} else {
next()
}
} else {
this.push(buf)
next()
}
})

var stream = pumpify(readStream, opts.stream, writeStream)

if (!opts.live) {
opts.stream.on('prefinalize', function (cb) {
var numFeeds = Object.keys(self._feeds).length + 1
opts.stream.expectedFeeds += (numFeeds - expectedFeeds)
expectedFeeds = numFeeds
cb()
})
}

this.ready(onready)

return stream

function startSync () {
var sortedFeeds = values(self._feeds).sort(cmp)
function cmp (a, b) {
return a.key.toString('hex') > b.key.toString('hex')
}
sortedFeeds.forEach(function (feed) {
debug('[REPLICATION] replicating ' + feed.key.toString('hex'))
feed.replicate(opts)
})
}

function onready (err) {
if (err) return stream.destroy(err)
if (stream.destroyed) return

self._fake.replicate(opts)
}
}

function serializeHeader (version, keys) {
var header = {
version: version,
keys: keys
}
var json = JSON.stringify(header)
debug('[SERIALIZE] header outgoing: ' + json)
var lenBuf = Buffer.alloc(4)
lenBuf.writeUInt32LE(json.length, 0)
var jsonBuf = Buffer.from(json, 'utf8')
return Buffer.concat([
lenBuf,
jsonBuf
])
}

function deserializeHeader (buf) {
var len = buf.readUInt32LE(0)
debug('[SERIALIZE] header len to read: ' + len)
var jsonBuf = buf.slice(4, len + 4)
debug('[SERIALIZE] json buf to read: ' + jsonBuf.toString())
return JSON.parse(jsonBuf.toString('utf8'))
}

function writeJsonToStorage (obj, storage, cb) {
Expand Down Expand Up @@ -341,12 +263,6 @@ function readStringFromStorage (storage, cb) {
})
}

// String, String -> Boolean
function compatibleVersions (v1, v2) {
var major1 = v1.split('.')[0]
var major2 = v2.split('.')[0]
return parseInt(major1) === parseInt(major2)
}

function values (obj) {
return Object.keys(obj).map(function (k) { return obj[k] })
Expand Down
Loading