Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

chore: move withTimeoutOption to core-utils #3407

Merged
merged 2 commits into from
Nov 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/interface-ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
"is-ipfs": "^2.0.0",
"iso-random-stream": "^1.1.1",
"it-all": "^1.0.4",
"it-buffer-stream": "^1.0.5",
"it-concat": "^1.0.1",
"it-drain": "^1.0.3",
"it-last": "^1.0.4",
Expand Down
80 changes: 80 additions & 0 deletions packages/interface-ipfs-core/src/add-all.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const { isNode } = require('ipfs-utils/src/env')
const { getDescribe, getIt, expect } = require('./utils/mocha')
const testTimeout = require('./utils/test-timeout')
const uint8ArrayFromString = require('uint8arrays/from-string')
const bufferStream = require('it-buffer-stream')

/** @typedef { import("ipfsd-ctl/src/factory") } Factory */
/**
Expand Down Expand Up @@ -420,5 +421,84 @@ module.exports = (common, options) => {
expect(files[0].cid.codec).to.equal('dag-pb')
expect(files[0].size).to.equal(18)
})

it('should support bidirectional streaming', async function () {
let progressInvoked

const handler = (bytes, path) => {
progressInvoked = true
}

const source = async function * () {
yield {
content: 'hello',
path: '/file'
}

await new Promise((resolve) => {
const interval = setInterval(() => {
// we've received a progress result, that means we've received some
// data from the server before we're done sending data to the server
// so the streaming is bidirectional and we can finish up
if (progressInvoked) {
clearInterval(interval)
resolve()
}
}, 10)
})
}

await drain(ipfs.addAll(source(), {
progress: handler,
fileImportConcurrency: 1
}))

expect(progressInvoked).to.be.true()
})

it('should error during add-all stream', async function () {
const source = async function * () {
yield {
content: 'hello',
path: '/file'
}

yield {
content: 'hello',
path: '/file'
}
}

await expect(drain(ipfs.addAll(source(), {
fileImportConcurrency: 1,
chunker: 'rabin-2048--50' // invalid chunker parameters, validated after the stream starts moving
}))).to.eventually.be.rejectedWith(/Chunker parameter avg must be an integer/)
})

it('should add big files', async function () {
const totalSize = 1024 * 1024 * 200
const chunkSize = 1024 * 1024 * 99

const source = async function * () {
yield {
path: '/dir/file-200mb-1',
content: bufferStream(totalSize, {
chunkSize
})
}

yield {
path: '/dir/file-200mb-2',
content: bufferStream(totalSize, {
chunkSize
})
}
}

const results = await all(ipfs.addAll(source()))

expect(await ipfs.files.stat(`/ipfs/${results[0].cid}`)).to.have.property('size', totalSize)
expect(await ipfs.files.stat(`/ipfs/${results[1].cid}`)).to.have.property('size', totalSize)
})
})
}
7 changes: 5 additions & 2 deletions packages/ipfs-cli/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@
],
"references": [
{
"path": "../ipfs-core-utils"
"path": "../ipfs-core"
},
{
"path": "../ipfs-core"
"path": "../ipfs-core-utils"
},
{
"path": "../ipfs-http-client"
},
{
"path": "../ipfs-http-gateway"
},
{
"path": "../ipfs-http-server"
}
Expand Down
3 changes: 3 additions & 0 deletions packages/ipfs-core-utils/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
},
"license": "MIT",
"dependencies": {
"any-signal": "^2.0.0",
"blob-to-it": "^1.0.1",
"browser-readablestream-to-it": "^1.0.1",
"cids": "^1.0.0",
Expand All @@ -48,6 +49,8 @@
"it-peekable": "^1.0.1",
"multiaddr": "^8.0.0",
"multiaddr-to-uri": "^6.0.0",
"parse-duration": "^0.4.4",
"timeout-abort-controller": "^1.1.1",
"uint8arrays": "^1.1.0"
},
"devDependencies": {
Expand Down
12 changes: 12 additions & 0 deletions packages/ipfs-core-utils/src/errors.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict'

class TimeoutError extends Error {
constructor (message = 'request timed out') {
super(message)
this.name = 'TimeoutError'
this.code = TimeoutError.code
}
}

TimeoutError.code = 'ERR_TIMEOUT'
exports.TimeoutError = TimeoutError
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ const {
isBytes,
isBlob,
isReadableStream,
isFileObject
isFileObject,
mtimeToObject,
modeToNumber
} = require('./utils')

// eslint-disable-next-line complexity
Expand Down Expand Up @@ -105,7 +107,8 @@ module.exports = async function * normaliseInput (input, normaliseContent) {
async function toFileObject (input, normaliseContent) {
// @ts-ignore - Those properties don't exist on most input types
const { path, mode, mtime, content } = input
const file = { path: path || '', mode, mtime }

const file = { path: path || '', mode: modeToNumber(mode), mtime: mtimeToObject(mtime) }

if (content) {
file.content = await normaliseContent(content)
Expand Down
87 changes: 86 additions & 1 deletion packages/ipfs-core-utils/src/files/normalise-input/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,94 @@ function isFileObject (obj) {
const isReadableStream = (value) =>
value && typeof value.getReader === 'function'

/**
* @param {any} mtime
* @returns {{secs:number, nsecs:number}|undefined}
*/
function mtimeToObject (mtime) {
if (mtime == null) {
return undefined
}

// Javascript Date
if (mtime instanceof Date) {
const ms = mtime.getTime()
const secs = Math.floor(ms / 1000)

return {
secs: secs,
nsecs: (ms - (secs * 1000)) * 1000
}
}

// { secs, nsecs }
if (Object.prototype.hasOwnProperty.call(mtime, 'secs')) {
return {
secs: mtime.secs,
nsecs: mtime.nsecs
}
}

// UnixFS TimeSpec
if (Object.prototype.hasOwnProperty.call(mtime, 'Seconds')) {
return {
secs: mtime.Seconds,
nsecs: mtime.FractionalNanoseconds
}
}

// process.hrtime()
if (Array.isArray(mtime)) {
return {
secs: mtime[0],
nsecs: mtime[1]
}
}
/*
TODO: https://github.com/ipfs/aegir/issues/487

// process.hrtime.bigint()
if (typeof mtime === 'bigint') {
const secs = mtime / BigInt(1e9)
const nsecs = mtime - (secs * BigInt(1e9))

return {
secs: parseInt(secs),
nsecs: parseInt(nsecs)
}
}
*/
}

