Skip to content
This repository has been archived by the owner on Mar 10, 2020. It is now read-only.

Commit

Permalink
feat: report progress on ipfs add
Browse files Browse the repository at this point in the history
* Adds wiring for a progress bar

Passes the progress bar option for tranfers over http. Progress is shown for your payload being streamed upto the server. There is a pause. Then you get your list of files.

* feat: Support specify hash algorithm in files.add (#597)

*  support support specify hash alg

* Add test for add with --hash option. Pass raw cid/multihash to ipfs object get/data

* Allow object get/data to accept CID

* Var naming tweaks from review

* feat: track progress events

* tesT: add more tests

* fix: tidy up tests

* feat: send errors as trailer headers

* fix: remove .only from tests

* feat: signal error on reponse stream if x-stream-error

* test: adding tests for trailer header errors

* chore: update interface-ipfs-core

* chore: upgrade to latest interface-ipfs-core
  • Loading branch information
dryajov authored and daviddias committed Oct 18, 2017
1 parent 38d7289 commit e2d894c
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 6 deletions.
2 changes: 1 addition & 1 deletion examples/upload-file-via-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
"devDependencies": {
"babel-core": "^5.4.7",
"babel-loader": "^5.1.2",
"ipfs-api": "^12.1.7",
"ipfs-api": "../../",
"json-loader": "^0.5.4",
"react": "^15.4.2",
"react-dom": "^15.4.2",
Expand Down
2 changes: 1 addition & 1 deletion examples/upload-file-via-browser/src/App.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class App extends React.Component {
saveToIpfs (reader) {
let ipfsId
const buffer = Buffer.from(reader.result)
this.ipfsApi.add(buffer)
this.ipfsApi.add(buffer, { progress: (prog) => console.log(`received: ${prog}`) })
.then((response) => {
console.log(response)
ipfsId = response[0].hash
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@
"dirty-chai": "^2.0.1",
"eslint-plugin-react": "^7.4.0",
"gulp": "^3.9.1",
"interface-ipfs-core": "~0.32.1",
"hapi": "^16.6.2",
"interface-ipfs-core": "~0.31.19",
"ipfsd-ctl": "~0.23.0",
"pre-commit": "^1.2.2",
"socket.io": "^2.0.3",
Expand Down
6 changes: 4 additions & 2 deletions src/files/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const isStream = require('is-stream')
const promisify = require('promisify-es6')
const DAGNodeStream = require('../utils/dagnode-stream')
const ProgressStream = require('../utils/progress-stream')

module.exports = (send) => {
return promisify((files, opts, callback) => {
Expand Down Expand Up @@ -41,10 +42,11 @@ module.exports = (send) => {
qs.hash = opts.hashAlg
}

const request = { path: 'add', files: files, qs: qs }
const request = { path: 'add', files: files, qs: qs, progress: opts.progress }

// Transform the response stream to DAGNode values
const transform = (res, callback) => DAGNodeStream.streamToValue(send, res, callback)
const transform = (res, callback) => DAGNodeStream
.streamToValue(send, ProgressStream.fromStream(opts.progress, res), callback)
send.andTransform(request, transform, callback)
})
}
49 changes: 49 additions & 0 deletions src/utils/progress-stream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
'use strict'

const Transform = require('readable-stream').Transform

/*
A transform stream to track progress events on file upload
When the progress flag is passed to the HTTP api, the stream
emits progress events like such:
{
Name string
Hash string `json:",omitempty"`
Bytes int64 `json:",omitempty"`
Size string `json:",omitempty"`
}
This class will take care of detecting such
events and calling the associated track method
with the bytes sent so far as parameter. It will
also skip them from the stream, emitting only
when the final object has been uploaded and we
got a hash.
*/
class ProgressStream extends Transform {
constructor (opts) {
opts = Object.assign(opts || {}, { objectMode: true })
super(opts)
this._track = opts.track || (() => {})
}

static fromStream (track, stream) {
const prog = new ProgressStream({ track })
return stream.pipe(prog)
}

_transform (chunk, encoding, callback) {
if (chunk &&
typeof chunk.Bytes !== 'undefined' &&
typeof chunk.Hash === 'undefined') {
this._track(chunk.Bytes)
return callback()
}

callback(null, chunk)
}
}

module.exports = ProgressStream
19 changes: 19 additions & 0 deletions src/utils/request-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,22 @@ function onRes (buffer, cb) {
// Return a stream of JSON objects
if (chunkedObjects && isJson) {
const outputStream = pump(res, ndjson.parse())
// TODO: This needs reworking.
// this is a chicken and egg problem -
// 1) we can't get Trailer headers unless the response ends
// 2) we can't propagate the error, because the response stream
// is closed
// (perhaps we can workaround this using pull-streams)
res.on('end', () => {
let err = res.trailers['x-stream-error']
if (err) {
err = JSON.parse(err)
const error = new Error(`Server responded with 500`)
error.code = err.Code
error.message = err.Message
outputStream.destroy(error) // error is not going to be propagated
}
})
return cb(null, outputStream)
}

Expand Down Expand Up @@ -81,6 +97,9 @@ function requestAPI (config, options, callback) {
if (options.files && !Array.isArray(options.files)) {
options.files = [options.files]
}
if (options.progress) {
options.qs.progress = true
}

if (options.qs.r) {
options.qs.recursive = options.qs.r
Expand Down
90 changes: 90 additions & 0 deletions test/files.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,96 @@ describe('.files (the MFS API part)', function () {
})
})

it('files.add file with progress option', (done) => {
let progress
let progressCount = 0

const progressHandler = (p) => {
progressCount += 1
progress = p
}

ipfs.files.add(testfile, { progress: progressHandler }, (err, res) => {
expect(err).to.not.exist()

expect(res).to.have.length(1)
expect(progress).to.be.equal(testfile.byteLength)
expect(progressCount).to.be.equal(1)

done()
})
})

it('files.add big file with progress option', (done) => {
let progress = 0
let progressCount = 0

const progressHandler = (p) => {
progressCount += 1
progress = p
}

// TODO: needs to be using a big file
ipfs.files.add(testfile, { progress: progressHandler }, (err, res) => {
expect(err).to.not.exist()

expect(res).to.have.length(1)
expect(progress).to.be.equal(testfile.byteLength)
expect(progressCount).to.be.equal(1)

done()
})
})

it('files.add directory with progress option', (done) => {
let progress = 0
let progressCount = 0

const progressHandler = (p) => {
progressCount += 1
progress = p
}

// TODO: needs to be using a directory
ipfs.files.add(testfile, { progress: progressHandler }, (err, res) => {
expect(err).to.not.exist()

expect(res).to.have.length(1)
expect(progress).to.be.equal(testfile.byteLength)
expect(progressCount).to.be.equal(1)

done()
})
})

it('files.add without progress options', (done) => {
ipfs.files.add(testfile, (err, res) => {
expect(err).to.not.exist()

expect(res).to.have.length(1)
done()
})
})

HASH_ALGS.forEach((name) => {
it(`files.add with hash=${name} and raw-leaves=false`, (done) => {
const content = String(Math.random() + Date.now())
const file = {
path: content + '.txt',
content: Buffer.from(content)
}
const options = { hash: name, 'raw-leaves': false }

ipfs.files.add([file], options, (err, res) => {
if (err) return done(err)
expect(res).to.have.length(1)
const cid = new CID(res[0].hash)
expect(mh.decode(cid.multihash).name).to.equal(name)
done()
})
})
})

it('files.mkdir', (done) => {
ipfs.files.mkdir('/test-folder', done)
})
Expand Down
37 changes: 36 additions & 1 deletion test/request-api.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ const expect = chai.expect
chai.use(dirtyChai)
const isNode = require('detect-node')
const ipfsAPI = require('../src/index.js')
const ndjson = require('ndjson')
const pump = require('pump')

describe('\'deal with HTTP weirdness\' tests', () => {
it('does not crash if no content-type header is provided', (done) => {
if (!isNode) { return done() }
if (!isNode) {
return done()
}

// go-ipfs always (currently) adds a content-type header, even if no content is present,
// the standard behaviour for an http-api is to omit this header if no content is present
Expand All @@ -27,3 +31,34 @@ describe('\'deal with HTTP weirdness\' tests', () => {
})
})
})

describe('trailer headers', () => {
it('should deal with trailer x-stream-error correctly', (done) => {
if (!isNode) {
return done()
}

const server = require('http').createServer((req, res) => {
const resStream = pump(res, ndjson.stringify())
res.setHeader('x-chunked-output', '1')
res.setHeader('content-type', 'application/json')
res.setHeader('Trailer', 'X-Stream-Error')
res.addTrailers({ 'X-Stream-Error': JSON.stringify({ Message: 'ups, something went wrong', Code: 500 }) })
resStream.write({ Bytes: 1 })
res.end()
})

server.listen(6001, () => {
const ipfs = ipfsAPI('/ip4/127.0.0.1/tcp/6001')
/* eslint-disable */
ipfs.files.add(Buffer.from('Hello there!'), (err, res) => {
// TODO: error's are not being correctly
// propagated with Trailer headers yet
// expect(err).to.exist()
expect(res).to.not.equal(0)
server.close(done)
})
/* eslint-enable */
})
})
})

0 comments on commit e2d894c

Please sign in to comment.