diff --git a/package.json b/package.json
index f51d41c88..4fed6c3f3 100644
--- a/package.json
+++ b/package.json
@@ -23,7 +23,6 @@
   },
   "dependencies": {
     "async": "^2.1.4",
-    "bl": "^1.1.2",
     "bs58": "^3.1.0",
     "concat-stream": "^1.6.0",
     "detect-node": "^2.0.3",
@@ -43,6 +42,7 @@
     "peer-id": "^0.8.1",
     "peer-info": "^0.8.1",
     "promisify-es6": "^1.0.2",
+    "pump": "^1.0.1",
     "qs": "^6.3.0",
     "readable-stream": "1.1.14",
     "stream-http": "^2.5.0",
@@ -64,7 +64,7 @@
     "gulp": "^3.9.1",
     "hapi": "^16.0.1",
     "interface-ipfs-core": "^0.22.1",
-    "ipfsd-ctl": "^0.17.0",
+    "@haad/ipfsd-ctl": "^0.18.0-beta.5",
     "pre-commit": "^1.2.0",
     "socket.io": "^1.7.1",
     "socket.io-client": "^1.7.1",
@@ -117,4 +117,4 @@
     "url": "https://github.com/ipfs/js-ipfs-api/issues"
   },
   "homepage": "https://github.com/ipfs/js-ipfs-api"
-}
\ No newline at end of file
+}
diff --git a/src/api/add.js b/src/api/add.js
index 1379a4296..1b41f7578 100644
--- a/src/api/add.js
+++ b/src/api/add.js
@@ -1,24 +1,26 @@
 'use strict'
 
 const isStream = require('isstream')
-const addToDagNodesTransform = require('../add-to-dagnode-transform')
 const promisify = require('promisify-es6')
+const DAGNodeStream = require('../dagnode-stream')
 
 module.exports = (send) => {
   return promisify((files, callback) => {
-    const good = Buffer.isBuffer(files) ||
+    const ok = Buffer.isBuffer(files) ||
       isStream.isReadable(files) ||
       Array.isArray(files)
 
-    if (!good) {
-      callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
+    if (!ok) {
+      return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
     }
 
-    const sendWithTransform = send.withTransform(addToDagNodesTransform)
-
-    return sendWithTransform({
+    const request = {
       path: 'add',
       files: files
-    }, callback)
+    }
+
+    // Transform the response stream to DAGNode values
+    const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
+    send.andTransform(request, transform, callback)
   })
 }
diff --git a/src/api/block.js b/src/api/block.js
index 35cc8af3e..73d745063 100644
--- a/src/api/block.js
+++ b/src/api/block.js
@@ -1,10 +1,10 @@
 'use strict'
 
 const promisify = require('promisify-es6')
-const bl = require('bl')
 const Block = require('ipfs-block')
 const multihash = require('multihashes')
 const CID = require('cids')
+const streamToValue = require('../stream-to-value')
 
 module.exports = (send) => {
   return {
@@ -21,25 +21,27 @@ module.exports = (send) => {
        opts = {}
      }
 
-      return send({
-        path: 'block/get',
-        args: args,
-        qs: opts
-      }, (err, res) => {
-        if (err) {
-          return callback(err)
-        }
+      // Transform the response from Buffer or a Stream to a Block
+      const transform = (res, callback) => {
         if (Buffer.isBuffer(res)) {
           callback(null, new Block(res))
         } else {
-          res.pipe(bl((err, data) => {
+          streamToValue(res, (err, data) => {
             if (err) {
               return callback(err)
             }
             callback(null, new Block(data))
-          }))
+          })
         }
-      })
+      }
+
+      const request = {
+        path: 'block/get',
+        args: args,
+        qs: opts
+      }
+
+      send.andTransform(request, transform, callback)
     }),
     stat: promisify((args, opts, callback) => {
       // TODO this needs to be adjusted with the new go-ipfs http-api
@@ -51,19 +53,22 @@ module.exports = (send) => {
         callback = opts
         opts = {}
       }
-      return send({
+
+      const request = {
         path: 'block/stat',
         args: args,
         qs: opts
-      }, (err, stats) => {
-        if (err) {
-          return callback(err)
-        }
+      }
+
+      // Transform the response from { Key, Size } objects to { key, size } objects
+      const transform = (stats, callback) => {
         callback(null, {
           key: stats.Key,
           size: stats.Size
         })
-      })
+      }
+
+      send.andTransform(request, transform, callback)
     }),
     put: promisify((block, cid, callback) => {
       // TODO this needs to be adjusted with the new go-ipfs http-api
@@ -81,15 +86,15 @@ module.exports = (send) => {
         block = block.data
       }
 
-      return send({
+      const request = {
         path: 'block/put',
         files: block
-      }, (err, blockInfo) => {
-        if (err) {
-          return callback(err)
-        }
-        callback(null, new Block(block))
-      })
+      }
+
+      // Transform the response to a Block
+      const transform = (blockInfo, callback) => callback(null, new Block(block))
+
+      send.andTransform(request, transform, callback)
     })
   }
 }
diff --git a/src/api/dht.js b/src/api/dht.js
index fe42146ec..dd78e7e0e 100644
--- a/src/api/dht.js
+++ b/src/api/dht.js
@@ -1,6 +1,7 @@
 'use strict'
 
 const promisify = require('promisify-es6')
+const streamToValue = require('../stream-to-value')
 
 module.exports = (send) => {
   return {
@@ -19,11 +20,13 @@ module.exports = (send) => {
         opts = {}
       }
 
-      send({
+      const request = {
         path: 'dht/findprovs',
         args: args,
         qs: opts
-      }, callback)
+      }
+
+      send.andTransform(request, streamToValue, callback)
     }),
     get: promisify((key, opts, callback) => {
       if (typeof opts === 'function' &&
diff --git a/src/api/get.js b/src/api/get.js
index fa7ba1250..3b4c0c983 100644
--- a/src/api/get.js
+++ b/src/api/get.js
@@ -1,11 +1,11 @@
 'use strict'
 
-const tarStreamToObjects = require('../tar-stream-to-objects')
-const cleanMultihash = require('../clean-multihash')
 const promisify = require('promisify-es6')
+const cleanMultihash = require('../clean-multihash')
+const TarStreamToObjects = require('../tar-stream-to-objects')
 
 module.exports = (send) => {
-  return promisify(function get (path, opts, callback) {
+  return promisify((path, opts, callback) => {
     if (typeof opts === 'function' &&
        !callback) {
       callback = opts
@@ -26,12 +26,13 @@ module.exports = (send) => {
        return callback(err)
      }
 
-      var sendWithTransform = send.withTransform(tarStreamToObjects)
-
-      sendWithTransform({
+      const request = {
        path: 'get',
        args: path,
        qs: opts
-      }, callback)
+      }
+
+      // Convert the response stream to TarStream objects
+      send.andTransform(request, TarStreamToObjects, callback)
    })
 }
diff --git a/src/api/log.js b/src/api/log.js
index f7a77aafd..74c750d5e 100644
--- a/src/api/log.js
+++ b/src/api/log.js
@@ -1,5 +1,6 @@
 'use strict'
 
+const pump = require('pump')
 const ndjson = require('ndjson')
 const promisify = require('promisify-es6')
 
@@ -12,7 +13,8 @@ module.exports = (send) => {
       if (err) {
         return callback(err)
       }
-      callback(null, response.pipe(ndjson.parse()))
+      const outputStream = pump(response, ndjson.parse())
+      callback(null, outputStream)
     })
   })
 }
diff --git a/src/api/object.js b/src/api/object.js
index 1f403bc03..f7cd675a8 100644
--- a/src/api/object.js
+++ b/src/api/object.js
@@ -5,7 +5,7 @@ const DAGNode = dagPB.DAGNode
 const DAGLink = dagPB.DAGLink
 const promisify = require('promisify-es6')
 const bs58 = require('bs58')
-const bl = require('bl')
+const streamToValue = require('../stream-to-value')
 const cleanMultihash = require('../clean-multihash')
 const LRU = require('lru-cache')
 const lruOptions = {
@@ -188,7 +188,7 @@ module.exports = (send) => {
         }
 
         if (typeof result.pipe === 'function') {
-          result.pipe(bl(callback))
+          streamToValue(result, callback)
         } else {
           callback(null, result)
         }
diff --git a/src/api/ping.js b/src/api/ping.js
index 5e7c74f6f..eeaa3125a 100644
--- a/src/api/ping.js
+++ b/src/api/ping.js
@@ -1,18 +1,36 @@
 'use strict'
 
 const promisify = require('promisify-es6')
+const streamToValue = require('../stream-to-value')
 
 module.exports = (send) => {
   return promisify((id, callback) => {
-    send({
+    const request = {
       path: 'ping',
       args: id,
       qs: { n: 1 }
-    }, function (err, res) {
-      if (err) {
-        return callback(err, null)
-      }
-      callback(null, res[1])
-    })
+    }
+
+    // Transform the response stream to a value:
+    // { Success: <boolean>, Time: <number>, Text: <string> }
+    const transform = (res, callback) => {
+      streamToValue(res, (err, res) => {
+        if (err) {
+          return callback(err)
+        }
+
+        // go-ipfs http api currently returns 3 lines for a ping.
+        // they're a little messed up, so take the correct values from each line.
+        const pingResult = {
+          Success: res[1].Success,
+          Time: res[1].Time,
+          Text: res[2].Text
+        }
+
+        callback(null, pingResult)
+      })
+    }
+
+    send.andTransform(request, transform, callback)
   })
 }
diff --git a/src/api/pubsub.js b/src/api/pubsub.js
new file mode 100644
index 000000000..1c054fca2
--- /dev/null
+++ b/src/api/pubsub.js
@@ -0,0 +1,135 @@
+'use strict'
+
+const promisify = require('promisify-es6')
+const EventEmitter = require('events')
+const eos = require('end-of-stream')
+const PubsubMessageStream = require('../pubsub-message-stream')
+const stringlistToArray = require('../stringlist-to-array')
+
+/* Public API */
+module.exports = (send) => {
+  /* Internal subscriptions state and functions */
+  const ps = new EventEmitter()
+  const subscriptions = {}
+  ps.id = Math.random()
+
+  return {
+    subscribe: (topic, options, handler, callback) => {
+      const defaultOptions = {
+        discover: false
+      }
+
+      if (typeof options === 'function') {
+        callback = handler
+        handler = options
+        options = defaultOptions
+      }
+
+      if (!options) {
+        options = defaultOptions
+      }
+
+      // promisify doesn't work as we always pass a
+      // function as last argument (`handler`)
+      if (!callback) {
+        return new Promise((resolve, reject) => {
+          subscribe(topic, options, handler, (err) => {
+            if (err) {
+              return reject(err)
+            }
+            resolve()
+          })
+        })
+      }
+
+      subscribe(topic, options, handler, callback)
+    },
+    unsubscribe (topic, handler) {
+      if (ps.listenerCount(topic) === 0 || !subscriptions[topic]) {
+        throw new Error(`Not subscribed to '${topic}'`)
+      }
+
+      ps.removeListener(topic, handler)
+
+      // Drop the request once we are actually done
+      if (ps.listenerCount(topic) === 0) {
+        subscriptions[topic].abort()
+        subscriptions[topic] = null
+      }
+    },
+    publish: promisify((topic, data, callback) => {
+      if (!Buffer.isBuffer(data)) {
+        return callback(new Error('data must be a Buffer'))
+      }
+
+      const request = {
+        path: 'pubsub/pub',
+        args: [topic, data]
+      }
+
+      send(request, callback)
+    }),
+    ls: promisify((callback) => {
+      const request = {
+        path: 'pubsub/ls'
+      }
+
+      send.andTransform(request, stringlistToArray, callback)
+    }),
+    peers: promisify((topic, callback) => {
+      const request = {
+        path: 'pubsub/peers',
+        args: [topic]
+      }
+
+      send.andTransform(request, stringlistToArray, callback)
+    }),
+    setMaxListeners (n) {
+      return ps.setMaxListeners(n)
+    }
+  }
+
+  function subscribe (topic, options, handler, callback) {
+    ps.on(topic, handler)
+    if (subscriptions[topic]) {
+      return callback()
+    }
+
+    // Request params
+    const request = {
+      path: 'pubsub/sub',
+      args: [topic],
+      qs: {
+        discover: options.discover
+      }
+    }
+
+    // Start the request and transform the response
+    // stream to Pubsub messages stream
+    subscriptions[topic] = send.andTransform(request, PubsubMessageStream.from, (err, stream) => {
+      if (err) {
+        subscriptions[topic] = null
+        ps.removeListener(topic, handler)
+        return callback(err)
+      }
+
+      stream.on('data', (msg) => {
+        ps.emit(topic, msg)
+      })
+
+      stream.on('error', (err) => {
+        ps.emit('error', err)
+      })
+
+      eos(stream, (err) => {
+        if (err) {
+          ps.emit('error', err)
+        }
+
+        subscriptions[topic] = null
+        ps.removeListener(topic, handler)
+      })
+
+      callback()
+    })
+  }
+}
diff --git a/src/api/refs.js b/src/api/refs.js
index 318ccb59f..56958ca7f 100644
--- a/src/api/refs.js
+++ b/src/api/refs.js
@@ -1,6 +1,7 @@
 'use strict'
 
 const promisify = require('promisify-es6')
+const streamToValue = require('../stream-to-value')
 
 module.exports = (send) => {
   const refs = promisify((args, opts, callback) => {
@@ -8,21 +9,28 @@ module.exports = (send) => {
       callback = opts
       opts = {}
     }
-    return send({
+
+    const request = {
       path: 'refs',
       args: args,
       qs: opts
-    }, callback)
+    }
+
+    send.andTransform(request, streamToValue, callback)
   })
+
   refs.local = promisify((opts, callback) => {
     if (typeof (opts) === 'function') {
       callback = opts
       opts = {}
     }
-    return send({
+
+    const request = {
       path: 'refs',
       qs: opts
-    }, callback)
+    }
+
+    send.andTransform(request, streamToValue, callback)
   })
 
   return refs
diff --git a/src/api/util/fs-add.js b/src/api/util/fs-add.js
index c78dda13a..6c63cf1ab 100644
--- a/src/api/util/fs-add.js
+++ b/src/api/util/fs-add.js
@@ -1,8 +1,8 @@
 'use strict'
 
 const isNode = require('detect-node')
-const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
 const promisify = require('promisify-es6')
+const DAGNodeStream = require('../../dagnode-stream')
 
 module.exports = (send) => {
   return promisify((path, opts, callback) => {
@@ -28,12 +28,14 @@ module.exports = (send) => {
       return callback(new Error('"path" must be a string'))
     }
 
-    const sendWithTransform = send.withTransform(addToDagNodesTransform)
-
-    sendWithTransform({
+    const request = {
       path: 'add',
       qs: opts,
       files: path
-    }, callback)
+    }
+
+    // Transform the response stream to DAGNode values
+    const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
+    send.andTransform(request, transform, callback)
   })
 }
diff --git a/src/api/util/url-add.js b/src/api/util/url-add.js
index 7dc7694b2..7db525ad7 100644
--- a/src/api/util/url-add.js
+++ b/src/api/util/url-add.js
@@ -3,9 +3,8 @@
 const promisify = require('promisify-es6')
 const once = require('once')
 const parseUrl = require('url').parse
-
 const request = require('../../request')
-const addToDagNodesTransform = require('./../../add-to-dagnode-transform')
+const DAGNodeStream = require('../../dagnode-stream')
 
 module.exports = (send) => {
   return promisify((url, opts, callback) => {
@@ -28,7 +27,6 @@ module.exports = (send) => {
       return callback(new Error('"url" param must be an http(s) url'))
     }
 
-    const sendWithTransform = send.withTransform(addToDagNodesTransform)
     callback = once(callback)
 
     request(parseUrl(url).protocol)(url, (res) => {
@@ -37,11 +35,15 @@ module.exports = (send) => {
         return callback(new Error(`Failed to download with ${res.statusCode}`))
       }
 
-      sendWithTransform({
+      const params = {
         path: 'add',
         qs: opts,
         files: res
-      }, callback)
+      }
+
+      // Transform the response stream to DAGNode values
+      const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
+      send.andTransform(params, transform, callback)
     }).end()
   })
 }
diff --git a/src/dagnode-stream.js b/src/dagnode-stream.js
new file mode 100644
index 000000000..bba259cb3
--- /dev/null
+++ b/src/dagnode-stream.js
@@ -0,0 +1,59 @@
+'use strict'
+
+const pump = require('pump')
+const TransformStream = require('readable-stream').Transform
+const streamToValue = require('./stream-to-value')
+const getDagNode = require('./get-dagnode')
+
+/*
+  Transforms a stream of {Name, Hash} objects to include size
+  of the DAG object.
+
+  Usage: inputStream.pipe(DAGNodeStream({ send: send }))
+
+  Input object format:
+  {
+    Name: '/path/to/file/foo.txt',
+    Hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP'
+  }
+
+  Output object format:
+  {
+    path: '/path/to/file/foo.txt',
+    hash: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP',
+    size: 20
+  }
+*/
+class DAGNodeStream extends TransformStream {
+  constructor (options) {
+    const opts = Object.assign(options || {}, { objectMode: true })
+    super(opts)
+    this._send = opts.send
+  }
+
+  static streamToValue (send, inputStream, callback) {
+    const outputStream = pump(inputStream, new DAGNodeStream({ send: send }), (err) => {
+      if (err) callback(err)
+    })
+    streamToValue(outputStream, callback)
+  }
+
+  _transform (obj, enc, callback) {
+    getDagNode(this._send, obj.Hash, (err, node) => {
+      if (err) {
+        return callback(err)
+      }
+
+      const result = {
+        path: obj.Name,
+        hash: obj.Hash,
+        size: node.size
+      }
+
+      this.push(result)
+      callback(null)
+    })
+  }
+}
+
+module.exports = DAGNodeStream
diff --git a/src/get-dagnode.js b/src/get-dagnode.js
index 75cd0886c..d9942d99c 100644
--- a/src/get-dagnode.js
+++ b/src/get-dagnode.js
@@ -1,8 +1,8 @@
 'use strict'
 
 const DAGNode = require('ipld-dag-pb').DAGNode
-const bl = require('bl')
 const parallel = require('async/parallel')
+const streamToValue = require('./stream-to-value')
 
 module.exports = function (send, hash, callback) {
   // Retrieve the object and its data in parallel, then produce a DAGNode
@@ -36,12 +36,12 @@ module.exports = function (send, hash, callback) {
     if (Buffer.isBuffer(stream)) {
       DAGNode.create(stream, object.Links, callback)
     } else {
-      stream.pipe(bl(function (err, data) {
+      streamToValue(stream, (err, data) => {
         if (err) {
           return callback(err)
         }
         DAGNode.create(data, object.Links, callback)
-      }))
+      })
     }
   })
 }
diff --git a/src/load-commands.js b/src/load-commands.js
index b69c197cb..1246ef5ef 100644
--- a/src/load-commands.js
+++ b/src/load-commands.js
@@ -25,6 +25,7 @@ function requireCommands () {
     refs: require('./api/refs'),
     repo: require('./api/repo'),
     swarm: require('./api/swarm'),
+    pubsub: require('./api/pubsub'),
     update: require('./api/update'),
     version: require('./api/version')
   }
diff --git a/src/pubsub-message-stream.js b/src/pubsub-message-stream.js
new file mode 100644
index 000000000..b6631726f
--- /dev/null
+++ b/src/pubsub-message-stream.js
@@ -0,0 +1,33 @@
+'use strict'
+
+const TransformStream = require('readable-stream').Transform
+const PubsubMessage = require('./pubsub-message-utils')
+
+class PubsubMessageStream extends TransformStream {
+  constructor (options) {
+    const opts = Object.assign(options || {}, { objectMode: true })
+    super(opts)
+  }
+
+  static from (inputStream, callback) {
+    let outputStream = inputStream.pipe(new PubsubMessageStream())
+    inputStream.on('end', () => outputStream.emit('end'))
+    callback(null, outputStream)
+  }
+
+  _transform (obj, enc, callback) {
+    let msg
+    try {
+      msg = PubsubMessage.deserialize(obj, 'base64')
+    } catch (e) {
+      // Not a valid pubsub message
+      // go-ipfs returns '{}' as the very first object atm, we skip that
+      return callback()
+    }
+
+    this.push(msg)
+    callback()
+  }
+}
+
+module.exports = PubsubMessageStream
diff --git a/src/pubsub-message-utils.js b/src/pubsub-message-utils.js
new file mode 100644
index 000000000..c38cb7233
--- /dev/null
+++ b/src/pubsub-message-utils.js
@@ -0,0 +1,38 @@
+'use strict'
+
+module.exports = {
+  deserialize (data, enc = 'json') {
+    enc = enc ? enc.toLowerCase() : null
+
+    if (enc === 'json') {
+      return deserializeFromJson(data)
+    } else if (enc === 'base64') {
+      return deserializeFromBase64(data)
+    }
+
+    throw new Error(`Unsupported encoding: '${enc}'`)
+  }
+}
+
+function deserializeFromJson (data) {
+  const json = JSON.parse(data)
+  return deserializeFromBase64(json)
+}
+
+function deserializeFromBase64 (obj) {
+  if (!isPubsubMessage(obj)) {
+    throw new Error(`Not a pubsub message`)
+  }
+
+  return {
+    // TODO: broken see https://github.com/ipfs/go-ipfs/issues/3522
+    from: obj.from,
+    seqno: new Buffer(obj.seqno, 'base64'),
+    data: new Buffer(obj.data, 'base64'),
+    topicCIDs: obj.topicIDs || obj.topicCIDs
+  }
+}
+
+function isPubsubMessage (obj) {
+  return obj && obj.from && obj.seqno && obj.data && (obj.topicIDs || obj.topicCIDs)
+}
diff --git a/src/request-api.js b/src/request-api.js
index 5eb150269..b12e44cdd 100644
--- a/src/request-api.js
+++ b/src/request-api.js
@@ -1,50 +1,29 @@
 'use strict'
 
 const Qs = require('qs')
-const ndjson = require('ndjson')
 const isNode = require('detect-node')
+const ndjson = require('ndjson')
+const pump = require('pump')
 const once = require('once')
-const concat = require('concat-stream')
-
 const getFilesStream = require('./get-files-stream')
+const streamToValue = require('./stream-to-value')
+const streamToJsonValue = require('./stream-to-json-value')
 const request = require('./request')
 
 // -- Internal
 
-function parseChunkedJson (res, cb) {
-  res
-    .pipe(ndjson.parse())
-    .once('error', cb)
-    .pipe(concat((data) => cb(null, data)))
-}
-
-function parseRaw (res, cb) {
-  res
-    .once('error', cb)
-    .pipe(concat((data) => cb(null, data)))
-}
-
-function parseJson (res, cb) {
-  res
-    .once('error', cb)
-    .pipe(concat((data) => {
-      if (!data || data.length === 0) {
-        return cb()
-      }
-
-      if (Buffer.isBuffer(data)) {
-        data = data.toString()
-      }
-
-      let res
-      try {
-        res = JSON.parse(data)
-      } catch (err) {
-        return cb(err)
-      }
-
-      cb(null, res)
-    }))
+function parseError (res, cb) {
+  const error = new Error(`Server responded with ${res.statusCode}`)
+  streamToJsonValue(res, (err, payload) => {
+    if (err) {
+      return cb(err)
+    }
+    if (payload) {
+      error.code = payload.Code
+      error.message = payload.Message || payload.toString()
+    }
+    cb(error)
+  })
 }
 
 function onRes (buffer, cb) {
@@ -55,33 +34,27 @@
       res.headers['content-type'].indexOf('application/json') === 0
 
     if (res.statusCode >= 400 || !res.statusCode) {
-      const error = new Error(`Server responded with ${res.statusCode}`)
-
-      parseJson(res, (err, payload) => {
-        if (err) {
-          return cb(err)
-        }
-        if (payload) {
-          error.code = payload.Code
-          error.message = payload.Message || payload.toString()
-        }
-        cb(error)
-      })
+      return parseError(res, cb)
     }
 
+    // Return the response stream directly
     if (stream && !buffer) {
      return cb(null, res)
    }
 
+    // Return a stream of JSON objects
    if (chunkedObjects && isJson) {
-      return parseChunkedJson(res, cb)
+      const outputStream = pump(res, ndjson.parse())
+      return cb(null, outputStream)
    }
 
+    // Return a JSON object
    if (isJson) {
-      return parseJson(res, cb)
+      return streamToJsonValue(res, cb)
    }
 
-    parseRaw(res, cb)
+    // Return a value
+    return streamToValue(res, cb)
   }
 }
 
@@ -163,7 +136,7 @@
 //
 // -- Module Interface
 
-exports = module.exports = function getRequestAPI (config) {
+exports = module.exports = (config) => {
  /*
   * options: {
   *   path: // API path (like /add or /config) - type: string
@@ -173,7 +146,7 @@ exports = module.exports = function getRequestAPI (config) {
   *   buffer: // buffer the request before sending it - type: bool
   * }
   */
-  const send = function (options, callback) {
+  const send = (options, callback) => {
     if (typeof options !== 'object') {
       return callback(new Error('no options were passed'))
     }
@@ -181,25 +154,17 @@ exports = module.exports = function getRequestAPI (config) {
     return requestAPI(config, options, callback)
   }
 
-  // Wraps the 'send' function such that an asynchronous
-  // transform may be applied to its result before
-  // passing it on to either its callback or promise.
-  send.withTransform = function (transform) {
-    return function (options, callback) {
-      if (typeof options !== 'object') {
-        return callback(new Error('no options were passed'))
+  // Send an HTTP request and pass the response through a transform function
+  // to convert the response data to the wanted format before
+  // returning it to the callback.
+  // Eg. send.andTransform({}, (e) => JSON.parse(e), (err, res) => ...)
+  send.andTransform = (options, transform, callback) => {
+    return send(options, (err, res) => {
+      if (err) {
+        return callback(err)
       }
-
-      send(options, wrap(callback))
-
-      function wrap (func) {
-        if (func) {
-          return function (err, res) {
-            transform(err, res, send, func)
-          }
-        }
-      }
-    }
+      transform(res, callback)
+    })
   }
 
   return send
diff --git a/src/stream-to-json-value.js b/src/stream-to-json-value.js
new file mode 100644
index 000000000..e42de2fc6
--- /dev/null
+++ b/src/stream-to-json-value.js
@@ -0,0 +1,34 @@
+'use strict'
+
+const streamToValue = require('./stream-to-value')
+
+/*
+  Converts a stream to a single JSON value
+*/
+function streamToJsonValue (res, cb) {
+  streamToValue(res, (err, data) => {
+    if (err) {
+      return cb(err)
+    }
+
+    if (!data || data.length === 0) {
+      return cb()
+    }
+
+    // TODO: check if needed, afaik JSON.parse can parse Buffers
+    if (Buffer.isBuffer(data)) {
+      data = data.toString()
+    }
+
+    let res
+    try {
+      res = JSON.parse(data)
+    } catch (err) {
+      return cb(err)
+    }
+
+    cb(null, res)
+  })
+}
+
+module.exports = streamToJsonValue
diff --git a/src/stream-to-value.js b/src/stream-to-value.js
new file mode 100644
index 000000000..6cf0c1ec5
--- /dev/null
+++ b/src/stream-to-value.js
@@ -0,0 +1,16 @@
+'use strict'
+
+const pump = require('pump')
+const concat = require('concat-stream')
+
+/*
+  Concatenate a stream to a single value.
+*/
+function streamToValue (res, callback) {
+  const done = (data) => callback(null, data)
+  pump(res, concat(done), (err) => {
+    if (err) callback(err)
+  })
+}
+
+module.exports = streamToValue
diff --git a/src/stringlist-to-array.js b/src/stringlist-to-array.js
new file mode 100644
index 000000000..df28ee6df
--- /dev/null
+++ b/src/stringlist-to-array.js
@@ -0,0 +1,9 @@
+'use strict'
+
+// Converts a go-ipfs "stringList" to an array
+// { Strings: ['A', 'B'] } --> ['A', 'B']
+function stringlistToArray (res, cb) {
+  cb(null, res.Strings || [])
+}
+
+module.exports = stringlistToArray
diff --git a/src/tar-stream-to-objects.js b/src/tar-stream-to-objects.js
index acab14658..6d7765a03 100644
--- a/src/tar-stream-to-objects.js
+++ b/src/tar-stream-to-objects.js
@@ -1,38 +1,48 @@
 'use strict'
 
+const pump = require('pump')
 const tar = require('tar-stream')
-const Readable = require('readable-stream')
+const ReadableStream = require('readable-stream').Readable
 
-// transform tar stream into readable stream of
-// { path: 'string', content: Readable }
-module.exports = (err, res, send, done) => {
-  if (err) {
-    return done(err)
+class ObjectsStreams extends ReadableStream {
+  constructor (options) {
+    const opts = Object.assign(options || {}, { objectMode: true })
+    super(opts)
   }
 
-  const objStream = new Readable({ objectMode: true })
-  objStream._read = function noop () {}
+  _read () {}
+}
+
+/*
+  Transform a tar stream into a stream of objects:
+
+  Output format:
+  { path: 'string', content: Stream }
+*/
+const TarStreamToObjects = (inputStream, callback) => {
+  let outputStream = new ObjectsStreams()
+  let extractStream = tar.extract()
 
-  res
-    .pipe(tar.extract())
+  extractStream
     .on('entry', (header, stream, next) => {
       stream.on('end', next)
 
       if (header.type !== 'directory') {
-        objStream.push({
+        outputStream.push({
           path: header.name,
           content: stream
         })
       } else {
-        objStream.push({
+        outputStream.push({
          path: header.name
        })
        stream.resume()
      }
    })
-    .on('finish', () => {
-      objStream.push(null)
-    })
+    .on('finish', () => outputStream.push(null))
 
-  done(null, objStream)
+  pump(inputStream, extractStream)
+  callback(null, outputStream)
 }
+
+module.exports = TarStreamToObjects
diff --git a/test/factory/daemon-spawner.js b/test/factory/daemon-spawner.js
index 86a2d808b..76b6b61f5 100644
--- a/test/factory/daemon-spawner.js
+++ b/test/factory/daemon-spawner.js
@@ -1,7 +1,7 @@
 'use strict'
 
 // const defaultConfig = require('./default-config.json')
-const ipfsd = require('ipfsd-ctl')
+const ipfsd = require('@haad/ipfsd-ctl')
 const series = require('async/series')
 const eachSeries = require('async/eachSeries')
 const once = require('once')
@@ -73,17 +73,21 @@ function spawnEphemeralNode (callback) {
     (cb) => {
       const configValues = {
         Bootstrap: [],
+        // Do not use discovery to avoid connecting to
+        // other nodes by mistake
         Discovery: {},
-        'HTTPHeaders.Access-Control-Allow-Origin': ['*'],
-        'HTTPHeaders.Access-Control-Allow-Credentials': 'true',
-        'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET']
+        API: {
+          'HTTPHeaders.Access-Control-Allow-Origin': ['*'],
+          'HTTPHeaders.Access-Control-Allow-Credentials': ['true'],
+          'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET']
+        }
       }
 
       eachSeries(Object.keys(configValues), (configKey, cb) => {
-        node.setConfig(`API.${configKey}`, JSON.stringify(configValues[configKey]), cb)
+        node.setConfig(`${configKey}`, JSON.stringify(configValues[configKey]), cb)
       }, cb)
     },
-    (cb) => node.startDaemon(cb)
+    (cb) => node.startDaemon(['--enable-pubsub-experiment'], cb)
   ], (err) => {
     if (err) {
       return callback(err)
diff --git a/test/interface-ipfs-core/ping.spec.js b/test/interface-ipfs-core/ping.spec.js
index bf4e2e606..a7dbb4a28 100644
--- a/test/interface-ipfs-core/ping.spec.js
+++ b/test/interface-ipfs-core/ping.spec.js
@@ -12,6 +12,10 @@ describe('.ping', () => {
     apiClients.a.ping(id.id, (err, res) => {
       expect(err).to.not.exist
       expect(res).to.have.a.property('Success')
+      expect(res).to.have.a.property('Time')
+      expect(res).to.have.a.property('Text')
+      expect(res.Text).to.contain('Average latency')
+      expect(res.Time).to.be.a('number')
       done()
     })
   })
@@ -25,6 +29,10 @@ describe('.ping', () => {
       })
       .then((res) => {
         expect(res).to.have.a.property('Success')
+        expect(res).to.have.a.property('Time')
+        expect(res).to.have.a.property('Text')
+        expect(res.Text).to.contain('Average latency')
+        expect(res.Time).to.be.a('number')
       })
   })
 })
diff --git a/test/interface-ipfs-core/pubsub.spec.js b/test/interface-ipfs-core/pubsub.spec.js
new file mode 100644
index 000000000..886d7db56
--- /dev/null
+++ b/test/interface-ipfs-core/pubsub.spec.js
@@ -0,0 +1,20 @@
+/* eslint-env mocha */
+
+'use strict'
+
+const test = require('interface-ipfs-core')
+const FactoryClient = require('../factory/factory-client')
+
+let fc
+
+const common = {
+  setup: function (callback) {
+    fc = new FactoryClient()
+    callback(null, fc)
+  },
+  teardown: function (callback) {
+    fc.dismantle(callback)
+  }
+}
+
+test.pubsub(common)
diff --git a/test/interface-ipfs-core/refs.spec.js b/test/interface-ipfs-core/refs.spec.js
index 5b33662b0..3a38abe1f 100644
--- a/test/interface-ipfs-core/refs.spec.js
+++ b/test/interface-ipfs-core/refs.spec.js
@@ -65,7 +65,6 @@ describe('.refs', () => {
       ipfs.refs(folder, {format: ' '}, (err, objs) => {
         expect(err).to.not.exist
         expect(objs).to.eql(result)
-
         done()
       })
     })
diff --git a/test/ipfs-api/util.spec.js b/test/ipfs-api/util.spec.js
index e2e9dede2..d103844bd 100644
--- a/test/ipfs-api/util.spec.js
+++ b/test/ipfs-api/util.spec.js
@@ -65,7 +65,6 @@ describe('.util', () => {
       ipfs.util.addFromFs(filePath, (err, result) => {
         expect(err).to.not.exist
         expect(result.length).to.be.above(5)
-
         done()
       })
     })
diff --git a/test/setup/spawn-daemons.js b/test/setup/spawn-daemons.js
index 5a2d3b659..d78f4cf95 100644
--- a/test/setup/spawn-daemons.js
+++ b/test/setup/spawn-daemons.js
@@ -5,7 +5,7 @@ const gulp = require('gulp')
 const fs = require('fs')
 const path = require('path')
 
-const ipfsd = require('ipfsd-ctl')
+const ipfsd = require('@haad/ipfsd-ctl')
 const eachSeries = require('async/eachSeries')
 const parallel = require('async/parallel')
 
@@ -28,19 +28,19 @@ function startDisposableDaemons (callback) {
       const configValues = {
         Bootstrap: [],
         Discovery: {},
-        'HTTPHeaders.Access-Control-Allow-Origin': ['*'],
-        'HTTPHeaders.Access-Control-Allow-Credentials': 'true',
-        'HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET']
+        'API.HTTPHeaders.Access-Control-Allow-Origin': ['*'],
+        'API.HTTPHeaders.Access-Control-Allow-Credentials': ['true'],
+        'API.HTTPHeaders.Access-Control-Allow-Methods': ['PUT', 'POST', 'GET']
       }
 
       eachSeries(Object.keys(configValues), (configKey, cb) => {
-        nodes[key].setConfig(`API.${configKey}`, JSON.stringify(configValues[configKey]), cb)
+        nodes[key].setConfig(configKey, JSON.stringify(configValues[configKey]), cb)
       }, (err) => {
         if (err) {
           return cb(err)
         }
 
-        nodes[key].startDaemon(cb)
+        nodes[key].startDaemon(['--enable-pubsub-experiment'], cb)
       })
     })
   }
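
Usage sketch (reviewer note, not part of the patch): a minimal example of the pubsub surface this change adds to the js-ipfs-api client. The constructor arguments, topic name, and payload below are illustrative assumptions, and the target daemon must be started with --enable-pubsub-experiment, as the test setup above does.

// Assumed client setup: a local go-ipfs daemon with its HTTP API on port 5001
const ipfsAPI = require('ipfs-api')
const ipfs = ipfsAPI('localhost', '5001')

const topic = 'js-ipfs-api-example' // hypothetical topic name
const handler = (msg) => console.log(msg.from, msg.data.toString())

ipfs.pubsub.subscribe(topic, handler, (err) => {
  if (err) { throw err }

  // publish expects a Buffer payload
  ipfs.pubsub.publish(topic, new Buffer('hello pubsub'), (err) => {
    if (err) { throw err }

    // Removes the handler; once the last handler for the topic is gone,
    // the underlying pubsub/sub request is aborted
    ipfs.pubsub.unsubscribe(topic, handler)
  })
})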