This repository has been archived by the owner on Mar 10, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 300
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: send files HTTP request should stream (#629)
* files add multipart HTTP request streams for real * fixed dangling multipart stream * multipart: better backpressure: waiting for drain before resuming file content * src/utils/request-api.js renamed to send-request.js * only do file backpressure on node because browser HTTP
- Loading branch information
Showing
20 changed files
with
444 additions
and
215 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,30 +1,6 @@ | ||
'use strict' | ||
|
||
const addCmd = require('./add.js') | ||
const pull = require('pull-stream') | ||
const pushable = require('pull-pushable') | ||
const SendFilesStream = require('../utils/send-files-stream') | ||
const toPull = require('stream-to-pull-stream') | ||
|
||
module.exports = (send) => { | ||
const add = addCmd(send) | ||
|
||
return (options) => { | ||
options = options || {} | ||
|
||
const source = pushable() | ||
const sink = pull.collect((err, tuples) => { | ||
if (err) { return source.end(err) } | ||
|
||
add(tuples, options, (err, filesAdded) => { | ||
if (err) { return source.end(err) } | ||
|
||
filesAdded.forEach((file) => source.push(file)) | ||
source.end() | ||
}) | ||
}) | ||
|
||
return { | ||
sink: sink, | ||
source: source | ||
} | ||
} | ||
} | ||
module.exports = (send) => (options) => toPull(SendFilesStream(send, 'add')(options)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,31 +1,5 @@ | ||
'use strict' | ||
|
||
const addCmd = require('./add.js') | ||
const Duplex = require('readable-stream').Duplex | ||
const SendFilesStream = require('../utils/send-files-stream') | ||
|
||
module.exports = (send) => { | ||
const add = addCmd(send) | ||
|
||
return (options) => { | ||
options = options || {} | ||
|
||
const tuples = [] | ||
|
||
const ds = new Duplex({ objectMode: true }) | ||
ds._read = (n) => {} | ||
|
||
ds._write = (file, enc, next) => { | ||
tuples.push(file) | ||
next() | ||
} | ||
|
||
ds.end = () => add(tuples, options, (err, res) => { | ||
if (err) { return ds.emit('error', err) } | ||
|
||
res.forEach((tuple) => ds.push(tuple)) | ||
ds.push(null) | ||
}) | ||
|
||
return ds | ||
} | ||
} | ||
module.exports = (send) => SendFilesStream(send, 'add') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,51 +1,42 @@ | ||
'use strict' | ||
|
||
const isStream = require('is-stream') | ||
const promisify = require('promisify-es6') | ||
const ProgressStream = require('../utils/progress-stream') | ||
const converter = require('../utils/converter') | ||
const ConcatStream = require('concat-stream') | ||
const once = require('once') | ||
const isStream = require('is-stream') | ||
const SendFilesStream = require('../utils/send-files-stream') | ||
|
||
module.exports = (send) => { | ||
return promisify((files, opts, callback) => { | ||
if (typeof opts === 'function') { | ||
callback = opts | ||
opts = {} | ||
} | ||
const createAddStream = SendFilesStream(send, 'add') | ||
|
||
opts = opts || {} | ||
|
||
const ok = Buffer.isBuffer(files) || | ||
isStream.readable(files) || | ||
Array.isArray(files) | ||
|
||
if (!ok) { | ||
return callback(new Error('"files" must be a buffer, readable stream, or array of objects')) | ||
return promisify((_files, options, _callback) => { | ||
if (typeof options === 'function') { | ||
_callback = options | ||
options = null | ||
} | ||
|
||
const qs = {} | ||
const callback = once(_callback) | ||
|
||
if (opts['cid-version'] != null) { | ||
qs['cid-version'] = opts['cid-version'] | ||
} else if (opts.cidVersion != null) { | ||
qs['cid-version'] = opts.cidVersion | ||
if (!options) { | ||
options = {} | ||
} | ||
|
||
if (opts['raw-leaves'] != null) { | ||
qs['raw-leaves'] = opts['raw-leaves'] | ||
} else if (opts.rawLeaves != null) { | ||
qs['raw-leaves'] = opts.rawLeaves | ||
} | ||
const ok = Buffer.isBuffer(_files) || | ||
isStream.readable(_files) || | ||
Array.isArray(_files) | ||
|
||
if (opts.hash != null) { | ||
qs.hash = opts.hash | ||
} else if (opts.hashAlg != null) { | ||
qs.hash = opts.hashAlg | ||
if (!ok) { | ||
return callback(new Error('"files" must be a buffer, readable stream, or array of objects')) | ||
} | ||
|
||
const request = { path: 'add', files: files, qs: qs, progress: opts.progress } | ||
const files = [].concat(_files) | ||
|
||
const stream = createAddStream(options) | ||
const concat = ConcatStream((result) => callback(null, result)) | ||
stream.once('error', callback) | ||
stream.pipe(concat) | ||
|
||
send.andTransform(request, (response, cb) => { | ||
converter(ProgressStream.fromStream(opts.progress, response), cb) | ||
}, callback) | ||
files.forEach((file) => stream.write(file)) | ||
stream.end() | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,28 +1,42 @@ | ||
'use strict' | ||
|
||
const promisify = require('promisify-es6') | ||
const concatStream = require('concat-stream') | ||
const once = require('once') | ||
const SendFilesStream = require('../utils/send-files-stream') | ||
|
||
module.exports = (send) => { | ||
return promisify((pathDst, files, opts, callback) => { | ||
const sendFilesStream = SendFilesStream(send, 'files/write') | ||
|
||
return promisify((pathDst, _files, opts, _callback) => { | ||
if (typeof opts === 'function' && | ||
!callback) { | ||
callback = opts | ||
!_callback) { | ||
_callback = opts | ||
opts = {} | ||
} | ||
|
||
// opts is the real callback -- | ||
// 'callback' is being injected by promisify | ||
if (typeof opts === 'function' && | ||
typeof callback === 'function') { | ||
callback = opts | ||
typeof _callback === 'function') { | ||
_callback = opts | ||
opts = {} | ||
} | ||
|
||
send({ | ||
path: 'files/write', | ||
const files = [].concat(_files) | ||
const callback = once(_callback) | ||
|
||
const options = { | ||
args: pathDst, | ||
qs: opts, | ||
files: files | ||
}, callback) | ||
qs: opts | ||
} | ||
|
||
const stream = sendFilesStream(options) | ||
const concat = concatStream((result) => callback(null, result)) | ||
stream.once('error', callback) | ||
stream.pipe(concat) | ||
|
||
files.forEach((file) => stream.write(file)) | ||
stream.end() | ||
}) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.