/**
* @param {any} mode
* @returns {number|undefined}
*/
function modeToNumber (mode) {
if (mode == null) {
return undefined
}

if (typeof mode === 'number') {
return mode
}

mode = mode.toString()

if (mode.substring(0, 1) === '0') {
// octal string
return parseInt(mode, 8)
}

// decimal string
return parseInt(mode, 10)
}

module.exports = {
isBytes,
isBlob,
isFileObject,
isReadableStream
isReadableStream,
mtimeToObject,
modeToNumber
}
6 changes: 6 additions & 0 deletions packages/ipfs-core-utils/src/index.js
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
'use strict'

/**
* @template {any[]} ARGS
* @template R
* @typedef {(...args: ARGS) => R} Fn
*/
106 changes: 106 additions & 0 deletions packages/ipfs-core-utils/src/with-timeout-option.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/* eslint-disable no-unreachable */
'use strict'

const TimeoutController = require('timeout-abort-controller')
const { anySignal } = require('any-signal')
const parseDuration = require('parse-duration').default
const { TimeoutError } = require('./errors')

/**
* @template {any[]} ARGS
* @template {Promise<any> | AsyncIterable<any>} R - The return type of `fn`
* @param {Fn<ARGS, R>} fn
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achingbrain this seems to have introduced a regression, because Fn typedef ended up in index.js which is not imported here. Which for some reason TS still seems to kind of pick up, but it no longer behaves correctly & causing returned function to be inferred as any.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the whole Fn type is kind of redundant and we should probably change type annotation for withTimeoutOptions to something like this instead:

/**
 * @template {any[]} Args
 * @template {Promise<any> | AsyncIterable<any>} R - The return type of `fn`
 * @param {(...args:Args) => R} fn
 * @param {number} [optionsArgIndex]
 * @returns {(...args:Args) => R}
 */

* @param {number} [optionsArgIndex]
* @returns {Fn<ARGS, R>}
*/
function withTimeoutOption (fn, optionsArgIndex) {
// eslint-disable-next-line
return /** @returns {R} */(/** @type {ARGS} */...args) => {
const options = args[optionsArgIndex == null ? args.length - 1 : optionsArgIndex]
if (!options || !options.timeout) return fn(...args)

const timeout = typeof options.timeout === 'string'
? parseDuration(options.timeout)
: options.timeout

const controller = new TimeoutController(timeout)

options.signal = anySignal([options.signal, controller.signal])

const fnRes = fn(...args)
// eslint-disable-next-line promise/param-names
const timeoutPromise = new Promise((_resolve, reject) => {
controller.signal.addEventListener('abort', () => {
reject(new TimeoutError())
})
})

const start = Date.now()

const maybeThrowTimeoutError = () => {
if (controller.signal.aborted) {
throw new TimeoutError()
}

const timeTaken = Date.now() - start

// if we have starved the event loop by adding microtasks, we could have
// timed out already but the TimeoutController will never know because it's
// setTimeout will not fire until we stop adding microtasks
if (timeTaken > timeout) {
controller.abort()
throw new TimeoutError()
}
}

if (fnRes[Symbol.asyncIterator]) {
// @ts-ignore
return (async function * () {
const it = fnRes[Symbol.asyncIterator]()

try {
while (true) {
const { value, done } = await Promise.race([it.next(), timeoutPromise])

if (done) {
break
}

maybeThrowTimeoutError()

yield value
}
} catch (err) {
maybeThrowTimeoutError()

throw err
} finally {
controller.clear()

if (it.return) {
it.return()
}
}
})()
}

// @ts-ignore
return (async () => {
try {
const res = await Promise.race([fnRes, timeoutPromise])

maybeThrowTimeoutError()

return res
} catch (err) {
maybeThrowTimeoutError()

throw err
} finally {
controller.clear()
}
})()
}
}

module.exports = withTimeoutOption
2 changes: 0 additions & 2 deletions packages/ipfs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
"dep-check": "aegir dep-check -i typescript -i interface-ipfs-core"
},
"dependencies": {
"any-signal": "^2.0.0",
"array-shuffle": "^1.0.1",
"bignumber.js": "^9.0.0",
"cbor": "^5.1.0",
Expand Down Expand Up @@ -116,7 +115,6 @@
"parse-duration": "^0.4.4",
"peer-id": "^0.14.1",
"streaming-iterables": "^5.0.2",
"timeout-abort-controller": "^1.1.1",
"uint8arrays": "^1.1.0"
},
"devDependencies": {
Expand Down
Loading