diff --git a/package.json b/package.json index 3c49b0c1c..ecf20a945 100644 --- a/package.json +++ b/package.json @@ -29,10 +29,14 @@ "async": "^2.6.1", "big.js": "^5.1.2", "bs58": "^4.0.1", + "buffer-to-stream": "^1.0.0", "cids": "~0.5.3", "concat-stream": "^1.6.2", + "content": "^4.0.5", "debug": "^3.1.0", "detect-node": "^2.0.3", + "err-code": "^1.1.2", + "filereader-stream": "^2.0.0", "flatmap": "0.0.3", "glob": "^7.1.2", "ipfs-block": "~0.7.1", @@ -47,10 +51,12 @@ "multiaddr": "^5.0.0", "multibase": "~0.4.0", "multihashes": "~0.4.13", + "nanoid": "^1.2.3", "ndjson": "^1.5.0", "once": "^1.4.0", "peer-id": "~0.11.0", "peer-info": "~0.14.1", + "promise-nodeify": "^3.0.1", "promisify-es6": "^1.0.3", "pull-defer": "~0.2.2", "pull-pushable": "^2.2.0", @@ -58,7 +64,7 @@ "pump": "^3.0.0", "qs": "^6.5.2", "readable-stream": "^2.3.6", - "stream-http": "^2.8.3", + "stream-http": "hugomrdias/stream-http#fix/body-handling", "stream-to-pull-stream": "^1.7.2", "streamifier": "~0.1.1", "tar-stream": "^1.6.1" diff --git a/src/files/add-experimental.js b/src/files/add-experimental.js new file mode 100644 index 000000000..53cdc3ebe --- /dev/null +++ b/src/files/add-experimental.js @@ -0,0 +1,121 @@ +'use strict' + +const { Readable } = require('stream') +const toPull = require('stream-to-pull-stream') +const promiseNodeify = require('promise-nodeify') +const concatStream = require('concat-stream') +const pump = require('pump') +const SendStream = require('../utils/send-stream-experimental') + +/** @module api/add */ + +/** + * Converts an array to a stream + * + * @ignore + * @param {Array} data + * @returns {Readable} + */ +const arrayToStream = (data) => { + let i = 0 + return new Readable({ + objectMode: true, + read () { + this.push(i < data.length ? data[i++] : null) + } + }) +} + +/** + * @typedef {Object} AddOptions + * @property {number} chunkSize - Value of array element + * @property {number} [cidVersion=0] - Defaults to 0. The CID version to use when storing the data (storage keys are based on the CID, including it's version) + * @property {function(bytes: number): void} progress - function that will be called with the byte length of chunks as a file is added to ipfs. + * @property {Boolean} recursive - When a Path is passed, this option can be enabled to add recursively all the files. + * @property {string} hashAlg - Multihash hashing algorithm to use. (default: sha2-256) The list of all possible values {@link https://github.com/multiformats/js-multihash/blob/master/src/constants.js#L5-L343 hashAlg values} + * @property {Boolean} wrapWithDirectory - Adds a wrapping node around the content. + * @property {Boolean} onlyHash - Doesn't actually add the file to IPFS, but rather calculates its hash. + * @property {Boolean} [pin=true] - Defaults to true. Pin this object when adding. + * @property {Boolean} [rawLeaves=false] - Defaults to false. If true, DAG leaves will contain raw file data and not be wrapped in a protobuf + * @property {string} [chunker=size-262144] Chunking algorithm used to build ipfs DAGs. Available formats: + * - size-{size} + * - rabin + * - rabin-{avg} + * - rabin-{min}-{avg}-{max} + */ + +/** + * @typedef {Object} AddResult + * @property {string} path - Object path + * @property {string} hash - Object CID + * @property {number} size - Object size + */ + +/** + * @callback AddCallback + * @param {Error} err + * @param {AddResult[]} res + */ + +/** @typedef {Function} PullStream */ +/** @typedef {(Object[]|Readable|File|PullStream|Buffer)} AddData */ +/** @typedef {function(AddData, AddOptions, AddCallback): (Promise.<AddResult[]>|void)} AddFunction */ + +/** + * Add to data to ipfs + * + * @param {Function} send + * @returns {AddFunction} + * @memberof api/add + */ +const add = (send) => (data, options, callback) => { + if (typeof options === 'function') { + callback = options + options = {} + } + + let result = [] + const r = new Promise((resolve, reject) => { + pump( + arrayToStream([].concat(data)), + new SendStream(send, options), + concatStream(r => (result = r)), + (err) => { + if (err) { + return reject(err) + } + resolve(result) + } + ) + }) + + return promiseNodeify(r, callback) +} + +/** + * Add to data to ipfs + * + * @param {Function} send + * @returns {function(AddOptions): Readable} + * @memberof api/add + */ +const addReadableStream = (send) => (options = {}) => { + return new SendStream(send, options) +} + +/** + * Add to data to ipfs + * + * @param {Function} send + * @returns {function(AddOptions): PullStream} + * @memberof api/add + */ +const addPullStream = (send) => (options = {}) => { + return toPull(new SendStream(send, options)) +} + +module.exports = { + add, + addReadableStream, + addPullStream +} diff --git a/src/files/add-pull-stream.js b/src/files/add-pull-stream.js index 2076ffa8d..1b0875f21 100644 --- a/src/files/add-pull-stream.js +++ b/src/files/add-pull-stream.js @@ -3,11 +3,12 @@ const SendFilesStream = require('../utils/send-files-stream') const FileResultStreamConverter = require('../utils/file-result-stream-converter') const toPull = require('stream-to-pull-stream') +const { addPullStream } = require('./add-experimental') -module.exports = (send) => { - return (options) => { - options = options || {} - options.converter = FileResultStreamConverter - return toPull(SendFilesStream(send, 'add')({ qs: options })) +module.exports = (send) => (options = {}) => { + if (options.experimental) { + return addPullStream(send)(options) } + options.converter = FileResultStreamConverter + return toPull(SendFilesStream(send, 'add')({ qs: options })) } diff --git a/src/files/add-readable-stream.js b/src/files/add-readable-stream.js index 320abe692..6dbb687bc 100644 --- a/src/files/add-readable-stream.js +++ b/src/files/add-readable-stream.js @@ -2,11 +2,12 @@ const SendFilesStream = require('../utils/send-files-stream') const FileResultStreamConverter = require('../utils/file-result-stream-converter') +const { addReadableStream } = require('./add-experimental') -module.exports = (send) => { - return (options) => { - options = options || {} - options.converter = FileResultStreamConverter - return SendFilesStream(send, 'add')(options) +module.exports = (send) => (options = {}) => { + if (options.experimental) { + return addReadableStream(send)(options) } + options.converter = FileResultStreamConverter + return SendFilesStream(send, 'add')(options) } diff --git a/src/files/add.js b/src/files/add.js index f706843a8..8ffd4d3d2 100644 --- a/src/files/add.js +++ b/src/files/add.js @@ -10,8 +10,6 @@ const FileResultStreamConverter = require('../utils/file-result-stream-converter const SendFilesStream = require('../utils/send-files-stream') module.exports = (send) => { - const createAddStream = SendFilesStream(send, 'add') - const add = promisify((_files, options, _callback) => { if (typeof options === 'function') { _callback = options @@ -26,11 +24,11 @@ module.exports = (send) => { options.converter = FileResultStreamConverter const ok = Buffer.isBuffer(_files) || - isStream.readable(_files) || - Array.isArray(_files) || - OtherBuffer.isBuffer(_files) || - typeof _files === 'object' || - isSource(_files) + isStream.readable(_files) || + Array.isArray(_files) || + OtherBuffer.isBuffer(_files) || + typeof _files === 'object' || + isSource(_files) if (!ok) { return callback(new Error('first arg must be a buffer, readable stream, pull stream, an object or array of objects')) @@ -38,6 +36,7 @@ module.exports = (send) => { const files = [].concat(_files) + const createAddStream = SendFilesStream(send, 'add') const stream = createAddStream({ qs: options }) const concat = ConcatStream((result) => callback(null, result)) stream.once('error', callback) @@ -47,7 +46,11 @@ module.exports = (send) => { stream.end() }) - return function () { + return function (data, options, callback) { + if (options && options.experimental) { + return require('./add-experimental').add(send)(data, options, callback) + } + const args = Array.from(arguments) // If we files.add(<pull stream>), then promisify thinks the pull stream is diff --git a/src/utils/multipart-experimental.js b/src/utils/multipart-experimental.js new file mode 100644 index 000000000..59b83ec44 --- /dev/null +++ b/src/utils/multipart-experimental.js @@ -0,0 +1,202 @@ +'use strict' + +const { Duplex, PassThrough } = require('stream') +const { isSource } = require('is-pull-stream') +const pump = require('pump') +const pullToStream = require('pull-stream-to-stream') +const bufferToStream = require('buffer-to-stream') + +/** @ignore @typedef {import("../files/add-experimental").AddOptions} AddOptions */ + +const PADDING = '--' +const NEW_LINE = '\r\n' +const NEW_LINE_BUFFER = Buffer.from(NEW_LINE) + +/** + * Generate a random boundary to use in a multipart request + * + * @ignore + * @returns {string} + */ +const generateBoundary = () => { + var boundary = '--------------------------' + for (var i = 0; i < 24; i++) { + boundary += Math.floor(Math.random() * 10).toString(16) + } + + return boundary +} + +/** + * Generate leading section for a multipart body + * + * @ignore + * @param {Object} [headers={}] + * @param {string} boundary + * @returns {string} + */ +const leading = (headers = {}, boundary) => { + var leading = [PADDING + boundary] + + Object.keys(headers).forEach((header) => { + leading.push(header + ': ' + headers[header]) + }) + + leading.push('') + leading.push('') + + const leadingStr = leading.join(NEW_LINE) + + return Buffer.from(leadingStr) +} + +/** + * Multipart class to generate a multipart body chunked and non chunked + * + * @ignore + * @class Multipart + * @extends {Duplex} + */ +class Multipart extends Duplex { + /** + * Creates an instance of Multipart. + * @param {AddOptions} options + */ + constructor (options) { + super({ + writableObjectMode: true, + writableHighWaterMark: 1, + readableHighWaterMark: options.chunkSize ? Math.max(136, options.chunkSize) : 16384 // min is 136 + }) + + this._boundary = generateBoundary() + this.source = null + this.chunkSize = options.chunkSize || 0 + this.buffer = Buffer.alloc(this.chunkSize) + this.bufferOffset = 0 + this.extraBytes = 0 + this.sourceReadable = false + } + + _read () { + // empty read + } + + _write (file, encoding, callback) { + this.pushFile(file, () => { + callback() + }) + } + + _final (callback) { + // Flush the rest and finish + const tail = Buffer.from(PADDING + this._boundary + PADDING + NEW_LINE) + if (this.chunkSize === 0) { + this.push(tail) + } else { + this.extraBytes += tail.length + const slice = this.buffer.slice(0, this.bufferOffset) + + this.bufferOffset = 0 + this.push(Buffer.concat([slice, tail], slice.length + tail.length)) + } + + this.push(null) + callback() + } + + resume () { + super.resume() + + // Chunked mode + if (this.chunkSize > 0 && this.sourceReadable) { + let chunk + while (!this.isPaused() && (chunk = this.source.read(this.chunkSize - this.bufferOffset)) !== null) { + this.pushChunk(chunk) + } + } + } + + /** + * Push chunk + * + * @param {Buffer} chunk + * @param {boolean} [isExtra=false] + * @return {boolean} + */ + pushChunk (chunk, isExtra = false) { + if (chunk === null || this.chunkSize === 0) { + return this.push(chunk) + } + + if (isExtra) { + this.extraBytes += chunk.length + } + + if (this.bufferOffset === 0 && chunk.length === this.chunkSize) { + return this.push(chunk) + } + + const bytesNeeded = (this.chunkSize - this.bufferOffset) + // make sure we have the correct amount of bytes + if (chunk.length === bytesNeeded) { + const slice = this.buffer.slice(0, this.bufferOffset) + this.bufferOffset = 0 + return this.push(Buffer.concat([slice, chunk], slice.length + chunk.length)) + } + + if (chunk.length > bytesNeeded) { + this.emit('error', new RangeError(`Chunk is too big needed ${bytesNeeded} got ${chunk.length}`)) + return false + } + + chunk.copy(this.buffer, this.bufferOffset) + this.bufferOffset += chunk.length + + return true + } + + /** + * Consume file and push to readable stream + * + * @param {(Buffer|Readable|PullStream)} file + * @param {function(): void} callback + * @return {void} + */ + pushFile (file, callback) { + this.pushChunk(leading(file.headers, this._boundary), true) + + this.source = file.content || Buffer.alloc(0) + + if (Buffer.isBuffer(this.source)) { + this.source = bufferToStream(this.source) + } else if (isSource(file.content)) { + // pull-stream-to-stream doesn't support readable event... + this.source = pump([pullToStream.source(file.content), new PassThrough()]) + } + + this.source.on('readable', () => { + this.sourceReadable = true + let chunk = null + if (this.chunkSize === 0) { + if ((chunk = this.source.read()) !== null) { + this.pushChunk(chunk) + } + } else { + while (!this.isPaused() && (chunk = this.source.read(this.chunkSize - this.bufferOffset)) !== null) { + this.pushChunk(chunk) + } + } + }) + + this.source.on('end', () => { + this.sourceReadable = false + this.pushChunk(NEW_LINE_BUFFER, true) + callback() + }) + + this.source.on('error', err => this.emit('error', err)) + } +} + +module.exports = Multipart diff --git a/src/utils/prepare-file.js b/src/utils/prepare-file.js index 738c4a4c0..3bed54d79 100644 --- a/src/utils/prepare-file.js +++ b/src/utils/prepare-file.js @@ -1,106 +1,80 @@ 'use strict' -const isNode = require('detect-node') +const errcode = require('err-code') +const { isSource } = require('is-pull-stream') +const isStream = require('is-stream') const flatmap = require('flatmap') +const fileReaderStream = require('filereader-stream') -function loadPaths (opts, file) { - const path = require('path') - const fs = require('fs') - const glob = require('glob') +const isBrowser = typeof window === 'object' && + typeof document === 'object' && + document.nodeType === 9 - const followSymlinks = opts.followSymlinks != null ? opts.followSymlinks : true +function prepareFile (file, opts) { + let files = [].concat(file) - file = path.resolve(file) - const stats = fs.statSync(file) + return flatmap(files, (file) => { + return prepare(file, opts) + }) +} - if (stats.isDirectory() && !opts.recursive) { - throw new Error('Can only add directories using --recursive') +function prepare (file, opts) { + const result = { + path: '', + symlink: false, + dir: false, + content: null } - - if (stats.isDirectory() && opts.recursive) { - // glob requires a POSIX filename - file = file.split(path.sep).join('/') - const fullDir = file + (file.endsWith('/') ? '' : '/') - let dirName = fullDir.split('/') - dirName = dirName[dirName.length - 2] + '/' - const mg = new glob.sync.GlobSync('**/*', { - cwd: file, - follow: followSymlinks, - dot: opts.hidden, - ignore: opts.ignore - }) - - return mg.found - .map((name) => { - const fqn = fullDir + name - // symlinks - if (mg.symlinks[fqn] === true) { - return { - path: dirName + name, - symlink: true, - dir: false, - content: fs.readlinkSync(fqn) - } - } - - // files - if (mg.cache[fqn] === 'FILE') { - return { - path: dirName + name, - symlink: false, - dir: false, - content: fs.createReadStream(fqn) - } - } - - // directories - if (mg.cache[fqn] === 'DIR' || mg.cache[fqn] instanceof Array) { - return { - path: dirName + name, - symlink: false, - dir: true - } - } - // files inside symlinks and others - }) - // filter out null files - .filter(Boolean) + // probably it should be valid and would be handled below with Buffer.from + if (typeof file === 'string') { + throw errcode(new Error('String isn\'t valid as an input'), 'ERR_INVALID_INPUT') } - return { - path: path.basename(file), - content: fs.createReadStream(file) + // needs to test for stream because fs.createReadStream has path prop and would handle here + if (!isStream(file) && file.path && !file.content) { // {path, content} input with no content so we assume directory + result.dir = true + } else if (file.content || file.dir) { // {path, content} input with content or dir just copy + result.content = file.content + result.dir = file.dir + } else if (isBrowser && file instanceof self.File) { // browser File input we create a stream from it + result.path = file.name + result.content = fileReaderStream(file, opts) + } else if (!isStream(file) && !isSource(file) && !Buffer.isBuffer(file)) { // if not pull-stream, stream or buffer try to create a buffer from input + result.content = Buffer.from(file) + } else { // here we only have pull-stream, stream or buffer so we just set content to input + result.content = file } + + return result } -function prepareFile (file, opts) { - let files = [].concat(file) +function prepareWithHeaders (file, opts) { + const obj = prepare(file, opts) - return flatmap(files, (file) => { - if (typeof file === 'string') { - if (!isNode) { - throw new Error('Can only add file paths in node') - } + obj.headers = headers(obj) + return obj +} - return loadPaths(opts, file) - } +function headers (file) { + const name = file.path + ? encodeURIComponent(file.path) + : '' - if (file.path && !file.content) { - file.dir = true - return file - } + const header = { 'Content-Disposition': `file; filename="${name}"` } - if (file.content || file.dir) { - return file - } + if (!file.content) { + header['Content-Type'] = 'application/x-directory' + } else if (file.symlink) { + header['Content-Type'] = 'application/symlink' + } else { + header['Content-Type'] = 'application/octet-stream' + } - return { - path: '', - symlink: false, - dir: false, - content: file - } - }) + return header } -exports = module.exports = prepareFile +module.exports = { + prepareFile, + prepare, + prepareWithHeaders +} diff --git a/src/utils/send-files-stream.js b/src/utils/send-files-stream.js index 46a57e383..b87a94da2 100644 --- a/src/utils/send-files-stream.js +++ b/src/utils/send-files-stream.js @@ -4,7 +4,7 @@ const Duplex = require('stream').Duplex const eachSeries = require('async/eachSeries') const isStream = require('is-stream') const once = require('once') -const prepareFile = require('./prepare-file') +const { prepareFile } = require('./prepare-file') const Multipart = require('./multipart') function headers (file) { diff --git a/src/utils/send-request.js b/src/utils/send-request.js index c8dfb4dd8..81ad8f83e 100644 --- a/src/utils/send-request.js +++ b/src/utils/send-request.js @@ -4,6 +4,7 @@ const Qs = require('qs') const qsDefaultEncoder = require('qs/lib/utils').encode const isNode = require('detect-node') const ndjson = require('ndjson') +const { Transform } = require('stream') const pump = require('pump') const once = require('once') const streamToValue = require('./stream-to-value') @@ -52,22 +53,21 @@ function onRes (buffer, cb) { return cb(null, res) } - // Return a stream of JSON objects if (chunkedObjects && isJson) { - const outputStream = ndjson.parse() - pump(res, outputStream) - res.on('end', () => { - let err = res.trailers['x-stream-error'] - if (err) { - // Not all errors are JSON - try { - err = JSON.parse(err) - } catch (e) { - err = { Message: err } + const outputStream = pump( + res, + ndjson.parse(), + new Transform({ + objectMode: true, + transform (chunk, encoding, callback) { + if (chunk.Type && chunk.Type === 'error') { + callback(new Error(chunk.Message)) + } else { + callback(null, chunk) + } } - outputStream.emit('error', new Error(err.Message)) - } - }) + }) + ) return cb(null, outputStream) } @@ -114,7 +114,7 @@ function requestAPI (config, options, callback) { delete options.qs.followSymlinks const method = 'POST' - const headers = Object.assign({}, config.headers) + const headers = Object.assign({}, config.headers, options.headers) if (isNode) { // Browsers do not allow you to modify the user agent diff --git a/src/utils/send-stream-experimental.js b/src/utils/send-stream-experimental.js new file mode 100644 index 000000000..b5866aeba --- /dev/null +++ b/src/utils/send-stream-experimental.js @@ -0,0 +1,199 @@ +'use strict' +const { Duplex, Transform } = require('stream') +const isStream = require('is-stream') +const nanoid = require('nanoid') +const pump = require('pump') +const Multipart = require('./multipart-experimental') +const { prepareWithHeaders } = require('./prepare-file') + +/** @ignore @typedef {import("../files/add-experimental").AddOptions} AddOptions */ + +const noop = () => {} + +/** + * Convert back to the proper schema + * + * @ignore + * @param {Object} data + * @returns {Object} + */ +const convert = (data) => { + return { + path: data.Name, + hash: data.Hash, + size: data.Size + } +} + +/** + * Factory for prepare stream + * @ignore + * @param {*} options + * @returns {Function} + */ +const prepareTransform = (options) => new Transform({ + objectMode: true, + transform (chunk, encoding, callback) { + try { + callback(null, prepareWithHeaders(chunk, options)) + } catch (err) { + callback(err) + } + } +}) + +/** + * Class to create a stream to send data to the API + * + * @ignore + * @class SendStream + * @extends {Duplex} + */ +class SendStream extends Duplex { + /** + * Creates an instance of SendStream. + * @param {Function} send + * @param {AddOptions} [options={}] + */ + constructor (send, options = {}) { + super({ objectMode: true, highWaterMark: 1 }) + this.options = options + this.send = send + this.multipart = new Multipart(options) + this.boundary = this.multipart._boundary + this.uuid = nanoid() + this.index = 0 + this.rangeStart = 0 + this.rangeEnd = 0 + this.rangeTotal = 0 + this.qs = { + 'cid-version': this.options['cid-version'], + 'raw-leaves': this.options['raw-leaves'], + 'only-hash': this.options.onlyHash, + 'wrap-with-directory': this.options.wrapWithDirectory, + hash: this.options.hashAlg || this.options.hash + } + + this.args = { + path: 'add-chunked', + qs: this.qs, + args: this.options.args, + stream: true, + recursive: true, + progress: Boolean(this.options.progress), + multipart: true, + multipartBoundary: this.boundary + } + + this.source = prepareTransform(options) + + pump([ + this.source, + this.multipart, + !options.chunkSize && this.request() + ].filter(Boolean), (err) => { + if (err) { + this.emit('error', err) + } + }) + + if (options.chunkSize) { + this.multipart.on('data', this.onData.bind(this)) + } + } + + _read () { + // duplex stream needs to implement _read() + } + + _write (chunk, encoding, callback) { + this.source.write(chunk) + callback() + } + + _final () { + this.source.end() + } + + onData (chunk) { + this.multipart.pause() + // stop producing chunks + this.index++ + this.rangeEnd = this.rangeStart + chunk.length + this.rangeTotal += chunk.length + this.requestChunk(chunk) + .then(() => { + this.multipart.resume() + }) + this.rangeStart = this.rangeEnd + } + + requestChunk (chunk) { + this.args.headers = { + 'Content-Range': `bytes ${this.rangeStart}-${this.rangeEnd}/${this.rangeTotal}`, + 'X-Chunked-Input': `uuid="${this.uuid}"; index=${this.index}` + } + return new Promise((resolve, reject) => { + const progressFn = this.options.progress || noop + const req = this.send(this.args, (err, res) => { + if (err) { + return this.emit('error', err) + } + + // progress upload reporting + const totalUp = Math.max((this.rangeTotal - this.multipart.extraBytes) / 2, 0) + progressFn(totalUp) + // we are in the last request + if (isStream(res)) { + res.on('data', d => { + if (d.Hash) { + // files added reporting + this.push(convert(d)) + } else { + // progress add reporting + progressFn((d.Bytes / 2) + totalUp) + } + }) + res.on('error', err => this.emit('error', err)) + res.on('end', () => { + resolve() + this.push(null) + }) + } else { + resolve() + } + }) + + // write and send + if (chunk !== null) { + req.write(Buffer.from(chunk)) + } + req.end() + }) + } + + request () { + const progressFn = this.options.progress || noop + return this.send(this.args, (err, res) => { + if (err) { + return this.emit('error', err) + } + + res.on('data', (d) => { + if (d.Hash) { + // files added reporting + this.push(convert(d)) + } else { + // progress add reporting + progressFn(d.Bytes) + } + }) + res.on('error', err => this.emit('error', err)) + res.on('end', () => { + this.push(null) + }) + }) + } +} + +module.exports = SendStream diff --git a/test/add-experimental.spec.js b/test/add-experimental.spec.js new file mode 100644 index 000000000..b99522de8 --- /dev/null +++ b/test/add-experimental.spec.js @@ -0,0 +1,516 @@ +/* eslint-env mocha */ +/* eslint max-nested-callbacks: ["error", 8] */ +'use strict' + +const chai = require('chai') +const dirtyChai = require('dirty-chai') +const expect = chai.expect +chai.use(dirtyChai) +const loadFixture = require('aegir/fixtures') +const mh = require('multihashes') +const CID = require('cids') +const path = require('path') +const { Readable } = require('stream') +const pull = require('pull-stream') +const IPFSApi = require('../src') +const f = require('./utils/factory') +const expectTimeout = require('./utils/expect-timeout') +const testfile = loadFixture('test/fixtures/2048') + +// TODO: Test against all algorithms Object.keys(mh.names) +// This subset is known to work with both go-ipfs and js-ipfs as of 2017-09-05 +const HASH_ALGS = [ + 'sha1', + 'sha2-256', + 'sha2-512', + 'keccak-224', + 'keccak-256', + 'keccak-384', + 'keccak-512' +] + +const runs = [ + {name: 'chunked', options: {experimental: true, chunkSize: 2 * 1024}}, + {name: 'non-chunked', options: {experimental: true}}, + {name: 'current non-chunked', options: {}} +] + +describe.only('experimental add', function () { + this.timeout(120 * 1000) + + let ipfsd + let ipfs + + ipfs = IPFSApi('localhost', '5002') + const expectedMultihash = 'QmcfPue16BgM2UqRg7tkoqbLgW4PKZok2HKyn9YEu1Eiyz' + + // before((done) => { + // f.spawn({ initOptions: { bits: 1024 } }, (err, _ipfsd) => { + // expect(err).to.not.exist() + // ipfsd = _ipfsd + // ipfs = IPFSApi(_ipfsd.apiAddr) + // done() + // }) + // }) + + // after((done) => { + // if (!ipfsd) return done() + // ipfsd.stop(done) + // }) + runs.forEach(run => { + it(`files.add ${run.name} - file for testing`, (done) => { + ipfs.files.add(testfile, run.options, (err, res) => { + expect(err).to.not.exist() + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedMultihash) + expect(res[0].path).to.equal(expectedMultihash) + done() + }) + }) + + it(`files.add ${run.name} - with Buffer module`, (done) => { + let Buffer = require('buffer').Buffer + + let expectedBufferMultihash = 'QmWfVY9y3xjsixTgbd9AorQxH7VtMpzfx2HaWtsoUYecaX' + let file = Buffer.from('hello') + + ipfs.files.add(file, run.options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedBufferMultihash) + expect(res[0].path).to.equal(expectedBufferMultihash) + done() + }) + }) + it(`files.add ${run.name} with empty path and buffer content`, (done) => { + const expectedHash = 'QmWfVY9y3xjsixTgbd9AorQxH7VtMpzfx2HaWtsoUYecaX' + const content = Buffer.from('hello') + + ipfs.files.add([{ path: '', content }], run.options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedHash) + expect(res[0].path).to.equal(expectedHash) + done() + }) + }) + it(`files.add ${run.name} with cid-version=1 and raw-leaves=false`, (done) => { + const expectedCid = 'zdj7WjkeH54wf9wuC9MQrrNgDRuJiFq37DstbjWSwvuiSod9v' + const options = Object.assign({}, run.options, { 'cid-version': 1, 'raw-leaves': false }) + + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedCid) + expect(res[0].path).to.equal(expectedCid) + done() + }) + }) + + it(`files.add ${run.name} with only-hash=true`, function () { + this.slow(10 * 1000) + const content = String(Math.random() + Date.now()) + const options = Object.assign({}, run.options, { onlyHash: true, experimental: true }) + + return ipfs.files.add(Buffer.from(content), options) + .then(files => { + expect(files).to.have.length(1) + + // 'ipfs.object.get(<hash>)' should timeout because content wasn't actually added + return expectTimeout(ipfs.object.get(files[0].hash), 4000) + }) + }) + + it(`files.add ${run.name} with options`, (done) => { + const options = Object.assign({}, run.options, { pin: false }) + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(res[0].hash).to.equal(expectedMultihash) + expect(res[0].path).to.equal(expectedMultihash) + done() + }) + }) + + HASH_ALGS.forEach((name) => { + it(`files.add ${run.name} with hash=${name} and raw-leaves=false`, (done) => { + const content = String(Math.random() + Date.now()) + const file = { + path: content + '.txt', + content: Buffer.from(content) + } + + const options = Object.assign( + {}, + run.options, + { hash: name, 'raw-leaves': false, experimental: true } + ) + + ipfs.files.add([file], options, (err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + const cid = new CID(res[0].hash) + expect(mh.decode(cid.multihash).name).to.equal(name) + done() + }) + }) + }) + + it(`files.add ${run.name} file with progress option`, (done) => { + let progress + + const options = Object.assign({}, run.options, { progress: p => (progress = p) }) + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + done() + }) + }) + + // TODO: needs to be using a big file + it.skip(`files.add ${run.name} - big file with progress option`, (done) => { + let progress = 0 + const options = Object.assign({}, run.options, { progress: p => (progress = p) }) + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + done() + }) + }) + + // TODO: needs to be using a directory + it(`files.add ${run.name} - directory with progress option`, (done) => { + let progress = 0 + + const options = Object.assign({}, run.options, { progress: p => (progress = p) }) + ipfs.files.add(testfile, options, (err, res) => { + expect(err).to.not.exist() + expect(res).to.have.length(1) + expect(progress).to.be.equal(testfile.byteLength) + done() + }) + }) + + it(`files.addPullStream ${run.name} - with object chunks and pull stream content`, (done) => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + pull( + pull.values([{ content: pull.values([Buffer.from('test')]) }]), + ipfs.files.addPullStream(run.options), + pull.collect((err, res) => { + if (err) return done(err) + + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + done() + }) + ) + }) + + it(`files.add ${run.name} - with pull stream (callback)`, (done) => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + ipfs.files.add(pull.values([Buffer.from('test')]), run.options, (err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + done() + }) + }) + + it(`files.add ${run.name} - with pull stream (promise)`, () => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + return ipfs.files.add(pull.values([Buffer.from('test')]), run.options) + .then((res) => { + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + }) + }) + + it(`files.add ${run.name} - with array of objects with pull stream content`, () => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + return ipfs.files.add( + [{ content: pull.values([Buffer.from('test')]) }], + run.options) + .then((res) => { + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + }) + }) + + // tests from interface-core add + it(`files.add ${run.name} - should not be able to add by path`, (done) => { + const validPath = path.join(process.cwd() + '/package.json') + + ipfs.files.add(validPath, run.options, (err, res) => { + expect(err).to.exist() + done() + }) + }) + + it(`files.add ${run.name} - should add readable stream`, (done) => { + const expectedCid = 'QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS' + + const rs = new Readable() + rs.push(Buffer.from('some data')) + rs.push(null) + + ipfs.files.add(rs, run.options, (err, filesAdded) => { + expect(err).to.not.exist() + + expect(filesAdded).to.be.length(1) + const file = filesAdded[0] + expect(file.path).to.equal(expectedCid) + expect(file.size).to.equal(17) + expect(file.hash).to.equal(expectedCid) + done() + }) + }) + + it(`files.add ${run.name} - should add array of objects with readable stream content`, (done) => { + const expectedCid = 'QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS' + + const rs = new Readable() + rs.push(Buffer.from('some data')) + rs.push(null) + + const tuple = { path: 'data.txt', content: rs } + + ipfs.files.add([tuple], run.options, (err, filesAdded) => { + expect(err).to.not.exist() + + expect(filesAdded).to.be.length(1) + const file = filesAdded[0] + expect(file.path).to.equal('data.txt') + expect(file.size).to.equal(17) + expect(file.hash).to.equal(expectedCid) + done() + }) + }) + + it(`files.add ${run.name} - should add array of objects with readable stream content`, (done) => { + const expectedCid = 'QmVv4Wz46JaZJeH5PMV4LGbRiiMKEmszPYY3g6fjGnVXBS' + + const rs = new Readable() + rs.push(Buffer.from('some data')) + rs.push(null) + + const tuple = { path: 'data.txt', content: rs } + + ipfs.files.add([tuple], run.options, (err, filesAdded) => { + expect(err).to.not.exist() + + expect(filesAdded).to.be.length(1) + const file = filesAdded[0] + expect(file.path).to.equal('data.txt') + expect(file.size).to.equal(17) + expect(file.hash).to.equal(expectedCid) + done() + }) + }) + + it(`files.add ${run.name} - should add array of objects with pull stream content (promised)`, () => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + return ipfs.files.add([{ content: pull.values([Buffer.from('test')]) }], run.options) + .then((res) => { + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + }) + }) + + it(`files.add ${run.name} - should add a nested directory as array of tupples`, function (done) { + const content = (name) => ({ + path: `test-folder/${name}`, + content: Buffer.from('test') + }) + + const emptyDir = (name) => ({ path: `test-folder/${name}` }) + + const dirs = [ + content('pp.txt'), + content('holmes.txt'), + content('jungle.txt'), + content('alice.txt'), + emptyDir('empty-folder'), + content('files/hello.txt'), + content('files/ipfs.txt'), + emptyDir('files/empty') + ] + + ipfs.files.add(dirs, run.options, (err, res) => { + expect(err).to.not.exist() + const root = res[res.length - 1] + + expect(root.path).to.equal('test-folder') + expect(root.hash).to.equal('QmT7pf89dqQYf4vdHryzMUPhcFLCsvW2xmzCk9DbmKiVj3') + done() + }) + }) + + it(`files.add ${run.name} - should fail when passed invalid input`, (done) => { + const nonValid = 'sfdasfasfs' + + ipfs.files.add(nonValid, run.options, (err, result) => { + expect(err).to.exist() + done() + }) + }) + + // TODO: fix current implementation fails here + it.skip(`files.add ${run.name} - should wrap content in a directory`, (done) => { + const data = { path: 'testfile.txt', content: Buffer.from('test') } + const options = Object.assign({}, run.options, { + wrapWithDirectory: true + }) + + ipfs.files.add(data, options, (err, filesAdded) => { + expect(err).to.not.exist() + expect(filesAdded).to.have.length(2) + const file = filesAdded[0] + const wrapped = filesAdded[1] + expect(file.hash).to.equal('QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm') + expect(file.path).to.equal('testfile.txt') + expect(wrapped.path).to.equal('') + done() + }) + }) + + // tests from interface-core add-pullstream + + it(`files.add ${run.name} - should add pull stream of valid files and dirs`, function (done) { + const content = (name) => ({ + path: `test-folder/${name}`, + content: Buffer.from('test') + }) + + const emptyDir = (name) => ({ path: `test-folder/${name}` }) + + const files = [ + content('pp.txt'), + content('holmes.txt'), + content('jungle.txt'), + content('alice.txt'), + emptyDir('empty-folder'), + content('files/hello.txt'), + content('files/ipfs.txt'), + emptyDir('files/empty') + ] + + const stream = ipfs.files.addPullStream(run.options) + + pull( + pull.values(files), + stream, + pull.collect((err, filesAdded) => { + expect(err).to.not.exist() + + filesAdded.forEach((file) => { + if (file.path === 'test-folder') { + expect(file.hash).to.equal('QmT7pf89dqQYf4vdHryzMUPhcFLCsvW2xmzCk9DbmKiVj3') + done() + } + }) + }) + ) + }) + + it(`files.add ${run.name} - should add with object chunks and pull stream content`, (done) => { + const expectedCid = 'QmRf22bZar3WKmojipms22PkXH1MZGmvsqzQtuSvQE3uhm' + + pull( + pull.values([{ content: pull.values([Buffer.from('test')]) }]), + ipfs.files.addPullStream(run.options), + pull.collect((err, res) => { + if (err) return done(err) + expect(res).to.have.length(1) + expect(res[0]).to.deep.equal({ path: expectedCid, hash: expectedCid, size: 12 }) + done() + }) + ) + }) + // tests from interface-core add-readable-stream + + it(`files.add ${run.name} - should add readable stream of valid files and dirs`, function (done) { + const content = (name) => ({ + path: `test-folder/${name}`, + content: Buffer.from('test') + }) + + const emptyDir = (name) => ({ path: `test-folder/${name}` }) + + const files = [ + content('pp.txt'), + content('holmes.txt'), + content('jungle.txt'), + content('alice.txt'), + emptyDir('empty-folder'), + content('files/hello.txt'), + content('files/ipfs.txt'), + emptyDir('files/empty') + ] + + const stream = ipfs.files.addReadableStream(run.options) + + stream.on('error', (err) => { + expect(err).to.not.exist() + }) + + stream.on('data', (file) => { + if (file.path === 'test-folder') { + expect(file.hash).to.equal('QmT7pf89dqQYf4vdHryzMUPhcFLCsvW2xmzCk9DbmKiVj3') + done() + } + }) + + files.forEach((file) => stream.write(file)) + stream.end() + }) + // end runs + }) + + it.skip('files.add pins by default', (done) => { + const newContent = Buffer.from(String(Math.random())) + + ipfs.pin.ls((err, pins) => { + expect(err).to.not.exist() + const initialPinCount = pins.length + ipfs.files.add(newContent, { experimental: true }, (err, res) => { + expect(err).to.not.exist() + + ipfs.pin.ls((err, pins) => { + expect(err).to.not.exist() + expect(pins.length).to.eql(initialPinCount + 1) + done() + }) + }) + }) + }) + + it.skip('files.add with pin=false', (done) => { + const newContent = Buffer.from(String(Math.random())) + + ipfs.pin.ls((err, pins) => { + expect(err).to.not.exist() + const initialPinCount = pins.length + ipfs.files.add(newContent, { pin: false, experimental: true }, (err, res) => { + expect(err).to.not.exist() + + ipfs.pin.ls((err, pins) => { + expect(err).to.not.exist() + expect(pins.length).to.eql(initialPinCount) + done() + }) + }) + }) + }) +}) diff --git a/test/fixtures/2048 b/test/fixtures/2048 new file mode 100644 index 000000000..807b97e3e Binary files /dev/null and b/test/fixtures/2048 differ diff --git a/test/fixtures/chunked-add/chunk1 b/test/fixtures/chunked-add/chunk1 new file mode 100644 index 000000000..b69365566 Binary files /dev/null and b/test/fixtures/chunked-add/chunk1 differ diff --git a/test/fixtures/chunked-add/chunk2 b/test/fixtures/chunked-add/chunk2 new file mode 100644 index 000000000..039d4586a Binary files /dev/null and b/test/fixtures/chunked-add/chunk2 differ diff --git a/test/fixtures/chunked-add/chunk3 b/test/fixtures/chunked-add/chunk3 new file mode 100644 index 000000000..994392ffa --- /dev/null +++ b/test/fixtures/chunked-add/chunk3 @@ -0,0 +1,2 @@ +�=}��%��u��\/�T�K�d�0��E+�UH(�a�,;��m���u]���~-l�+�.��\=�����%���!y~kI��_��)7!��҈9��ң�Uh}]r��4 (~�?�|�����pݑE�hX +----------------------------817904829930564528937953-- diff --git a/test/fixtures/chunked-add/readme.md b/test/fixtures/chunked-add/readme.md new file mode 100644 index 000000000..8f6e384ff --- /dev/null +++ b/test/fixtures/chunked-add/readme.md @@ -0,0 +1,12 @@ + +### Copy/paste this inside inside this folder to test add chunked with curl +Don't forget to start the daemon +```bash +curl 'http://localhost:5002/api/v0/add-chunked?progress=true&stream-channels=true' -H 'X-Chunked-Input: uuid="28085967-267f-e569-9593-e7fb1ce53b45"; index=1' -H 'Content-Type: multipart/form-data; boundary=----------------------------817904829930564528937953' -H 'Content-Range: bytes 0-1024/1024' -H 'Connection: keep-alive' --data-binary @chunk1 --compressed + +curl 'http://localhost:5002/api/v0/add-chunked?progress=true&stream-channels=true' -H 'X-Chunked-Input: uuid="28085967-267f-e569-9593-e7fb1ce53b45"; index=2' -H 'Content-Type: multipart/form-data; boundary=----------------------------817904829930564528937953' -H 'Content-Range: bytes 1024-2048/2048' -H 'Connection: keep-alive' --data-binary @chunk2 --compressed + +curl 'http://localhost:5002/api/v0/add-chunked?progress=true&stream-channels=true' -H 'X-Chunked-Input: uuid="28085967-267f-e569-9593-e7fb1ce53b45"; index=3' -H 'Content-Type: multipart/form-data; boundary=----------------------------817904829930564528937953' -H 'Content-Range: bytes 2048-2242/2242' -H 'Connection: keep-alive' --data-binary @chunk3 --compressed + +curl 'http://localhost:5002/api/v0/add-chunked?progress=true&stream-channels=true' -X POST -H 'X-Chunked-Input: uuid="28085967-267f-e569-9593-e7fb1ce53b45"; index=3' -H 'Content-Type: multipart/form-data; boundary=--------------------------817904829930564528937953' -H 'Content-Range: bytes 2242-2242/2242' -H 'Accept: */*' -H 'Connection: keep-alive' -H 'Content-Length: 0' --compressed +``` \ No newline at end of file