Skip to content
This repository has been archived by the owner on Apr 29, 2020. It is now read-only.

Commit

Permalink
perf: write files in parallel chunks, use a through instead of a map
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Dec 20, 2018
1 parent 4ef5dbc commit 6a86d55
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 40 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"pull-batch": "^1.0.0",
"pull-block": "^1.4.0",
"pull-pair": "^1.1.0",
"pull-paramap": "^1.2.2",
"pull-pause": "0.0.2",
"pull-pushable": "^2.2.0",
"pull-stream": "^3.6.9",
Expand Down
82 changes: 42 additions & 40 deletions src/builder/builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@ const extend = require('deep-extend')
const UnixFS = require('ipfs-unixfs')
const pull = require('pull-stream/pull')
const values = require('pull-stream/sources/values')
const asyncMap = require('pull-stream/throughs/async-map')
const map = require('pull-stream/throughs/map')
const collect = require('pull-stream/sinks/collect')
const through = require('pull-through')
const parallel = require('async/parallel')
const waterfall = require('async/waterfall')
const paraMap = require('pull-paramap')
const persist = require('../utils/persist')
const reduce = require('./reduce')
const {
Expand Down Expand Up @@ -106,50 +105,53 @@ module.exports = function builder (createChunker, ipld, createReducer, _options)
pull(
file.content,
chunker,
map(chunk => {
through(chunk => {
if (options.progress && typeof options.progress === 'function') {
options.progress(chunk.byteLength)
}
return chunk
}),
asyncMap((buffer, callback) => {
if (options.rawLeaves) {
return callback(null, {
size: buffer.length,
leafSize: buffer.length,
data: buffer
})
}

const file = new UnixFS(options.leafType, buffer)
paraMap((buffer, callback) => {
waterfall([
(cb) => {
if (options.rawLeaves) {
return cb(null, {
size: buffer.length,
leafSize: buffer.length,
data: buffer
})
}

DAGNode.create(file.marshal(), [], (err, node) => {
if (err) {
return callback(err)
const file = new UnixFS(options.leafType, buffer)

DAGNode.create(file.marshal(), [], (err, node) => {
if (err) {
return cb(err)
}

cb(null, {
size: node.size,
leafSize: file.fileSize(),
data: node
})
})
},
(leaf, cb) => {
persist(leaf.data, ipld, options, (error, results) => {
if (error) {
return cb(error)
}

cb(null, {
size: leaf.size,
leafSize: leaf.leafSize,
data: results.node,
multihash: results.cid.buffer,
path: leaf.path,
name: ''
})
})
}

callback(null, {
size: node.size,
leafSize: file.fileSize(),
data: node
})
})
}),
asyncMap((leaf, callback) => {
persist(leaf.data, ipld, options, (error, results) => {
if (error) {
return callback(error)
}

callback(null, {
size: leaf.size,
leafSize: leaf.leafSize,
data: results.node,
multihash: results.cid.buffer,
path: leaf.path,
name: ''
})
})
], callback)
}),
through( // mark as single node if only one single node
function onData (data) {
Expand Down

0 comments on commit 6a86d55

Please sign in to comment.