This repository was archived by the owner on Mar 10, 2020. It is now read-only.

[WIP] feat: add support for chunked uploads #851

Open · wants to merge 15 commits into master
8 changes: 7 additions & 1 deletion package.json
@@ -29,10 +29,14 @@
     "async": "^2.6.1",
     "big.js": "^5.1.2",
     "bs58": "^4.0.1",
+    "buffer-to-stream": "^1.0.0",
     "cids": "~0.5.3",
     "concat-stream": "^1.6.2",
+    "content": "^4.0.5",
     "debug": "^3.1.0",
     "detect-node": "^2.0.3",
+    "err-code": "^1.1.2",
+    "filereader-stream": "^2.0.0",
     "flatmap": "0.0.3",
     "glob": "^7.1.2",
     "ipfs-block": "~0.7.1",
@@ -47,18 +51,20 @@
     "multiaddr": "^5.0.0",
     "multibase": "~0.4.0",
     "multihashes": "~0.4.13",
+    "nanoid": "^1.2.3",
     "ndjson": "^1.5.0",
     "once": "^1.4.0",
     "peer-id": "~0.11.0",
     "peer-info": "~0.14.1",
+    "promise-nodeify": "^3.0.1",
     "promisify-es6": "^1.0.3",
     "pull-defer": "~0.2.2",
     "pull-pushable": "^2.2.0",
     "pull-stream-to-stream": "^1.3.4",
+    "pump": "^3.0.0",
     "qs": "^6.5.2",
     "readable-stream": "^2.3.6",
-    "stream-http": "^2.8.3",
+    "stream-http": "hugomrdias/stream-http#fix/body-handling",
     "stream-to-pull-stream": "^1.7.2",
     "streamifier": "~0.1.1",
     "tar-stream": "^1.6.1"
121 changes: 121 additions & 0 deletions src/files/add-experimental.js
@@ -0,0 +1,121 @@
'use strict'

const { Readable } = require('stream')
const toPull = require('stream-to-pull-stream')
const promiseNodeify = require('promise-nodeify')
const concatStream = require('concat-stream')
const pump = require('pump')
const SendStream = require('../utils/send-stream-experimental')

/** @module api/add */

/**
 * Converts an array to a stream
 *
 * @ignore
 * @param {Array} data
 * @returns {Readable}
 */
const arrayToStream = (data) => {
  let i = 0
  return new Readable({
    objectMode: true,
    read () {
      this.push(i < data.length ? data[i++] : null)
    }
  })
}

/**
 * @typedef {Object} AddOptions
 * @property {number} chunkSize - Size (in bytes) of the chunks the upload is split into
 * @property {number} [cidVersion=0] - The CID version to use when storing the data (storage keys are based on the CID, including its version)
 * @property {function(bytes: number): void} progress - Function that will be called with the byte length of chunks as a file is added to IPFS
 * @property {Boolean} recursive - When a path is passed, this option can be enabled to recursively add all the files
 * @property {string} hashAlg - Multihash hashing algorithm to use (default: sha2-256). See {@link https://github.com/multiformats/js-multihash/blob/master/src/constants.js#L5-L343 hashAlg values} for the list of all possible values
 * @property {Boolean} wrapWithDirectory - Adds a wrapping node around the content
 * @property {Boolean} onlyHash - Doesn't actually add the file to IPFS, but rather calculates its hash
 * @property {Boolean} [pin=true] - Pin this object when adding
 * @property {Boolean} [rawLeaves=false] - If true, DAG leaves will contain raw file data and not be wrapped in a protobuf
 * @property {string} [chunker=size-262144] - Chunking algorithm used to build IPFS DAGs. Available formats:
 * - size-{size}
 * - rabin
 * - rabin-{avg}
 * - rabin-{min}-{avg}-{max}
 */

/**
 * @typedef {Object} AddResult
 * @property {string} path - Object path
 * @property {string} hash - Object CID
 * @property {number} size - Object size
 */

/**
 * @callback AddCallback
 * @param {Error} err
 * @param {AddResult[]} res
 */

/** @typedef {Function} PullStream */
/** @typedef {(Object[]|Readable|File|PullStream|Buffer)} AddData */
/** @typedef {function(AddData, AddOptions, AddCallback): (Promise.<AddResult[]>|void)} AddFunction */

/**
 * Add data to IPFS
 *
 * @param {Function} send
 * @returns {AddFunction}
 * @memberof api/add
 */
const add = (send) => (data, options, callback) => {
  if (typeof options === 'function') {
    callback = options
    options = {}
  }

  let result = []
  const r = new Promise((resolve, reject) => {
    pump(
      arrayToStream([].concat(data)),
      new SendStream(send, options),
      concatStream((res) => (result = res)),
      (err) => {
        if (err) {
          return reject(err)
        }
        resolve(result)
      }
    )
  })

  return promiseNodeify(r, callback)
}

/**
 * Add data to IPFS
 *
 * @param {Function} send
 * @returns {function(AddOptions): Readable}
 * @memberof api/add
 */
const addReadableStream = (send) => (options = {}) => {
  return new SendStream(send, options)
}

/**
 * Add data to IPFS
 *
 * @param {Function} send
 * @returns {function(AddOptions): PullStream}
 * @memberof api/add
 */
const addPullStream = (send) => (options = {}) => {
  return toPull(new SendStream(send, options))
}

module.exports = {
  add,
  addReadableStream,
  addPullStream
}
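
For orientation, here is a minimal usage sketch of the experimental add API introduced above. The `experimental` flag and option names follow this diff (see src/files/add.js below); the client construction, host/port and chunkSize value are illustrative assumptions, not part of this PR:

'use strict'

// Hypothetical usage sketch — the ipfs-api client construction and the
// chunkSize value are assumptions for illustration only.
const ipfs = require('ipfs-api')('localhost', '5001')

ipfs.files.add(Buffer.from('hello world'), {
  experimental: true,    // opt in to the chunked-upload code path
  chunkSize: 256 * 1024  // assumed: size in bytes of each uploaded chunk
}, (err, res) => {
  if (err) throw err
  console.log(res) // AddResult[], e.g. [{ path, hash, size }]
})

Omitting the callback returns a Promise instead, via promise-nodeify as in the add function above.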
11 changes: 6 additions & 5 deletions src/files/add-pull-stream.js
@@ -3,11 +3,12 @@
 const SendFilesStream = require('../utils/send-files-stream')
 const FileResultStreamConverter = require('../utils/file-result-stream-converter')
 const toPull = require('stream-to-pull-stream')
+const { addPullStream } = require('./add-experimental')
 
-module.exports = (send) => {
-  return (options) => {
-    options = options || {}
-    options.converter = FileResultStreamConverter
-    return toPull(SendFilesStream(send, 'add')({ qs: options }))
+module.exports = (send) => (options = {}) => {
+  if (options.experimental) {
+    return addPullStream(send)(options)
   }
+  options.converter = FileResultStreamConverter
+  return toPull(SendFilesStream(send, 'add')({ qs: options }))
 }
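
A comparable sketch for the pull-stream variant, following the usual pull-stream consumption pattern; the client construction is again an assumption:

'use strict'

// Hypothetical usage sketch for the experimental pull-stream code path.
const pull = require('pull-stream')
const ipfs = require('ipfs-api')('localhost', '5001') // assumed client

pull(
  pull.values([{ path: 'hello.txt', content: Buffer.from('hello') }]),
  ipfs.files.addPullStream({ experimental: true }),
  pull.collect((err, results) => {
    if (err) throw err
    console.log(results) // [{ path, hash, size }]
  })
)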
11 changes: 6 additions & 5 deletions src/files/add-readable-stream.js
@@ -2,11 +2,12 @@
 
 const SendFilesStream = require('../utils/send-files-stream')
 const FileResultStreamConverter = require('../utils/file-result-stream-converter')
+const { addReadableStream } = require('./add-experimental')
 
-module.exports = (send) => {
-  return (options) => {
-    options = options || {}
-    options.converter = FileResultStreamConverter
-    return SendFilesStream(send, 'add')(options)
+module.exports = (send) => (options = {}) => {
+  if (options.experimental) {
+    return addReadableStream(send)(options)
   }
+  options.converter = FileResultStreamConverter
+  return SendFilesStream(send, 'add')(options)
 }
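
And a sketch for the readable-stream variant. It assumes SendStream behaves as a duplex — file objects written in, { path, hash, size } results emitted out — mirroring the existing addReadableStream contract:

'use strict'

// Hypothetical usage sketch for the experimental readable-stream code path.
// SendStream is assumed to be a duplex stream.
const ipfs = require('ipfs-api')('localhost', '5001') // assumed client

const stream = ipfs.files.addReadableStream({ experimental: true })

stream.on('data', (file) => console.log('added', file.hash))
stream.on('error', (err) => console.error(err))

stream.write({ path: 'hello.txt', content: Buffer.from('hello') })
stream.end()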
19 changes: 11 additions & 8 deletions src/files/add.js
@@ -10,8 +10,6 @@ const FileResultStreamConverter = require('../utils/file-result-stream-converter')
 const SendFilesStream = require('../utils/send-files-stream')
 
 module.exports = (send) => {
-  const createAddStream = SendFilesStream(send, 'add')
-
   const add = promisify((_files, options, _callback) => {
     if (typeof options === 'function') {
       _callback = options
@@ -26,18 +24,19 @@ module.exports = (send) => {
     options.converter = FileResultStreamConverter
 
     const ok = Buffer.isBuffer(_files) ||
-      isStream.readable(_files) ||
-      Array.isArray(_files) ||
-      OtherBuffer.isBuffer(_files) ||
-      typeof _files === 'object' ||
-      isSource(_files)
+               isStream.readable(_files) ||
+               Array.isArray(_files) ||
+               OtherBuffer.isBuffer(_files) ||
+               typeof _files === 'object' ||
+               isSource(_files)
 
     if (!ok) {
       return callback(new Error('first arg must be a buffer, readable stream, pull stream, an object or array of objects'))
     }
 
     const files = [].concat(_files)
 
+    const createAddStream = SendFilesStream(send, 'add')
     const stream = createAddStream({ qs: options })
     const concat = ConcatStream((result) => callback(null, result))
     stream.once('error', callback)
@@ -47,7 +46,11 @@
     stream.end()
   })
 
-  return function () {
+  return function (data, options, callback) {
+    if (options && options.experimental) {
+      return require('./add-experimental').add(send)(data, options, callback)
+    }
+
     const args = Array.from(arguments)
 
     // If we files.add(<pull stream>), then promisify thinks the pull stream is