Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

refactor: async iterables #2547

Closed
wants to merge 13 commits into from
14 changes: 8 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
],
"main": "src/core/index.js",
"browser": {
"./src/core/components/init-assets.js": false,
"./src/core/runtime/init-assets-nodejs.js": "./src/core/runtime/init-assets-browser.js",
"./src/core/runtime/add-from-fs-nodejs.js": "./src/core/runtime/add-from-fs-browser.js",
"./src/core/runtime/config-nodejs.js": "./src/core/runtime/config-browser.js",
"./src/core/runtime/dns-nodejs.js": "./src/core/runtime/dns-browser.js",
Expand All @@ -25,7 +25,8 @@
"./src/core/runtime/repo-nodejs.js": "./src/core/runtime/repo-browser.js",
"./src/core/runtime/ipld-nodejs.js": "./src/core/runtime/ipld-browser.js",
"./test/utils/create-repo-nodejs.js": "./test/utils/create-repo-browser.js",
"stream": "readable-stream"
"stream": "readable-stream",
"ipfs-utils/src/files/glob-source": false
},
"browser-all-ipld-formats": {
"./src/core/runtime/ipld-browser.js": "./src/core/runtime/ipld-browser-all.js"
Expand Down Expand Up @@ -100,15 +101,15 @@
"ipfs-bitswap": "^0.26.0",
"ipfs-block": "~0.8.1",
"ipfs-block-service": "~0.16.0",
"ipfs-http-client": "^40.0.1",
"ipfs-http-client": "github:ipfs/js-ipfs-http-client#refactor/async-iterables2",
"ipfs-http-response": "~0.4.0",
"ipfs-mfs": "^0.13.2",
"ipfs-multipart": "^0.2.0",
"ipfs-repo": "^0.30.0",
"ipfs-unixfs": "~0.1.16",
"ipfs-unixfs-exporter": "^0.38.0",
"ipfs-unixfs-importer": "^0.40.0",
"ipfs-utils": "~0.4.0",
"ipfs-utils": "^0.5.0",
"ipld": "~0.25.0",
"ipld-bitcoin": "~0.3.0",
"ipld-dag-cbor": "~0.15.0",
Expand Down Expand Up @@ -205,9 +206,10 @@
"execa": "^3.0.0",
"form-data": "^3.0.0",
"hat": "0.0.3",
"interface-ipfs-core": "^0.124.1",
"interface-ipfs-core": "github:ipfs/interface-js-ipfs-core#refactor/async-iterables",
"ipfs-interop": "^0.1.1",
"ipfsd-ctl": "^0.47.2",
"ipfsd-ctl": "github:ipfs/js-ipfsd-ctl#fix/do-not-call-shutdown-twice",
"it-all": "^1.0.1",
"libp2p-websocket-star": "~0.10.2",
"lodash": "^4.17.15",
"ncp": "^2.0.0",
Expand Down
12 changes: 6 additions & 6 deletions src/cli/commands/add.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,20 +156,20 @@ module.exports = {
? globSource(argv.file, { recursive: argv.recursive })
: process.stdin // Pipe directly to ipfs.add

let finalHash
let finalCid

try {
for await (const file of ipfs._addAsyncIterator(source, options)) {
for await (const file of ipfs.add(source, options)) {
if (argv.silent) {
continue
}

if (argv.quieter) {
finalHash = file.hash
finalCid = file.cid
continue
}

const cid = cidToString(file.hash, { base: argv.cidBase })
const cid = cidToString(file.cid, { base: argv.cidBase })
let message = cid

if (!argv.quiet) {
Expand All @@ -184,7 +184,7 @@ module.exports = {
bar.terminate()
}

// Tweak the error message and add more relevant infor for the CLI
// Tweak the error message and add more relevant info for the CLI
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
}
Expand All @@ -197,7 +197,7 @@ module.exports = {
}

if (argv.quieter) {
log(cidToString(finalHash, { base: argv.cidBase }))
log(cidToString(finalCid, { base: argv.cidBase }))
}
})())
}
Expand Down
2 changes: 1 addition & 1 deletion src/cli/commands/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ module.exports = {
const IPFS = require('../../core')
const Repo = require('ipfs-repo')

const node = new IPFS({
const node = await IPFS.create({
repo: new Repo(path),
init: false,
start: false,
Expand Down
4 changes: 1 addition & 3 deletions src/cli/daemon.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,7 @@ class Daemon {

// start the daemon
const ipfsOpts = Object.assign({}, { init: true, start: true, libp2p }, this._options)
const ipfs = await IPFS.create(ipfsOpts)

this._ipfs = ipfs
const ipfs = this._ipfs = await IPFS.create(ipfsOpts)

// start HTTP servers (if API or Gateway is enabled in options)
const httpApi = new HttpApi(ipfs, ipfsOpts)
Expand Down
21 changes: 21 additions & 0 deletions src/core/api-manager.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
module.exports = class ApiManager {
constructor () {
this._api = {}
this._onUndef = () => undefined
this.api = new Proxy({}, {
get: (_, prop) => {
if (prop === 'then') return undefined // Not a promise!
return this._api[prop] === undefined ? this._onUndef(prop) : this._api[prop]
},
has: (_, prop) => prop in this._api
})
}

update (nextApi, onUndef) {
const prevApi = this._api
const prevUndef = this._onUndef
this._api = nextApi
if (onUndef) this._onUndef = onUndef
return { cancel: () => this.update(prevApi, prevUndef), api: this.api }
}
}
90 changes: 0 additions & 90 deletions src/core/boot.js

This file was deleted.

121 changes: 121 additions & 0 deletions src/core/components/add/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
'use strict'

const importer = require('ipfs-unixfs-importer')
const normaliseAddInput = require('ipfs-utils/src/files/normalise-input')
const { parseChunkerString } = require('./utils')
const pipe = require('it-pipe')

module.exports = ({ ipld, dag, gcLock, preload, pin, constructorOptions }) => {
const isShardingEnabled = constructorOptions.EXPERIMENTAL && constructorOptions.EXPERIMENTAL.sharding
return async function * add (source, options) {
options = options || {}

const opts = {
shardSplitThreshold: isShardingEnabled ? 1000 : Infinity,
...options,
strategy: 'balanced',
...parseChunkerString(options.chunker)
}

// CID v0 is for multihashes encoded with sha2-256
if (opts.hashAlg && opts.cidVersion !== 1) {
opts.cidVersion = 1
}

if (opts.trickle) {
opts.strategy = 'trickle'
}

delete opts.trickle

if (opts.progress) {
let total = 0
const prog = opts.progress

opts.progress = (bytes) => {
total += bytes
prog(total)
}
}

const iterator = pipe(
normaliseAddInput(source),
source => importer(source, ipld, opts),
transformFile(dag, opts),
preloadFile(preload, opts),
pinFile(pin, opts)
)

const releaseLock = await gcLock.readLock()

try {
yield * iterator
} finally {
releaseLock()
}
}
}

function transformFile (dag, opts) {
return async function * (source) {
for await (const { cid, path, unixfs } of source) {
if (opts.onlyHash) {
yield {
cid,
path: path || cid.toString(),
size: unixfs.fileSize()
}

continue
}

const { value: node } = await dag.get(cid, { ...opts, preload: false })

yield {
cid,
path: path || cid.toString(),
size: Buffer.isBuffer(node) ? node.length : node.size
}
}
}
}

function preloadFile (preload, opts) {
return async function * (source) {
for await (const file of source) {
const isRootFile = !file.path || opts.wrapWithDirectory
? file.path === ''
: !file.path.includes('/')

const shouldPreload = isRootFile && !opts.onlyHash && opts.preload !== false

if (shouldPreload) {
preload(file.hash)
}

yield file
}
}
}

function pinFile (pin, opts) {
return async function * (source) {
for await (const file of source) {
// Pin a file if it is the root dir of a recursive add or the single file
// of a direct add.
const isRootDir = !file.path.includes('/')
const shouldPin = (opts.pin == null ? true : opts.pin) && isRootDir && !opts.onlyHash

if (shouldPin) {
// Note: addAsyncIterator() has already taken a GC lock, so tell
// pin.add() not to take a (second) GC lock
await pin.add(file.hash, {
preload: false,
lock: false
})
}

yield file
}
}
}
Loading