diff --git a/package.json b/package.json
index cf0d3dbe..ce4c2d2b 100644
--- a/package.json
+++ b/package.json
@@ -35,10 +35,12 @@
   "homepage": "https://github.com/diasdavid/js-ipfs-data-importing#readme",
   "devDependencies": {
     "aegir": "^3.0.1",
+    "async": "^1.5.2",
     "block-stream2": "^1.1.0",
     "bs58": "^3.0.0",
     "buffer-loader": "0.0.1",
     "chai": "^3.5.0",
+    "concat-stream": "^1.5.1",
     "fs-blob-store": "^5.2.1",
     "idb-plus-blob-store": "^1.1.2",
     "ipfs-repo": "^0.7.5",
@@ -51,11 +53,13 @@
     "string-to-stream": "^1.0.1"
   },
   "dependencies": {
-    "async": "^1.5.2",
     "block-stream2": "^1.1.0",
+    "bs58": "^3.0.0",
     "debug": "^2.2.0",
+    "field-trip": "0.0.3",
     "ipfs-merkle-dag": "^0.5.0",
     "ipfs-unixfs": "^0.1.0",
+    "is-ipfs": "^0.2.0",
     "isstream": "^0.1.2",
     "readable-stream": "^1.1.13",
     "run-series": "^1.1.4",
diff --git a/src/exporter.js b/src/exporter.js
index 3bfaaab0..1833cf41 100644
--- a/src/exporter.js
+++ b/src/exporter.js
@@ -3,12 +3,14 @@
 const debug = require('debug')
 const log = debug('exporter')
 log.err = debug('exporter:error')
+const isIPFS = require('is-ipfs')
+const bs58 = require('bs58')
 const UnixFS = require('ipfs-unixfs')
 const series = require('run-series')
-const async = require('async')
 const Readable = require('readable-stream').Readable
 const pathj = require('path')
 const util = require('util')
+const fieldtrip = require('field-trip')
 
 exports = module.exports = Exporter
 
@@ -19,21 +21,29 @@ function Exporter (hash, dagService, options) {
     return new Exporter(hash, dagService, options)
   }
 
+  // Sanitize hash.
+  if (!isIPFS.multihash(hash)) {
+    throw new Error('not valid multihash')
+  }
+  if (Buffer.isBuffer(hash)) {
+    hash = bs58.encode(hash)
+  }
+
   Readable.call(this, { objectMode: true })
 
   this.options = options || {}
 
   this._read = (n) => {}
 
-  let fileExporter = (node, name, callback) => {
-    let init
+  let fileExporter = (node, name, done) => {
+    let init = false
 
-    if (!callback) { callback = function noop () {} }
+    if (!done) throw new Error('done must be set')
 
+    // Logic to export a single (possibly chunked) unixfs file.
     var rs = new Readable()
     if (node.links.length === 0) {
       const unmarshaledData = UnixFS.unmarshal(node.data)
-      init = false
       rs._read = () => {
         if (init) {
           return
@@ -43,10 +53,8 @@ function Exporter (hash, dagService, options) {
         rs.push(null)
       }
       this.push({ content: rs, path: name })
-      callback()
-      return
+      done()
     } else {
-      init = false
       rs._read = () => {
         if (init) {
           return
@@ -57,7 +65,7 @@ function Exporter (hash, dagService, options) {
           return (cb) => {
             dagService.get(link.hash, (err, res) => {
               if (err) {
-                cb(err)
+                return cb(err)
               }
               var unmarshaledData = UnixFS.unmarshal(res.data)
               rs.push(unmarshaledData.data)
@@ -67,80 +75,64 @@ function Exporter (hash, dagService, options) {
         })
         series(array, (err, res) => {
           if (err) {
-            callback()
+            rs.emit('error', err)
             return
           }
           rs.push(null)
-          callback()
           return
         })
       }
       this.push({ content: rs, path: name })
-      callback()
-      return
+      done()
     }
   }
 
-  let dirExporter = (node, name, callback) => {
-    let init
+  // Logic to export a unixfs directory.
+  let dirExporter = (node, name, add, done) => {
+    if (!add) throw new Error('add must be set')
+    if (!done) throw new Error('done must be set')
 
-    if (!callback) { callback = function noop () {} }
+    this.push({content: null, path: name})
 
-    var rs = new Readable()
-    if (node.links.length === 0) {
-      init = false
-      rs._read = () => {
-        if (init) {
-          return
-        }
-        init = true
-        rs.push(node.data)
-        rs.push(null)
-      }
-      this.push({content: null, path: name})
-      callback()
-      return
-    } else {
-      async.forEachSeries(node.links, (link, callback) => {
-        dagService.get(link.hash, (err, res) => {
-          if (err) {
-            callback(err)
-          }
-          var unmarshaledData = UnixFS.unmarshal(res.data)
-          if (unmarshaledData.type === 'file') {
-            return (fileExporter(res, pathj.join(name, link.name), callback))
-          }
-          if (unmarshaledData.type === 'directory') {
-            return (dirExporter(res, pathj.join(name, link.name), callback))
-          }
-          callback()
-        })
-      }, (err) => {
-        if (err) {
-          callback()
-          return
-        }
-        callback()
-        return
+    // Directory has links
+    if (node.links.length > 0) {
+      node.links.forEach((link) => {
+        add({ path: pathj.join(name, link.name), hash: link.hash })
       })
     }
+    done()
   }
 
-  dagService.get(hash, (err, fetchedNode) => {
+  // Traverse the DAG asynchronously
+  var self = this
+  fieldtrip([{ path: hash, hash: hash }], visit, (err) => {
     if (err) {
-      this.emit('error', err)
+      self.emit('error', err)
       return
     }
-    const data = UnixFS.unmarshal(fetchedNode.data)
-    const type = data.type
-
-    if (type === 'directory') {
-      dirExporter(fetchedNode, hash)
-    }
-    if (type === 'file') {
-      fileExporter(fetchedNode, hash)
-    }
+    self.push(null)
   })
 
+  // Visit function: called once per node in the exported graph
+  function visit (item, add, done) {
+    dagService.get(item.hash, (err, fetchedNode) => {
+      if (err) {
+        self.emit('error', err)
+        return
+      }
+
+      const data = UnixFS.unmarshal(fetchedNode.data)
+      const type = data.type
+
+      if (type === 'directory') {
+        dirExporter(fetchedNode, item.path, add, done)
+      }
+
+      if (type === 'file') {
+        fileExporter(fetchedNode, item.path, done)
+      }
+    })
+  }
+
+  return this
 }
diff --git a/test/test-exporter.js b/test/test-exporter.js
index 8e4f79db..abb77456 100644
--- a/test/test-exporter.js
+++ b/test/test-exporter.js
@@ -7,7 +7,7 @@
 const expect = require('chai').expect
 const BlockService = require('ipfs-block-service')
 const DAGService = require('ipfs-merkle-dag').DAGService
 const UnixFS = require('ipfs-unixfs')
-const bl = require('bl')
+const concat = require('concat-stream')
 const fs = require('fs')
 const path = require('path')
@@ -32,13 +32,16 @@ module.exports = function (repo) {
         const unmarsh = UnixFS.unmarshal(fetchedNode.data)
         expect(err).to.not.exist
         const testExport = exporter(hash, ds)
-        testExport.on('data', (file) => {
-          file.content.pipe(bl((err, bldata) => {
-            expect(err).to.not.exist
+        testExport.on('error', (err) => {
+          expect(err).to.not.exist
+        })
+        testExport.pipe(concat((files) => {
+          expect(files).to.be.length(1)
+          files[0].content.pipe(concat((bldata) => {
             expect(bldata).to.deep.equal(unmarsh.data)
             done()
           }))
-        })
+        }))
       })
     })
 
@@ -47,10 +50,12 @@ module.exports = function (repo) {
       const bs = new BlockService(repo)
       const ds = new DAGService(bs)
      const testExport = exporter(hash, ds)
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
+      })
      testExport.on('data', (file) => {
-        file.content.pipe(bl((err, bldata) => {
+        file.content.pipe(concat((bldata) => {
          expect(bldata).to.deep.equal(bigFile)
-          expect(err).to.not.exist
          done()
        }))
      })
@@ -61,10 +66,13 @@ module.exports = function (repo) {
      const bs = new BlockService(repo)
      const ds = new DAGService(bs)
      const testExport = exporter(hash, ds)
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
+      })
      testExport.on('data', (file) => {
        expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
-        file.content.pipe(bl((err, bldata) => {
-          expect(err).to.not.exist
+        file.content.pipe(concat((bldata) => {
+          expect(bldata).to.exist
          done()
        }))
      })
@@ -75,17 +83,18 @@ module.exports = function (repo) {
      const bs = new BlockService(repo)
      const ds = new DAGService(bs)
      const testExport = exporter(hash, ds)
-      var fsa = []
-      testExport.on('data', (files) => {
-        fsa.push(files)
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
      })
-      setTimeout(() => {
-        expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
-        expect(fsa[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
-        expect(fsa[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
-        expect(fsa[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
+      testExport.pipe(concat((files) => {
+        expect(files[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN')
+        expect(files[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
+        expect(files[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
+        expect(files[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1')
+        expect(files[4].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
+        expect(files[5].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
        done()
-      }, 1000)
+      }))
    })
 
    it('returns a null stream for dir', (done) => {
@@ -93,6 +102,9 @@ module.exports = function (repo) {
      const bs = new BlockService(repo)
      const ds = new DAGService(bs)
      const testExport = exporter(hash, ds)
+      testExport.on('error', (err) => {
+        expect(err).to.not.exist
+      })
      testExport.on('data', (dir) => {
        expect(dir.content).to.equal(null)
        done()
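
A rough sketch of how the reworked exporter can be consumed, mirroring the tests above: it is now an object-mode Readable that emits { path, content } entries, with directories pushed as content: null. The repo wiring below is an assumption (not part of this change), and the hash is the directory fixture used in the tests.

const BlockService = require('ipfs-block-service')
const DAGService = require('ipfs-merkle-dag').DAGService
const exporter = require('./src/exporter')

// Assumption: `repo` is an already-initialized ipfs-repo instance.
const bs = new BlockService(repo)
const ds = new DAGService(bs)

// Directory hash taken from the test fixtures above.
const testExport = exporter('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN', ds)

testExport.on('error', (err) => {
  console.error(err)
})

testExport.on('data', (file) => {
  // Directory entries carry content: null; file entries carry a readable stream.
  console.log(file.path)
  if (file.content) {
    file.content.pipe(process.stdout)
  }
})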