Skip to content
This repository was archived by the owner on Aug 12, 2020. It is now read-only.

End export stream on completion. #47

Merged
merged 4 commits into from
Jun 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@
"homepage": "https://github.com/diasdavid/js-ipfs-data-importing#readme",
"devDependencies": {
"aegir": "^3.0.1",
"async": "^1.5.2",
"block-stream2": "^1.1.0",
"bs58": "^3.0.0",
"buffer-loader": "0.0.1",
"chai": "^3.5.0",
"concat-stream": "^1.5.1",
"fs-blob-store": "^5.2.1",
"idb-plus-blob-store": "^1.1.2",
"ipfs-repo": "^0.7.5",
Expand All @@ -51,11 +53,13 @@
"string-to-stream": "^1.0.1"
},
"dependencies": {
"async": "^1.5.2",
"block-stream2": "^1.1.0",
"bs58": "^3.0.0",
"debug": "^2.2.0",
"field-trip": "0.0.3",
"ipfs-merkle-dag": "^0.5.0",
"ipfs-unixfs": "^0.1.0",
"is-ipfs": "^0.2.0",
"isstream": "^0.1.2",
"readable-stream": "^1.1.13",
"run-series": "^1.1.4",
Expand Down
118 changes: 55 additions & 63 deletions src/exporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
const debug = require('debug')
const log = debug('exporter')
log.err = debug('exporter:error')
const isIPFS = require('is-ipfs')
const bs58 = require('bs58')
const UnixFS = require('ipfs-unixfs')
const series = require('run-series')
const async = require('async')
const Readable = require('readable-stream').Readable
const pathj = require('path')
const util = require('util')
const fieldtrip = require('field-trip')

exports = module.exports = Exporter

Expand All @@ -19,21 +21,29 @@ function Exporter (hash, dagService, options) {
return new Exporter(hash, dagService, options)
}

// Sanitize hash.
if (!isIPFS.multihash(hash)) {
throw new Error('not valid multihash')
}
if (Buffer.isBuffer(hash)) {
hash = bs58.encode(hash)
}

Readable.call(this, { objectMode: true })

this.options = options || {}

this._read = (n) => {}

let fileExporter = (node, name, callback) => {
let init
let fileExporter = (node, name, done) => {
let init = false

if (!callback) { callback = function noop () {} }
if (!done) throw new Error('done must be set')

// Logic to export a single (possibly chunked) unixfs file.
var rs = new Readable()
if (node.links.length === 0) {
const unmarshaledData = UnixFS.unmarshal(node.data)
init = false
rs._read = () => {
if (init) {
return
Expand All @@ -43,10 +53,8 @@ function Exporter (hash, dagService, options) {
rs.push(null)
}
this.push({ content: rs, path: name })
callback()
return
done()
} else {
init = false
rs._read = () => {
if (init) {
return
Expand All @@ -57,7 +65,7 @@ function Exporter (hash, dagService, options) {
return (cb) => {
dagService.get(link.hash, (err, res) => {
if (err) {
cb(err)
return cb(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
rs.push(unmarshaledData.data)
Expand All @@ -67,80 +75,64 @@ function Exporter (hash, dagService, options) {
})
series(array, (err, res) => {
if (err) {
callback()
rs.emit('error', err)
return
}
rs.push(null)
callback()
return
})
}
this.push({ content: rs, path: name })
callback()
return
done()
}
}

let dirExporter = (node, name, callback) => {
let init
// Logic to export a unixfs directory.
let dirExporter = (node, name, add, done) => {
if (!add) throw new Error('add must be set')
Copy link
Contributor

@nginnever nginnever Jun 23, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add will always be set since it's a param called from visit() which is afaik not meant to be exposed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now: yes, it is. I'm being defensive here and explicit with my expectations for the function. I thoroughly expect myself or someone else to goof this up when rearranging things in the future and it'll be nice to have a nice obvious error message. :)

if (!done) throw new Error('done must be set')

if (!callback) { callback = function noop () {} }
this.push({content: null, path: name})

var rs = new Readable()
if (node.links.length === 0) {
init = false
rs._read = () => {
if (init) {
return
}
init = true
rs.push(node.data)
rs.push(null)
}
this.push({content: null, path: name})
callback()
return
} else {
async.forEachSeries(node.links, (link, callback) => {
dagService.get(link.hash, (err, res) => {
if (err) {
callback(err)
}
var unmarshaledData = UnixFS.unmarshal(res.data)
if (unmarshaledData.type === 'file') {
return (fileExporter(res, pathj.join(name, link.name), callback))
}
if (unmarshaledData.type === 'directory') {
return (dirExporter(res, pathj.join(name, link.name), callback))
}
callback()
})
}, (err) => {
if (err) {
callback()
return
}
callback()
return
// Directory has links
if (node.links.length > 0) {
node.links.forEach((link) => {
add({ path: pathj.join(name, link.name), hash: link.hash })
})
}
done()
}

dagService.get(hash, (err, fetchedNode) => {
// Traverse the DAG asynchronously
var self = this
fieldtrip([{ path: hash, hash: hash }], visit, (err) => {
if (err) {
this.emit('error', err)
self.emit('error', err)
return
}
const data = UnixFS.unmarshal(fetchedNode.data)
const type = data.type

if (type === 'directory') {
dirExporter(fetchedNode, hash)
}
if (type === 'file') {
fileExporter(fetchedNode, hash)
}
self.push(null)
})

// Visit function: called once per node in the exported graph
function visit (item, add, done) {
dagService.get(item.hash, (err, fetchedNode) => {
if (err) {
self.emit('error', err)
return
}

const data = UnixFS.unmarshal(fetchedNode.data)
const type = data.type

if (type === 'directory') {
dirExporter(fetchedNode, item.path, add, done)
}

if (type === 'file') {
fileExporter(fetchedNode, item.path, done)
}
})
}

return this
}
48 changes: 30 additions & 18 deletions test/test-exporter.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const expect = require('chai').expect
const BlockService = require('ipfs-block-service')
const DAGService = require('ipfs-merkle-dag').DAGService
const UnixFS = require('ipfs-unixfs')
const bl = require('bl')
const concat = require('concat-stream')
const fs = require('fs')
const path = require('path')

Expand All @@ -32,13 +32,16 @@ module.exports = function (repo) {
const unmarsh = UnixFS.unmarshal(fetchedNode.data)
expect(err).to.not.exist
const testExport = exporter(hash, ds)
testExport.on('data', (file) => {
file.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
testExport.on('error', (err) => {
expect(err).to.not.exist
})
testExport.pipe(concat((files) => {
expect(files).to.be.length(1)
files[0].content.pipe(concat((bldata) => {
expect(bldata).to.deep.equal(unmarsh.data)
done()
}))
})
}))
})
})

Expand All @@ -47,10 +50,12 @@ module.exports = function (repo) {
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('error', (err) => {
expect(err).to.not.exist
})
testExport.on('data', (file) => {
file.content.pipe(bl((err, bldata) => {
file.content.pipe(concat((bldata) => {
expect(bldata).to.deep.equal(bigFile)
expect(err).to.not.exist
done()
}))
})
Expand All @@ -61,10 +66,13 @@ module.exports = function (repo) {
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('error', (err) => {
expect(err).to.not.exist
})
testExport.on('data', (file) => {
expect(file.path).to.equal('QmRQgufjp9vLE8XK2LGKZSsPCFCF6e4iynCQtNB5X2HBKE')
file.content.pipe(bl((err, bldata) => {
expect(err).to.not.exist
file.content.pipe(concat((bldata) => {
expect(bldata).to.exist
done()
}))
})
Expand All @@ -75,24 +83,28 @@ module.exports = function (repo) {
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
var fsa = []
testExport.on('data', (files) => {
fsa.push(files)
testExport.on('error', (err) => {
expect(err).to.not.exist
})
setTimeout(() => {
expect(fsa[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
expect(fsa[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
expect(fsa[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
expect(fsa[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
testExport.pipe(concat((files) => {
expect(files[0].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN')
expect(files[1].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/200Bytes.txt')
expect(files[2].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/dir-another')
expect(files[3].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1')
expect(files[4].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/200Bytes.txt')
expect(files[5].path).to.equal('QmWChcSFMNcFkfeJtNd8Yru1rE6PhtCRfewi1tMwjkwKjN/level-1/level-2')
done()
}, 1000)
}))
})

it('returns a null stream for dir', (done) => {
const hash = 'QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn' // This hash doesn't exist in the repo
const bs = new BlockService(repo)
const ds = new DAGService(bs)
const testExport = exporter(hash, ds)
testExport.on('error', (err) => {
expect(err).to.not.exist
})
testExport.on('data', (dir) => {
expect(dir.content).to.equal(null)
done()
Expand Down