From e2d894c00be9508d6373c144da9796ececf59a12 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Wed, 18 Oct 2017 08:05:12 -0700 Subject: [PATCH] feat: report progress on ipfs add * Adds wiring for a progress bar Passes the progress bar option for tranfers over http. Progress is shown for your payload being streamed upto the server. There is a pause. Then you get your list of files. * feat: Support specify hash algorithm in files.add (#597) * support support specify hash alg * Add test for add with --hash option. Pass raw cid/multihash to ipfs object get/data * Allow object get/data to accept CID * Var naming tweaks from review * feat: track progress events * tesT: add more tests * fix: tidy up tests * feat: send errors as trailer headers * fix: remove .only from tests * feat: signal error on reponse stream if x-stream-error * test: adding tests for trailer header errors * chore: update interface-ipfs-core * chore: upgrade to latest interface-ipfs-core --- examples/upload-file-via-browser/package.json | 2 +- examples/upload-file-via-browser/src/App.js | 2 +- package.json | 2 +- src/files/add.js | 6 +- src/utils/progress-stream.js | 49 ++++++++++ src/utils/request-api.js | 19 ++++ test/files.spec.js | 90 +++++++++++++++++++ test/request-api.spec.js | 37 +++++++- 8 files changed, 201 insertions(+), 6 deletions(-) create mode 100644 src/utils/progress-stream.js diff --git a/examples/upload-file-via-browser/package.json b/examples/upload-file-via-browser/package.json index d4fc658ca..4b53ccd8c 100644 --- a/examples/upload-file-via-browser/package.json +++ b/examples/upload-file-via-browser/package.json @@ -13,7 +13,7 @@ "devDependencies": { "babel-core": "^5.4.7", "babel-loader": "^5.1.2", - "ipfs-api": "^12.1.7", + "ipfs-api": "../../", "json-loader": "^0.5.4", "react": "^15.4.2", "react-dom": "^15.4.2", diff --git a/examples/upload-file-via-browser/src/App.js b/examples/upload-file-via-browser/src/App.js index 7b913f7f0..102d86e14 100644 --- a/examples/upload-file-via-browser/src/App.js +++ b/examples/upload-file-via-browser/src/App.js @@ -29,7 +29,7 @@ class App extends React.Component { saveToIpfs (reader) { let ipfsId const buffer = Buffer.from(reader.result) - this.ipfsApi.add(buffer) + this.ipfsApi.add(buffer, { progress: (prog) => console.log(`received: ${prog}`) }) .then((response) => { console.log(response) ipfsId = response[0].hash diff --git a/package.json b/package.json index 78af95c85..7a4577355 100644 --- a/package.json +++ b/package.json @@ -65,8 +65,8 @@ "dirty-chai": "^2.0.1", "eslint-plugin-react": "^7.4.0", "gulp": "^3.9.1", + "interface-ipfs-core": "~0.32.1", "hapi": "^16.6.2", - "interface-ipfs-core": "~0.31.19", "ipfsd-ctl": "~0.23.0", "pre-commit": "^1.2.2", "socket.io": "^2.0.3", diff --git a/src/files/add.js b/src/files/add.js index 81e8734f4..cfd9a279e 100644 --- a/src/files/add.js +++ b/src/files/add.js @@ -3,6 +3,7 @@ const isStream = require('is-stream') const promisify = require('promisify-es6') const DAGNodeStream = require('../utils/dagnode-stream') +const ProgressStream = require('../utils/progress-stream') module.exports = (send) => { return promisify((files, opts, callback) => { @@ -41,10 +42,11 @@ module.exports = (send) => { qs.hash = opts.hashAlg } - const request = { path: 'add', files: files, qs: qs } + const request = { path: 'add', files: files, qs: qs, progress: opts.progress } // Transform the response stream to DAGNode values - const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback) + const transform = (res, callback) => DAGNodeStream + .streamToValue(send, ProgressStream.fromStream(opts.progress, res), callback) send.andTransform(request, transform, callback) }) } diff --git a/src/utils/progress-stream.js b/src/utils/progress-stream.js new file mode 100644 index 000000000..272c47c6c --- /dev/null +++ b/src/utils/progress-stream.js @@ -0,0 +1,49 @@ +'use strict' + +const Transform = require('readable-stream').Transform + +/* + A transform stream to track progress events on file upload + + When the progress flag is passed to the HTTP api, the stream + emits progress events like such: + + { + Name string + Hash string `json:",omitempty"` + Bytes int64 `json:",omitempty"` + Size string `json:",omitempty"` + } + + This class will take care of detecting such + events and calling the associated track method + with the bytes sent so far as parameter. It will + also skip them from the stream, emitting only + when the final object has been uploaded and we + got a hash. +*/ +class ProgressStream extends Transform { + constructor (opts) { + opts = Object.assign(opts || {}, { objectMode: true }) + super(opts) + this._track = opts.track || (() => {}) + } + + static fromStream (track, stream) { + const prog = new ProgressStream({ track }) + return stream.pipe(prog) + } + + _transform (chunk, encoding, callback) { + if (chunk && + typeof chunk.Bytes !== 'undefined' && + typeof chunk.Hash === 'undefined') { + this._track(chunk.Bytes) + return callback() + } + + callback(null, chunk) + } +} + +module.exports = ProgressStream diff --git a/src/utils/request-api.js b/src/utils/request-api.js index 951d5d6c0..61815fde8 100644 --- a/src/utils/request-api.js +++ b/src/utils/request-api.js @@ -48,6 +48,22 @@ function onRes (buffer, cb) { // Return a stream of JSON objects if (chunkedObjects && isJson) { const outputStream = pump(res, ndjson.parse()) + // TODO: This needs reworking. + // this is a chicken and egg problem - + // 1) we can't get Trailer headers unless the response ends + // 2) we can't propagate the error, because the response stream + // is closed + // (perhaps we can workaround this using pull-streams) + res.on('end', () => { + let err = res.trailers['x-stream-error'] + if (err) { + err = JSON.parse(err) + const error = new Error(`Server responded with 500`) + error.code = err.Code + error.message = err.Message + outputStream.destroy(error) // error is not going to be propagated + } + }) return cb(null, outputStream) } @@ -81,6 +97,9 @@ function requestAPI (config, options, callback) { if (options.files && !Array.isArray(options.files)) { options.files = [options.files] } + if (options.progress) { + options.qs.progress = true + } if (options.qs.r) { options.qs.recursive = options.qs.r diff --git a/test/files.spec.js b/test/files.spec.js index 629e5e9b0..3bfc4bf87 100644 --- a/test/files.spec.js +++ b/test/files.spec.js @@ -106,6 +106,96 @@ describe('.files (the MFS API part)', function () { }) }) + it('files.add file with progress option', (done) => { + let progress + let progressCount = 0 + + const progressHandler = (p) => { + progressCount += 1 + progress = p + } + + ipfs.files.add(testfile, { progress: progressHandler }, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + expect(progressCount).to.be.equal(1) + + done() + }) + }) + + it('files.add big file with progress option', (done) => { + let progress = 0 + let progressCount = 0 + + const progressHandler = (p) => { + progressCount += 1 + progress = p + } + + // TODO: needs to be using a big file + ipfs.files.add(testfile, { progress: progressHandler }, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + expect(progressCount).to.be.equal(1) + + done() + }) + }) + + it('files.add directory with progress option', (done) => { + let progress = 0 + let progressCount = 0 + + const progressHandler = (p) => { + progressCount += 1 + progress = p + } + + // TODO: needs to be using a directory + ipfs.files.add(testfile, { progress: progressHandler }, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + expect(progressCount).to.be.equal(1) + + done() + }) + }) + + it('files.add without progress options', (done) => { + ipfs.files.add(testfile, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + done() + }) + }) + + HASH_ALGS.forEach((name) => { + it(`files.add with hash=${name} and raw-leaves=false`, (done) => { + const content = String(Math.random() + Date.now()) + const file = { + path: content + '.txt', + content: Buffer.from(content) + } + const options = { hash: name, 'raw-leaves': false } + + ipfs.files.add([file], options, (err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + const cid = new CID(res[0].hash) + expect(mh.decode(cid.multihash).name).to.equal(name) + done() + }) + }) + }) + it('files.mkdir', (done) => { ipfs.files.mkdir('/test-folder', done) }) diff --git a/test/request-api.spec.js b/test/request-api.spec.js index 6588ccbbd..18e45d522 100644 --- a/test/request-api.spec.js +++ b/test/request-api.spec.js @@ -7,10 +7,14 @@ const expect = chai.expect chai.use(dirtyChai) const isNode = require('detect-node') const ipfsAPI = require('../src/index.js') +const ndjson = require('ndjson') +const pump = require('pump') describe('\'deal with HTTP weirdness\' tests', () => { it('does not crash if no content-type header is provided', (done) => { - if (!isNode) { return done() } + if (!isNode) { + return done() + } // go-ipfs always (currently) adds a content-type header, even if no content is present, // the standard behaviour for an http-api is to omit this header if no content is present @@ -27,3 +31,34 @@ describe('\'deal with HTTP weirdness\' tests', () => { }) }) }) + +describe('trailer headers', () => { + it('should deal with trailer x-stream-error correctly', (done) => { + if (!isNode) { + return done() + } + + const server = require('http').createServer((req, res) => { + const resStream = pump(res, ndjson.stringify()) + res.setHeader('x-chunked-output', '1') + res.setHeader('content-type', 'application/json') + res.setHeader('Trailer', 'X-Stream-Error') + res.addTrailers({ 'X-Stream-Error': JSON.stringify({ Message: 'ups, something went wrong', Code: 500 }) }) + resStream.write({ Bytes: 1 }) + res.end() + }) + + server.listen(6001, () => { + const ipfs = ipfsAPI('/ip4/127.0.0.1/tcp/6001') + /* eslint-disable */ + ipfs.files.add(Buffer.from('Hello there!'), (err, res) => { + // TODO: error's are not being correctly + // propagated with Trailer headers yet + // expect(err).to.exist() + expect(res).to.not.equal(0) + server.close(done) + }) + /* eslint-enable */ + }) + }) +})