From 19a8dae6d57a7fd7a61c83ce8dd6bece5fdeab5b Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Fri, 17 Nov 2017 15:44:13 +0000 Subject: [PATCH 1/5] files add multipart HTTP request streams for real --- package.json | 1 - src/block/put.js | 29 ++-- src/files/add-pull-stream.js | 30 +--- src/files/add-readable-stream.js | 30 +--- src/files/add.js | 59 +++----- src/files/write.js | 34 +++-- src/object/appendData.js | 15 +- src/object/put.js | 23 ++- src/object/setData.js | 14 +- src/util/fs-add.js | 7 +- src/util/url-add.js | 17 +-- src/utils/converter.js | 13 +- src/utils/multipart.js | 112 ++++++++++++++ .../{get-files-stream.js => prepare-file.js} | 38 +---- src/utils/request-api.js | 30 ++-- src/utils/send-files-stream.js | 142 ++++++++++++++++++ src/utils/send-one-file-multiple-results.js | 18 +++ src/utils/send-one-file.js | 18 +++ 18 files changed, 420 insertions(+), 210 deletions(-) create mode 100644 src/utils/multipart.js rename src/utils/{get-files-stream.js => prepare-file.js} (76%) create mode 100644 src/utils/send-files-stream.js create mode 100644 src/utils/send-one-file-multiple-results.js create mode 100644 src/utils/send-one-file.js diff --git a/package.json b/package.json index 444aff8ae..3d7262392 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,6 @@ "lru-cache": "^4.1.1", "multiaddr": "^3.0.1", "multihashes": "~0.4.12", - "multipart-stream": "^2.0.1", "ndjson": "^1.5.0", "once": "^1.4.0", "peer-id": "~0.10.2", diff --git a/src/block/put.js b/src/block/put.js index d27f95314..0ec1e5c5f 100644 --- a/src/block/put.js +++ b/src/block/put.js @@ -3,34 +3,35 @@ const promisify = require('promisify-es6') const Block = require('ipfs-block') const CID = require('cids') +const once = require('once') +const SendOneFile = require('../utils/send-one-file') module.exports = (send) => { - return promisify((block, cid, callback) => { + const sendOneFile = SendOneFile(send, 'block/put') + + return promisify((block, cid, _callback) => { // TODO this needs to be adjusted with the new go-ipfs http-api if (typeof cid === 'function') { - callback = cid + _callback = cid cid = {} } + const callback = once(_callback) + if (Array.isArray(block)) { - const err = new Error('block.put() only accepts 1 file') - return callback(err) + return callback(new Error('block.put accepts only one block')) } if (typeof block === 'object' && block.data) { block = block.data } - const request = { - path: 'block/put', - files: block - } - - // Transform the response to a Block - const transform = (info, callback) => { - callback(null, new Block(block, new CID(info.Key))) - } + sendOneFile(block, {}, (err, result) => { + if (err) { + return callback(err) // early + } - send.andTransform(request, transform, callback) + callback(null, new Block(block, new CID(result.Key))) + }) }) } diff --git a/src/files/add-pull-stream.js b/src/files/add-pull-stream.js index 8c23abc23..daf050de8 100644 --- a/src/files/add-pull-stream.js +++ b/src/files/add-pull-stream.js @@ -1,30 +1,6 @@ 'use strict' -const addCmd = require('./add.js') -const pull = require('pull-stream') -const pushable = require('pull-pushable') +const SendFilesStream = require('../utils/send-files-stream') +const toPull = require('stream-to-pull-stream') -module.exports = (send) => { - const add = addCmd(send) - - return (options) => { - options = options || {} - - const source = pushable() - const sink = pull.collect((err, tuples) => { - if (err) { return source.end(err) } - - add(tuples, options, (err, filesAdded) => { - if (err) { return source.end(err) } - - filesAdded.forEach((file) => source.push(file)) - source.end() - }) - }) - - return { - sink: sink, - source: source - } - } -} +module.exports = (send) => (options) => toPull(SendFilesStream(send, 'add')(options)) diff --git a/src/files/add-readable-stream.js b/src/files/add-readable-stream.js index cb4364d1d..b3e03d4e8 100644 --- a/src/files/add-readable-stream.js +++ b/src/files/add-readable-stream.js @@ -1,31 +1,5 @@ 'use strict' -const addCmd = require('./add.js') -const Duplex = require('readable-stream').Duplex +const SendFilesStream = require('../utils/send-files-stream') -module.exports = (send) => { - const add = addCmd(send) - - return (options) => { - options = options || {} - - const tuples = [] - - const ds = new Duplex({ objectMode: true }) - ds._read = (n) => {} - - ds._write = (file, enc, next) => { - tuples.push(file) - next() - } - - ds.end = () => add(tuples, options, (err, res) => { - if (err) { return ds.emit('error', err) } - - res.forEach((tuple) => ds.push(tuple)) - ds.push(null) - }) - - return ds - } -} +module.exports = (send) => SendFilesStream(send, 'add') diff --git a/src/files/add.js b/src/files/add.js index 324a49416..6b54bb03d 100644 --- a/src/files/add.js +++ b/src/files/add.js @@ -1,51 +1,42 @@ 'use strict' -const isStream = require('is-stream') const promisify = require('promisify-es6') -const ProgressStream = require('../utils/progress-stream') -const converter = require('../utils/converter') +const ConcatStream = require('concat-stream') +const once = require('once') +const isStream = require('is-stream') +const SendFilesStream = require('../utils/send-files-stream') module.exports = (send) => { - return promisify((files, opts, callback) => { - if (typeof opts === 'function') { - callback = opts - opts = {} - } + const createAddStream = SendFilesStream(send, 'add') - opts = opts || {} - - const ok = Buffer.isBuffer(files) || - isStream.readable(files) || - Array.isArray(files) - - if (!ok) { - return callback(new Error('"files" must be a buffer, readable stream, or array of objects')) + return promisify((_files, options, _callback) => { + if (typeof options === 'function') { + _callback = options + options = null } - const qs = {} + const callback = once(_callback) - if (opts['cid-version'] != null) { - qs['cid-version'] = opts['cid-version'] - } else if (opts.cidVersion != null) { - qs['cid-version'] = opts.cidVersion + if (!options) { + options = {} } - if (opts['raw-leaves'] != null) { - qs['raw-leaves'] = opts['raw-leaves'] - } else if (opts.rawLeaves != null) { - qs['raw-leaves'] = opts.rawLeaves - } + const ok = Buffer.isBuffer(_files) || + isStream.readable(_files) || + Array.isArray(_files) - if (opts.hash != null) { - qs.hash = opts.hash - } else if (opts.hashAlg != null) { - qs.hash = opts.hashAlg + if (!ok) { + return callback(new Error('"files" must be a buffer, readable stream, or array of objects')) } - const request = { path: 'add', files: files, qs: qs, progress: opts.progress } + const files = [].concat(_files) + + const stream = createAddStream(options) + const concat = ConcatStream((result) => callback(null, result)) + stream.once('error', callback) + stream.pipe(concat) - send.andTransform(request, (response, cb) => { - converter(ProgressStream.fromStream(opts.progress, response), cb) - }, callback) + files.forEach((file) => stream.write(file)) + stream.end() }) } diff --git a/src/files/write.js b/src/files/write.js index 17c7ce8df..5e9efa03b 100644 --- a/src/files/write.js +++ b/src/files/write.js @@ -1,28 +1,42 @@ 'use strict' const promisify = require('promisify-es6') +const concatStream = require('concat-stream') +const once = require('once') +const SendFilesStream = require('../utils/send-files-stream') module.exports = (send) => { - return promisify((pathDst, files, opts, callback) => { + const sendFilesStream = SendFilesStream(send, 'files/write') + + return promisify((pathDst, _files, opts, _callback) => { if (typeof opts === 'function' && - !callback) { - callback = opts + !_callback) { + _callback = opts opts = {} } // opts is the real callback -- // 'callback' is being injected by promisify if (typeof opts === 'function' && - typeof callback === 'function') { - callback = opts + typeof _callback === 'function') { + _callback = opts opts = {} } - send({ - path: 'files/write', + const files = [].concat(_files) + const callback = once(_callback) + + const options = { args: pathDst, - qs: opts, - files: files - }, callback) + qs: opts + } + + const stream = sendFilesStream(options) + const concat = concatStream((result) => callback(null, result)) + stream.once('error', callback) + stream.pipe(concat) + + files.forEach((file) => stream.write(file)) + stream.end() }) } diff --git a/src/object/appendData.js b/src/object/appendData.js index daf2a572d..fa3783909 100644 --- a/src/object/appendData.js +++ b/src/object/appendData.js @@ -1,16 +1,20 @@ 'use strict' const promisify = require('promisify-es6') +const once = require('once') const cleanMultihash = require('../utils/clean-multihash') +const SendOneFile = require('../utils/send-one-file') module.exports = (send) => { const objectGet = require('./get')(send) + const sendOneFile = SendOneFile(send, 'object/patch/append-data') - return promisify((multihash, data, opts, callback) => { + return promisify((multihash, data, opts, _callback) => { if (typeof opts === 'function') { - callback = opts + _callback = opts opts = {} } + const callback = once(_callback) if (!opts) { opts = {} } @@ -21,14 +25,11 @@ module.exports = (send) => { return callback(err) } - send({ - path: 'object/patch/append-data', - args: [multihash], - files: data - }, (err, result) => { + sendOneFile(data, { args: [multihash] }, (err, result) => { if (err) { return callback(err) } + objectGet(result.Hash, { enc: 'base58' }, callback) }) }) diff --git a/src/object/put.js b/src/object/put.js index 537c9d34f..1fd6aca11 100644 --- a/src/object/put.js +++ b/src/object/put.js @@ -9,13 +9,20 @@ const lruOptions = { } const cache = LRU(lruOptions) +const SendOneFile = require('../utils/send-one-file') +const once = require('once') module.exports = (send) => { - return promisify((obj, options, callback) => { + const sendOneFile = SendOneFile(send, 'object/put') + + return promisify((obj, options, _callback) => { if (typeof options === 'function') { - callback = options + _callback = options options = {} } + + const callback = once(_callback) + if (!options) { options = {} } @@ -56,13 +63,13 @@ module.exports = (send) => { } const enc = options.enc || 'json' - send({ - path: 'object/put', - qs: { inputenc: enc }, - files: buf - }, (err, result) => { + const sendOptions = { + qs: { inputenc: enc } + } + + sendOneFile(buf, sendOptions, (err, result) => { if (err) { - return callback(err) + return callback(err) // early } if (Buffer.isBuffer(obj)) { diff --git a/src/object/setData.js b/src/object/setData.js index a7380df60..a4296dddd 100644 --- a/src/object/setData.js +++ b/src/object/setData.js @@ -1,16 +1,20 @@ 'use strict' const promisify = require('promisify-es6') +const once = require('once') const cleanMultihash = require('../utils/clean-multihash') +const SendOneFile = require('../utils/send-one-file') module.exports = (send) => { const objectGet = require('./get')(send) + const sendOneFile = SendOneFile(send, 'object/patch/set-data') - return promisify((multihash, data, opts, callback) => { + return promisify((multihash, data, opts, _callback) => { if (typeof opts === 'function') { - callback = opts + _callback = opts opts = {} } + const callback = once(_callback) if (!opts) { opts = {} } @@ -21,11 +25,7 @@ module.exports = (send) => { return callback(err) } - send({ - path: 'object/patch/set-data', - args: [multihash], - files: data - }, (err, result) => { + sendOneFile(data, { args: [multihash] }, (err, result) => { if (err) { return callback(err) } diff --git a/src/util/fs-add.js b/src/util/fs-add.js index e305069fd..8a3ea404f 100644 --- a/src/util/fs-add.js +++ b/src/util/fs-add.js @@ -2,11 +2,11 @@ const isNode = require('detect-node') const promisify = require('promisify-es6') -const converter = require('../utils/converter') const moduleConfig = require('../utils/module-config') +const SendOneFile = require('../utils/send-one-file-multiple-results') module.exports = (arg) => { - const send = moduleConfig(arg) + const sendOneFile = SendOneFile(moduleConfig(arg), 'add') return promisify((path, opts, callback) => { if (typeof opts === 'function' && @@ -31,7 +31,6 @@ module.exports = (arg) => { return callback(new Error('"path" must be a string')) } - const request = { path: 'add', files: path, qs: opts } - send.andTransform(request, converter, callback) + sendOneFile(path, { qs: opts }, callback) }) } diff --git a/src/util/url-add.js b/src/util/url-add.js index a9889e64f..3caf11cb2 100644 --- a/src/util/url-add.js +++ b/src/util/url-add.js @@ -1,14 +1,13 @@ 'use strict' const promisify = require('promisify-es6') -const once = require('once') const parseUrl = require('url').parse const request = require('../utils/request') -const converter = require('../utils/converter') const moduleConfig = require('../utils/module-config') +const SendOneFile = require('../utils/send-one-file-multiple-results') module.exports = (arg) => { - const send = moduleConfig(arg) + const sendOneFile = SendOneFile(moduleConfig(arg), 'add') return promisify((url, opts, callback) => { if (typeof (opts) === 'function' && @@ -25,19 +24,17 @@ module.exports = (arg) => { opts = {} } - callback = once(callback) - if (!validUrl(url)) { return callback(new Error('"url" param must be an http(s) url')) } - requestWithRedirect(url, opts, send, callback) + requestWithRedirect(url, opts, sendOneFile, callback) }) } const validUrl = (url) => typeof url === 'string' && url.startsWith('http') -const requestWithRedirect = (url, opts, send, callback) => { +const requestWithRedirect = (url, opts, sendOneFile, callback) => { request(parseUrl(url).protocol)(url, (res) => { res.once('error', callback) if (res.statusCode >= 400) { @@ -50,11 +47,9 @@ const requestWithRedirect = (url, opts, send, callback) => { if (!validUrl(redirection)) { return callback(new Error('redirection url must be an http(s) url')) } - requestWithRedirect(redirection, opts, send, callback) + requestWithRedirect(redirection, opts, sendOneFile, callback) } else { - const request = { path: 'add', files: res, qs: opts } - - send.andTransform(request, converter, callback) + sendOneFile(res, { qs: opts }, callback) } }).end() } diff --git a/src/utils/converter.js b/src/utils/converter.js index b372e0d09..444064bf3 100644 --- a/src/utils/converter.js +++ b/src/utils/converter.js @@ -26,18 +26,20 @@ const streamToValue = require('./stream-to-value') */ class ConverterStream extends TransformStream { constructor (options) { - const opts = Object.assign(options || {}, { objectMode: true }) + const opts = Object.assign({}, options || {}, { objectMode: true }) super(opts) } _transform (obj, enc, callback) { - this.push({ + if (!obj.Hash) { + return callback() + } + + callback(null, { path: obj.Name, hash: obj.Hash, size: parseInt(obj.Size, 10) }) - - callback(null) } } @@ -54,4 +56,5 @@ function converter (inputStream, callback) { streamToValue(outputStream, callback) } -module.exports = converter +exports = module.exports = converter +exports.ConverterStream = ConverterStream diff --git a/src/utils/multipart.js b/src/utils/multipart.js new file mode 100644 index 000000000..2d28d4cf5 --- /dev/null +++ b/src/utils/multipart.js @@ -0,0 +1,112 @@ +'use strict' + +const Transform = require('stream').Transform + +const PADDING = '--' +const NEW_LINE = '\r\n' +const NEW_LINE_BUFFER = Buffer.from(NEW_LINE) + +class Multipart extends Transform { + constructor (options) { + super(Object.assign({}, options, { objectMode: true })) + + this._boundary = this._generateBoundary() + this._files = [] + this._draining = false + } + + _flush () { + this.push(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE)) + } + + _generateBoundary () { + var boundary = '--------------------------' + for (var i = 0; i < 24; i++) { + boundary += Math.floor(Math.random() * 10).toString(16) + } + + return boundary + } + + _transform (file, encoding, callback) { + if (Buffer.isBuffer(file)) { + this.push(file) + return callback() // early + } + // not a buffer, must be a file + this._files.push(file) + this._maybeDrain(callback) + } + + _maybeDrain (callback) { + if (!this._draining) { + if (this._files.length) { + this._draining = true + const file = this._files.shift() + this._pushFile(file, (err) => { + this._draining = false + if (err) { + this.emit('error', err) + } else { + this._maybeDrain(callback) + } + }) + } else { + this.emit('drained all files') + callback() + } + } else { + this.once('drained all files', callback) + } + } + + _pushFile (file, callback) { + const leading = this._leading(file.headers || {}) + + this.push(leading) + + let content = file.content || Buffer.alloc(0) + + if (Buffer.isBuffer(content)) { + this.push(content) + this.push(NEW_LINE_BUFFER) + return callback() // early + } + + // From now on we assume content is a stream + + content.once('error', this.emit.bind(this, 'error')) + + content.once('end', () => { + this.push(NEW_LINE_BUFFER) + callback() + + // TODO: backpressure!!! wait once self is drained so we can proceed + // This does not work + // this.once('drain', () => { + // callback() + // }) + }) + + content.on('data', (data) => { + this.push(data) + }) + } + + _leading (headers) { + var leading = [PADDING + this._boundary] + + Object.keys(headers).forEach((header) => { + leading.push(header + ': ' + headers[header]) + }) + + leading.push('') + leading.push('') + + const leadingStr = leading.join(NEW_LINE) + + return Buffer.from(leadingStr) + } +} + +module.exports = Multipart diff --git a/src/utils/get-files-stream.js b/src/utils/prepare-file.js similarity index 76% rename from src/utils/get-files-stream.js rename to src/utils/prepare-file.js index 8a44c8f75..988903df7 100644 --- a/src/utils/get-files-stream.js +++ b/src/utils/prepare-file.js @@ -1,28 +1,9 @@ 'use strict' const isNode = require('detect-node') -const Multipart = require('multipart-stream') const flatmap = require('flatmap') const escape = require('glob-escape') -function headers (file) { - const name = file.path - ? encodeURIComponent(file.path) - : '' - - const header = { 'Content-Disposition': `file; filename="${name}"` } - - if (file.dir || !file.content) { - header['Content-Type'] = 'application/x-directory' - } else if (file.symlink) { - header['Content-Type'] = 'application/symlink' - } else { - header['Content-Type'] = 'application/octet-stream' - } - - return header -} - function strip (name, base) { const smallBase = base .split('/') @@ -99,14 +80,10 @@ function loadPaths (opts, file) { } } -function getFilesStream (files, opts) { - if (!files) { - return null - } - - const mp = new Multipart() +function prepareFile (file, opts) { + let files = [].concat(file) - flatmap(files, (file) => { + return flatmap(files, (file) => { if (typeof file === 'string') { if (!isNode) { throw new Error('Can not add paths in node') @@ -130,14 +107,7 @@ function getFilesStream (files, opts) { dir: false, content: file } - }).forEach((file) => { - mp.addPart({ - headers: headers(file), - body: file.content - }) }) - - return mp } -exports = module.exports = getFilesStream +exports = module.exports = prepareFile diff --git a/src/utils/request-api.js b/src/utils/request-api.js index 8cac6cbef..336e356d2 100644 --- a/src/utils/request-api.js +++ b/src/utils/request-api.js @@ -6,7 +6,6 @@ const isNode = require('detect-node') const ndjson = require('ndjson') const pump = require('pump') const once = require('once') -const getFilesStream = require('./get-files-stream') const streamToValue = require('./stream-to-value') const streamToJsonValue = require('./stream-to-json-value') const request = require('./request') @@ -89,10 +88,6 @@ function requestAPI (config, options, callback) { callback = once(callback) options.qs = options.qs || {} - if (Array.isArray(options.files)) { - options.qs.recursive = true - } - if (Array.isArray(options.path)) { options.path = options.path.join('/') } @@ -102,9 +97,6 @@ function requestAPI (config, options, callback) { if (options.args) { options.qs.arg = options.args } - if (options.files && !Array.isArray(options.files)) { - options.files = [options.files] - } if (options.progress) { options.qs.progress = true } @@ -117,9 +109,8 @@ function requestAPI (config, options, callback) { options.qs['stream-channels'] = true - let stream - if (options.files) { - stream = getFilesStream(options.files, options.qs) + if (options.stream) { + options.buffer = false } // this option is only used internally, not passed to daemon @@ -133,12 +124,12 @@ function requestAPI (config, options, callback) { headers['User-Agent'] = config['user-agent'] } - if (options.files) { - if (!stream.boundary) { - return callback(new Error('No boundary in multipart stream')) + if (options.multipart) { + if (!options.multipartBoundary) { + return callback(new Error('No multipartBoundary')) } - headers['Content-Type'] = `multipart/form-data; boundary=${stream.boundary}` + headers['Content-Type'] = `multipart/form-data; boundary=${options.multipartBoundary}` } const qs = Qs.stringify(options.qs, { @@ -174,22 +165,21 @@ function requestAPI (config, options, callback) { return qsDefaultEncoder(data) } }) - const req = request(config.protocol)({ + const reqOptions = { hostname: config.host, path: `${config['api-path']}${options.path}?${qs}`, port: config.port, method: method, headers: headers, protocol: `${config.protocol}:` - }, onRes(options.buffer, callback)) + } + const req = request(config.protocol)(reqOptions, onRes(options.buffer, callback)) req.on('error', (err) => { callback(err) }) - if (options.files) { - stream.pipe(req) - } else { + if (!options.stream) { req.end() } diff --git a/src/utils/send-files-stream.js b/src/utils/send-files-stream.js new file mode 100644 index 000000000..79eec3702 --- /dev/null +++ b/src/utils/send-files-stream.js @@ -0,0 +1,142 @@ +'use strict' + +const Duplex = require('stream').Duplex +const eachSeries = require('async/eachSeries') +const isStream = require('is-stream') +const once = require('once') +const prepareFile = require('./prepare-file') +const Multipart = require('./multipart') +const Converter = require('./converter').ConverterStream + +function headers (file) { + const name = file.path + ? encodeURIComponent(file.path) + : '' + + const header = { 'Content-Disposition': `file; filename="${name}"` } + + if (!file.content) { + header['Content-Type'] = 'application/x-directory' + } else if (file.symlink) { + header['Content-Type'] = 'application/symlink' + } else { + header['Content-Type'] = 'application/octet-stream' + } + + return header +} + +module.exports = (send, path) => { + return (options) => { + let request + let ended = false + let writing = false + + if (!options) { + options = {} + } + + const multipart = new Multipart() + const retStream = new Duplex({ objectMode: true }) + + retStream._read = (n) => {} + + retStream._write = (file, enc, _next) => { + const next = once(_next) + try { + const files = prepareFile(file, Object.assign({}, options, options.qs)).map( + (file) => Object.assign({headers: headers(file)}, file)) + + writing = true + eachSeries( + files, + (file, cb) => multipart.write(file, enc, cb), + (err) => { + writing = false + if (err) { + return next(err) + } + if (ended) { + multipart.end() + } + next() + }) + } catch (err) { + next(err) + } + } + + retStream.once('finish', () => { + if (!ended) { + ended = true + if (!writing) { + multipart.end() + } + } + }) + + const qs = options.qs || {} + + if (options['cid-version'] != null) { + qs['cid-version'] = options['cid-version'] + } else if (options.cidVersion != null) { + qs['cid-version'] = options.cidVersion + } + + if (options['raw-leaves'] != null) { + qs['raw-leaves'] = options['raw-leaves'] + } else if (options.rawLeaves != null) { + qs['raw-leaves'] = options.rawLeaves + } + + if (options.hash != null) { + qs.hash = options.hash + } else if (options.hashAlg != null) { + qs.hash = options.hashAlg + } + + const args = { + path: path, + qs: qs, + args: options.args, + multipart: true, + multipartBoundary: multipart._boundary, + stream: true, + recursive: true, + progress: options.progress + } + + request = send(args, (err, response) => { + if (err) { + return retStream.emit('error', err) + } + + if (!response) { + // no response object, which means + // everything is ok, so we end the + // return stream + return retStream.push(null) // early + } + + if (!isStream(response)) { + retStream.push(response) + retStream.push(null) + return + } + + response.on('data', (d) => { + if (d.Bytes && options.progress) { + options.progress(d.Bytes) + } + }) + const convertedResponse = new Converter() + convertedResponse.once('end', () => retStream.push(null)) + convertedResponse.on('data', (d) => retStream.push(d)) + response.pipe(convertedResponse) + }) + + multipart.pipe(request) + + return retStream + } +} diff --git a/src/utils/send-one-file-multiple-results.js b/src/utils/send-one-file-multiple-results.js new file mode 100644 index 000000000..180a9ad34 --- /dev/null +++ b/src/utils/send-one-file-multiple-results.js @@ -0,0 +1,18 @@ +'use strict' + +const once = require('once') +const ConcatStream = require('concat-stream') +const SendFilesStream = require('./send-files-stream') + +module.exports = (send, path) => { + const sendFilesStream = SendFilesStream(send, path) + return (file, options, _callback) => { + const callback = once(_callback) + const stream = sendFilesStream(options) + const concat = ConcatStream((results) => callback(null, results)) + stream.once('error', callback) + stream.pipe(concat) + stream.write(file) + stream.end() + } +} diff --git a/src/utils/send-one-file.js b/src/utils/send-one-file.js new file mode 100644 index 000000000..80de20842 --- /dev/null +++ b/src/utils/send-one-file.js @@ -0,0 +1,18 @@ +'use strict' + +const SendOneFileMultipleResults = require('./send-one-file-multiple-results') + +module.exports = (send, path) => { + const sendFile = SendOneFileMultipleResults(send, path) + return (file, options, callback) => { + sendFile(file, options, (err, results) => { + if (err) { + return callback(err) + } + if (results.length !== 1) { + return callback(new Error('expected 1 result and had ' + results.length)) + } + callback(null, results[0]) + }) + } +} From f10c5b6bcfd461dad5518948bd5be2a67fa6282b Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Fri, 17 Nov 2017 21:21:26 +0000 Subject: [PATCH 2/5] fixed dangling multipart stream --- src/utils/multipart.js | 1 + src/utils/send-files-stream.js | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/src/utils/multipart.js b/src/utils/multipart.js index 2d28d4cf5..a32ab7d83 100644 --- a/src/utils/multipart.js +++ b/src/utils/multipart.js @@ -17,6 +17,7 @@ class Multipart extends Transform { _flush () { this.push(Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE)) + this.push(null) } _generateBoundary () { diff --git a/src/utils/send-files-stream.js b/src/utils/send-files-stream.js index 79eec3702..dd2f09f82 100644 --- a/src/utils/send-files-stream.js +++ b/src/utils/send-files-stream.js @@ -37,6 +37,7 @@ module.exports = (send, path) => { } const multipart = new Multipart() + const retStream = new Duplex({ objectMode: true }) retStream._read = (n) => {} @@ -106,6 +107,10 @@ module.exports = (send, path) => { progress: options.progress } + multipart.on('error', (err) => { + retStream.emit('error', err) + }) + request = send(args, (err, response) => { if (err) { return retStream.emit('error', err) From f134b7b1dbcdbf560c226a713126037e11321b95 Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Mon, 20 Nov 2017 08:49:39 +0000 Subject: [PATCH 3/5] multipart: better backpressure: waiting for drain before resuming file content --- src/utils/multipart.js | 8 ++++++-- src/utils/send-files-stream.js | 4 ++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/utils/multipart.js b/src/utils/multipart.js index a32ab7d83..cea021894 100644 --- a/src/utils/multipart.js +++ b/src/utils/multipart.js @@ -8,7 +8,7 @@ const NEW_LINE_BUFFER = Buffer.from(NEW_LINE) class Multipart extends Transform { constructor (options) { - super(Object.assign({}, options, { objectMode: true })) + super(Object.assign({}, options, { objectMode: true, highWaterMark: 1 })) this._boundary = this._generateBoundary() this._files = [] @@ -90,7 +90,11 @@ class Multipart extends Transform { }) content.on('data', (data) => { - this.push(data) + const drained = this.push(data) + if (!drained) { + content.pause() + this.once('drain', () => content.resume()) + } }) } diff --git a/src/utils/send-files-stream.js b/src/utils/send-files-stream.js index dd2f09f82..0a40c6445 100644 --- a/src/utils/send-files-stream.js +++ b/src/utils/send-files-stream.js @@ -140,6 +140,10 @@ module.exports = (send, path) => { response.pipe(convertedResponse) }) + // signal the multipart that the underlying stream has drained and that + // it can continue producing data.. + request.on('drain', () => multipart.emit('drain')) + multipart.pipe(request) return retStream From 6583fcf14306e14071163b8b0d3b39ae25a394dc Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Mon, 20 Nov 2017 08:53:27 +0000 Subject: [PATCH 4/5] src/utils/request-api.js renamed to send-request.js --- src/index.js | 4 ++-- src/utils/module-config.js | 6 +++--- src/utils/{request-api.js => send-request.js} | 0 3 files changed, 5 insertions(+), 5 deletions(-) rename src/utils/{request-api.js => send-request.js} (100%) diff --git a/src/index.js b/src/index.js index 4e6443cf3..b56a521a9 100644 --- a/src/index.js +++ b/src/index.js @@ -3,7 +3,7 @@ const multiaddr = require('multiaddr') const loadCommands = require('./utils/load-commands') const getConfig = require('./utils/default-config') -const getRequestAPI = require('./utils/request-api') +const sendRequest = require('./utils/send-request') function IpfsAPI (hostOrMultiaddr, port, opts) { const config = getConfig() @@ -35,7 +35,7 @@ function IpfsAPI (hostOrMultiaddr, port, opts) { config.port = split[1] } - const requestAPI = getRequestAPI(config) + const requestAPI = sendRequest(config) const cmds = loadCommands(requestAPI) cmds.send = requestAPI cmds.Buffer = Buffer diff --git a/src/utils/module-config.js b/src/utils/module-config.js index a05b4d3ab..4e1b0e6a1 100644 --- a/src/utils/module-config.js +++ b/src/utils/module-config.js @@ -1,7 +1,7 @@ 'use strict' const getConfig = require('./default-config') -const requestAPI = require('./request-api') +const sendRequest = require('./send-request') const multiaddr = require('multiaddr') module.exports = (arg) => { @@ -10,12 +10,12 @@ module.exports = (arg) => { if (typeof arg === 'function') { return arg } else if (typeof arg === 'object') { - return requestAPI(arg) + return sendRequest(arg) } else if (typeof arg === 'string') { const maddr = multiaddr(arg).nodeAddress() config.host = maddr.address config.port = maddr.port - return requestAPI(config) + return sendRequest(config) } else { throw new Error('Argument must be a send function or a config object.') } diff --git a/src/utils/request-api.js b/src/utils/send-request.js similarity index 100% rename from src/utils/request-api.js rename to src/utils/send-request.js From 7403afc368c2992a4e19f4ac14b0815fbcb3c045 Mon Sep 17 00:00:00 2001 From: Pedro Teixeira Date: Mon, 20 Nov 2017 09:35:46 +0000 Subject: [PATCH 5/5] only do file backpressure on node because browser HTTP --- src/utils/multipart.js | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/utils/multipart.js b/src/utils/multipart.js index cea021894..bae39e141 100644 --- a/src/utils/multipart.js +++ b/src/utils/multipart.js @@ -1,6 +1,7 @@ 'use strict' const Transform = require('stream').Transform +const isNode = require('detect-node') const PADDING = '--' const NEW_LINE = '\r\n' @@ -91,7 +92,11 @@ class Multipart extends Transform { content.on('data', (data) => { const drained = this.push(data) - if (!drained) { + // Only do the drain dance on Node.js. + // In browserland, the underlying stream + // does NOT drain because the request is only sent + // once this stream ends. + if (!drained && isNode) { content.pause() this.once('drain', () => content.resume()) }