Skip to content
This repository was archived by the owner on Mar 10, 2020. It is now read-only.

Commit dae62cb

Browse files
pgtedaviddias
authored andcommitted
feat: send files HTTP request should stream (#629)
* files add multipart HTTP request streams for real * fixed dangling multipart stream * multipart: better backpressure: waiting for drain before resuming file content * src/utils/request-api.js renamed to send-request.js * only do file backpressure on node because browser HTTP
1 parent d4b8ed1 commit dae62cb

20 files changed

+444
-215
lines changed

package.json

-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
"lru-cache": "^4.1.1",
3939
"multiaddr": "^3.0.1",
4040
"multihashes": "~0.4.12",
41-
"multipart-stream": "^2.0.1",
4241
"ndjson": "^1.5.0",
4342
"once": "^1.4.0",
4443
"peer-id": "~0.10.2",

src/block/put.js

+15-14
Original file line numberDiff line numberDiff line change
@@ -3,34 +3,35 @@
33
const promisify = require('promisify-es6')
44
const Block = require('ipfs-block')
55
const CID = require('cids')
6+
const once = require('once')
7+
const SendOneFile = require('../utils/send-one-file')
68

79
module.exports = (send) => {
8-
return promisify((block, cid, callback) => {
10+
const sendOneFile = SendOneFile(send, 'block/put')
11+
12+
return promisify((block, cid, _callback) => {
913
// TODO this needs to be adjusted with the new go-ipfs http-api
1014
if (typeof cid === 'function') {
11-
callback = cid
15+
_callback = cid
1216
cid = {}
1317
}
1418

19+
const callback = once(_callback)
20+
1521
if (Array.isArray(block)) {
16-
const err = new Error('block.put() only accepts 1 file')
17-
return callback(err)
22+
return callback(new Error('block.put accepts only one block'))
1823
}
1924

2025
if (typeof block === 'object' && block.data) {
2126
block = block.data
2227
}
2328

24-
const request = {
25-
path: 'block/put',
26-
files: block
27-
}
28-
29-
// Transform the response to a Block
30-
const transform = (info, callback) => {
31-
callback(null, new Block(block, new CID(info.Key)))
32-
}
29+
sendOneFile(block, {}, (err, result) => {
30+
if (err) {
31+
return callback(err) // early
32+
}
3333

34-
send.andTransform(request, transform, callback)
34+
callback(null, new Block(block, new CID(result.Key)))
35+
})
3536
})
3637
}

src/files/add-pull-stream.js

+3-27
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,6 @@
11
'use strict'
22

3-
const addCmd = require('./add.js')
4-
const pull = require('pull-stream')
5-
const pushable = require('pull-pushable')
3+
const SendFilesStream = require('../utils/send-files-stream')
4+
const toPull = require('stream-to-pull-stream')
65

7-
module.exports = (send) => {
8-
const add = addCmd(send)
9-
10-
return (options) => {
11-
options = options || {}
12-
13-
const source = pushable()
14-
const sink = pull.collect((err, tuples) => {
15-
if (err) { return source.end(err) }
16-
17-
add(tuples, options, (err, filesAdded) => {
18-
if (err) { return source.end(err) }
19-
20-
filesAdded.forEach((file) => source.push(file))
21-
source.end()
22-
})
23-
})
24-
25-
return {
26-
sink: sink,
27-
source: source
28-
}
29-
}
30-
}
6+
module.exports = (send) => (options) => toPull(SendFilesStream(send, 'add')(options))

src/files/add-readable-stream.js

+2-28
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,5 @@
11
'use strict'
22

3-
const addCmd = require('./add.js')
4-
const Duplex = require('readable-stream').Duplex
3+
const SendFilesStream = require('../utils/send-files-stream')
54

6-
module.exports = (send) => {
7-
const add = addCmd(send)
8-
9-
return (options) => {
10-
options = options || {}
11-
12-
const tuples = []
13-
14-
const ds = new Duplex({ objectMode: true })
15-
ds._read = (n) => {}
16-
17-
ds._write = (file, enc, next) => {
18-
tuples.push(file)
19-
next()
20-
}
21-
22-
ds.end = () => add(tuples, options, (err, res) => {
23-
if (err) { return ds.emit('error', err) }
24-
25-
res.forEach((tuple) => ds.push(tuple))
26-
ds.push(null)
27-
})
28-
29-
return ds
30-
}
31-
}
5+
module.exports = (send) => SendFilesStream(send, 'add')

src/files/add.js

+25-34
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,42 @@
11
'use strict'
22

3-
const isStream = require('is-stream')
43
const promisify = require('promisify-es6')
5-
const ProgressStream = require('../utils/progress-stream')
6-
const converter = require('../utils/converter')
4+
const ConcatStream = require('concat-stream')
5+
const once = require('once')
6+
const isStream = require('is-stream')
7+
const SendFilesStream = require('../utils/send-files-stream')
78

89
module.exports = (send) => {
9-
return promisify((files, opts, callback) => {
10-
if (typeof opts === 'function') {
11-
callback = opts
12-
opts = {}
13-
}
10+
const createAddStream = SendFilesStream(send, 'add')
1411

15-
opts = opts || {}
16-
17-
const ok = Buffer.isBuffer(files) ||
18-
isStream.readable(files) ||
19-
Array.isArray(files)
20-
21-
if (!ok) {
22-
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
12+
return promisify((_files, options, _callback) => {
13+
if (typeof options === 'function') {
14+
_callback = options
15+
options = null
2316
}
2417

25-
const qs = {}
18+
const callback = once(_callback)
2619

27-
if (opts['cid-version'] != null) {
28-
qs['cid-version'] = opts['cid-version']
29-
} else if (opts.cidVersion != null) {
30-
qs['cid-version'] = opts.cidVersion
20+
if (!options) {
21+
options = {}
3122
}
3223

33-
if (opts['raw-leaves'] != null) {
34-
qs['raw-leaves'] = opts['raw-leaves']
35-
} else if (opts.rawLeaves != null) {
36-
qs['raw-leaves'] = opts.rawLeaves
37-
}
24+
const ok = Buffer.isBuffer(_files) ||
25+
isStream.readable(_files) ||
26+
Array.isArray(_files)
3827

39-
if (opts.hash != null) {
40-
qs.hash = opts.hash
41-
} else if (opts.hashAlg != null) {
42-
qs.hash = opts.hashAlg
28+
if (!ok) {
29+
return callback(new Error('"files" must be a buffer, readable stream, or array of objects'))
4330
}
4431

45-
const request = { path: 'add', files: files, qs: qs, progress: opts.progress }
32+
const files = [].concat(_files)
33+
34+
const stream = createAddStream(options)
35+
const concat = ConcatStream((result) => callback(null, result))
36+
stream.once('error', callback)
37+
stream.pipe(concat)
4638

47-
send.andTransform(request, (response, cb) => {
48-
converter(ProgressStream.fromStream(opts.progress, response), cb)
49-
}, callback)
39+
files.forEach((file) => stream.write(file))
40+
stream.end()
5041
})
5142
}

src/files/write.js

+24-10
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,42 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const concatStream = require('concat-stream')
5+
const once = require('once')
6+
const SendFilesStream = require('../utils/send-files-stream')
47

58
module.exports = (send) => {
6-
return promisify((pathDst, files, opts, callback) => {
9+
const sendFilesStream = SendFilesStream(send, 'files/write')
10+
11+
return promisify((pathDst, _files, opts, _callback) => {
712
if (typeof opts === 'function' &&
8-
!callback) {
9-
callback = opts
13+
!_callback) {
14+
_callback = opts
1015
opts = {}
1116
}
1217

1318
// opts is the real callback --
1419
// 'callback' is being injected by promisify
1520
if (typeof opts === 'function' &&
16-
typeof callback === 'function') {
17-
callback = opts
21+
typeof _callback === 'function') {
22+
_callback = opts
1823
opts = {}
1924
}
2025

21-
send({
22-
path: 'files/write',
26+
const files = [].concat(_files)
27+
const callback = once(_callback)
28+
29+
const options = {
2330
args: pathDst,
24-
qs: opts,
25-
files: files
26-
}, callback)
31+
qs: opts
32+
}
33+
34+
const stream = sendFilesStream(options)
35+
const concat = concatStream((result) => callback(null, result))
36+
stream.once('error', callback)
37+
stream.pipe(concat)
38+
39+
files.forEach((file) => stream.write(file))
40+
stream.end()
2741
})
2842
}

src/index.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
const multiaddr = require('multiaddr')
44
const loadCommands = require('./utils/load-commands')
55
const getConfig = require('./utils/default-config')
6-
const getRequestAPI = require('./utils/request-api')
6+
const sendRequest = require('./utils/send-request')
77

88
function IpfsAPI (hostOrMultiaddr, port, opts) {
99
const config = getConfig()
@@ -35,7 +35,7 @@ function IpfsAPI (hostOrMultiaddr, port, opts) {
3535
config.port = split[1]
3636
}
3737

38-
const requestAPI = getRequestAPI(config)
38+
const requestAPI = sendRequest(config)
3939
const cmds = loadCommands(requestAPI)
4040
cmds.send = requestAPI
4141
cmds.Buffer = Buffer

src/object/appendData.js

+8-7
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const once = require('once')
45
const cleanMultihash = require('../utils/clean-multihash')
6+
const SendOneFile = require('../utils/send-one-file')
57

68
module.exports = (send) => {
79
const objectGet = require('./get')(send)
10+
const sendOneFile = SendOneFile(send, 'object/patch/append-data')
811

9-
return promisify((multihash, data, opts, callback) => {
12+
return promisify((multihash, data, opts, _callback) => {
1013
if (typeof opts === 'function') {
11-
callback = opts
14+
_callback = opts
1215
opts = {}
1316
}
17+
const callback = once(_callback)
1418
if (!opts) {
1519
opts = {}
1620
}
@@ -21,14 +25,11 @@ module.exports = (send) => {
2125
return callback(err)
2226
}
2327

24-
send({
25-
path: 'object/patch/append-data',
26-
args: [multihash],
27-
files: data
28-
}, (err, result) => {
28+
sendOneFile(data, { args: [multihash] }, (err, result) => {
2929
if (err) {
3030
return callback(err)
3131
}
32+
3233
objectGet(result.Hash, { enc: 'base58' }, callback)
3334
})
3435
})

src/object/put.js

+15-8
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,20 @@ const lruOptions = {
99
}
1010

1111
const cache = LRU(lruOptions)
12+
const SendOneFile = require('../utils/send-one-file')
13+
const once = require('once')
1214

1315
module.exports = (send) => {
14-
return promisify((obj, options, callback) => {
16+
const sendOneFile = SendOneFile(send, 'object/put')
17+
18+
return promisify((obj, options, _callback) => {
1519
if (typeof options === 'function') {
16-
callback = options
20+
_callback = options
1721
options = {}
1822
}
23+
24+
const callback = once(_callback)
25+
1926
if (!options) {
2027
options = {}
2128
}
@@ -56,13 +63,13 @@ module.exports = (send) => {
5663
}
5764
const enc = options.enc || 'json'
5865

59-
send({
60-
path: 'object/put',
61-
qs: { inputenc: enc },
62-
files: buf
63-
}, (err, result) => {
66+
const sendOptions = {
67+
qs: { inputenc: enc }
68+
}
69+
70+
sendOneFile(buf, sendOptions, (err, result) => {
6471
if (err) {
65-
return callback(err)
72+
return callback(err) // early
6673
}
6774

6875
if (Buffer.isBuffer(obj)) {

src/object/setData.js

+7-7
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4+
const once = require('once')
45
const cleanMultihash = require('../utils/clean-multihash')
6+
const SendOneFile = require('../utils/send-one-file')
57

68
module.exports = (send) => {
79
const objectGet = require('./get')(send)
10+
const sendOneFile = SendOneFile(send, 'object/patch/set-data')
811

9-
return promisify((multihash, data, opts, callback) => {
12+
return promisify((multihash, data, opts, _callback) => {
1013
if (typeof opts === 'function') {
11-
callback = opts
14+
_callback = opts
1215
opts = {}
1316
}
17+
const callback = once(_callback)
1418
if (!opts) {
1519
opts = {}
1620
}
@@ -21,11 +25,7 @@ module.exports = (send) => {
2125
return callback(err)
2226
}
2327

24-
send({
25-
path: 'object/patch/set-data',
26-
args: [multihash],
27-
files: data
28-
}, (err, result) => {
28+
sendOneFile(data, { args: [multihash] }, (err, result) => {
2929
if (err) {
3030
return callback(err)
3131
}

0 commit comments

Comments
 (0)