diff --git a/packages/interface-ipfs-core/src/pubsub/unsubscribe.js b/packages/interface-ipfs-core/src/pubsub/unsubscribe.js index 3f83014ca1..0caa42760a 100644 --- a/packages/interface-ipfs-core/src/pubsub/unsubscribe.js +++ b/packages/interface-ipfs-core/src/pubsub/unsubscribe.js @@ -4,7 +4,6 @@ const { isBrowser, isWebWorker, isElectronRenderer } = require('ipfs-utils/src/env') const { getTopic } = require('./utils') const { getDescribe, getIt, expect } = require('../utils/mocha') -const delay = require('delay') /** @typedef { import("ipfsd-ctl/src/factory") } Factory */ /** @@ -41,9 +40,7 @@ module.exports = (common, options) => { await ipfs.pubsub.unsubscribe(someTopic, handlers[i]) } - await delay(100) - const topics = await ipfs.pubsub.ls() - expect(topics).to.eql([]) + return expect(ipfs.pubsub.ls()).to.eventually.eql([]) }) it(`should subscribe ${count} handlers and unsubscribe once with no reference to the handlers`, async () => { @@ -53,9 +50,7 @@ module.exports = (common, options) => { } await ipfs.pubsub.unsubscribe(someTopic) - await delay(100) - const topics = await ipfs.pubsub.ls() - expect(topics).to.eql([]) + return expect(ipfs.pubsub.ls()).to.eventually.eql([]) }) }) } diff --git a/packages/ipfs-core-types/src/repo/index.d.ts b/packages/ipfs-core-types/src/repo/index.d.ts index 2401185bae..5a930737ad 100644 --- a/packages/ipfs-core-types/src/repo/index.d.ts +++ b/packages/ipfs-core-types/src/repo/index.d.ts @@ -26,14 +26,16 @@ export interface GCOptions extends AbortOptions { } export interface GCError { - err: Error + err: Error, + cid?: never } export interface GCSuccess { + err?: never, cid: CID } -export type GCResult = GCSuccess | GCError +export type GCResult = GCSuccess | GCError export interface StatResult { numObjects: BigInt diff --git a/packages/ipfs-http-client/src/pubsub/index.js b/packages/ipfs-http-client/src/pubsub/index.js index 4898e6687c..4693ebd8a8 100644 --- a/packages/ipfs-http-client/src/pubsub/index.js +++ b/packages/ipfs-http-client/src/pubsub/index.js @@ -1,12 +1,18 @@ 'use strict' +const SubscriptionTracker = require('./subscription-tracker') + /** * @param {import('../types').Options} config */ -module.exports = config => ({ - ls: require('./ls')(config), - peers: require('./peers')(config), - publish: require('./publish')(config), - subscribe: require('./subscribe')(config), - unsubscribe: require('./unsubscribe')(config) -}) +module.exports = config => { + const subscriptionTracker = new SubscriptionTracker() + + return { + ls: require('./ls')(config), + peers: require('./peers')(config), + publish: require('./publish')(config), + subscribe: require('./subscribe')(config, subscriptionTracker), + unsubscribe: require('./unsubscribe')(config, subscriptionTracker) + } +} diff --git a/packages/ipfs-http-client/src/pubsub/subscribe.js b/packages/ipfs-http-client/src/pubsub/subscribe.js index 1348117455..db99bf128f 100644 --- a/packages/ipfs-http-client/src/pubsub/subscribe.js +++ b/packages/ipfs-http-client/src/pubsub/subscribe.js @@ -3,7 +3,6 @@ const uint8ArrayFromString = require('uint8arrays/from-string') const uint8ArrayToString = require('uint8arrays/to-string') const log = require('debug')('ipfs-http-client:pubsub:subscribe') -const SubscriptionTracker = require('./subscription-tracker') const configure = require('../lib/configure') const toUrlSearchParams = require('../lib/to-url-search-params') @@ -12,70 +11,75 @@ const toUrlSearchParams = require('../lib/to-url-search-params') * @typedef {import('ipfs-core-types/src/pubsub').Message} Message * @typedef {(err: Error, fatal: boolean, msg?: Message) => void} ErrorHandlerFn * @typedef {import('ipfs-core-types/src/pubsub').API} PubsubAPI + * @typedef {import('../types').Options} Options */ -module.exports = configure((api, options) => { - const subsTracker = SubscriptionTracker.singleton() - - /** - * @type {PubsubAPI["subscribe"]} - */ - async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await - options.signal = subsTracker.subscribe(topic, handler, options.signal) - - /** @type {(value?: any) => void} */ - let done - /** @type {(error: Error) => void} */ - let fail - - const result = new Promise((resolve, reject) => { - done = resolve - fail = reject - }) - - // In Firefox, the initial call to fetch does not resolve until some data - // is received. If this doesn't happen within 1 second assume success - const ffWorkaround = setTimeout(() => done(), 1000) - - // Do this async to not block Firefox - setTimeout(() => { - api.post('pubsub/sub', { - timeout: options.timeout, - signal: options.signal, - searchParams: toUrlSearchParams({ - arg: topic, - ...options - }), - headers: options.headers +/** + * @param {Options} options + * @param {import('./subscription-tracker')} subsTracker + */ +module.exports = (options, subsTracker) => { + return configure((api) => { + /** + * @type {PubsubAPI["subscribe"]} + */ + async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await + options.signal = subsTracker.subscribe(topic, handler, options.signal) + + /** @type {(value?: any) => void} */ + let done + /** @type {(error: Error) => void} */ + let fail + + const result = new Promise((resolve, reject) => { + done = resolve + fail = reject }) - .catch((err) => { - // Initial subscribe fail, ensure we clean up - subsTracker.unsubscribe(topic, handler) - fail(err) + // In Firefox, the initial call to fetch does not resolve until some data + // is received. If this doesn't happen within 1 second assume success + const ffWorkaround = setTimeout(() => done(), 1000) + + // Do this async to not block Firefox + setTimeout(() => { + api.post('pubsub/sub', { + timeout: options.timeout, + signal: options.signal, + searchParams: toUrlSearchParams({ + arg: topic, + ...options + }), + headers: options.headers }) - .then((response) => { - clearTimeout(ffWorkaround) - - if (!response) { - // if there was no response, the subscribe failed - return - } - - readMessages(response, { - onMessage: handler, - onEnd: () => subsTracker.unsubscribe(topic, handler), - onError: options.onError + .catch((err) => { + // Initial subscribe fail, ensure we clean up + subsTracker.unsubscribe(topic, handler) + + fail(err) }) + .then((response) => { + clearTimeout(ffWorkaround) - done() - }) - }, 0) + if (!response) { + // if there was no response, the subscribe failed + return + } - return result - } - return subscribe -}) + readMessages(response, { + onMessage: handler, + onEnd: () => subsTracker.unsubscribe(topic, handler), + onError: options.onError + }) + + done() + }) + }, 0) + + return result + } + return subscribe + })(options) +} /** * @param {import('ipfs-utils/src/types').ExtendedResponse} response diff --git a/packages/ipfs-http-client/src/pubsub/subscription-tracker.js b/packages/ipfs-http-client/src/pubsub/subscription-tracker.js index a257b57a89..faf15ad6c4 100644 --- a/packages/ipfs-http-client/src/pubsub/subscription-tracker.js +++ b/packages/ipfs-http-client/src/pubsub/subscription-tracker.js @@ -16,12 +16,6 @@ class SubscriptionTracker { this._subs = new Map() } - static singleton () { - if (SubscriptionTracker.instance) return SubscriptionTracker.instance - SubscriptionTracker.instance = new SubscriptionTracker() - return SubscriptionTracker.instance - } - /** * @param {string} topic * @param {MessageHandlerFn} handler @@ -63,13 +57,12 @@ class SubscriptionTracker { unsubs = subs } + if (!(this._subs.get(topic) || []).length) { + this._subs.delete(topic) + } + unsubs.forEach(s => s.controller.abort()) } } -/** - * @type {SubscriptionTracker | null} - */ -SubscriptionTracker.instance = null - module.exports = SubscriptionTracker diff --git a/packages/ipfs-http-client/src/pubsub/unsubscribe.js b/packages/ipfs-http-client/src/pubsub/unsubscribe.js index 4989883727..79c40eb3ba 100644 --- a/packages/ipfs-http-client/src/pubsub/unsubscribe.js +++ b/packages/ipfs-http-client/src/pubsub/unsubscribe.js @@ -1,18 +1,16 @@ 'use strict' -const SubscriptionTracker = require('./subscription-tracker') - /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions * @typedef {import('ipfs-core-types/src/pubsub').API} PubsubAPI + * @typedef {import('../types').Options} Options */ /** - * @param {import('../types').Options} config + * @param {Options} options + * @param {import('./subscription-tracker')} subsTracker */ -module.exports = config => { - const subsTracker = SubscriptionTracker.singleton() - +module.exports = (options, subsTracker) => { /** * @type {PubsubAPI["unsubscribe"]} */ diff --git a/packages/ipfs-http-client/src/refs/index.js b/packages/ipfs-http-client/src/refs/index.js index 2ecbc1c2fe..cdab0085b2 100644 --- a/packages/ipfs-http-client/src/refs/index.js +++ b/packages/ipfs-http-client/src/refs/index.js @@ -10,7 +10,7 @@ const toUrlSearchParams = require('../lib/to-url-search-params') * @typedef {import('ipfs-core-types/src/refs').API} RefsAPI */ -module.exports = configure((api, options) => { +module.exports = configure((api, opts) => { /** * @type {RefsAPI["refs"]} */ @@ -34,6 +34,6 @@ module.exports = configure((api, options) => { } return Object.assign(refs, { - local: require('./local')(options) + local: require('./local')(opts) }) }) diff --git a/packages/ipfs-http-server/package.json b/packages/ipfs-http-server/package.json index eb714a3705..cf52c06906 100644 --- a/packages/ipfs-http-server/package.json +++ b/packages/ipfs-http-server/package.json @@ -47,14 +47,16 @@ "ipld-dag-pb": "^0.22.1", "it-all": "^1.0.4", "it-drain": "^1.0.3", + "it-filter": "^1.0.2", "it-first": "^1.0.4", "it-last": "^1.0.4", "it-map": "^1.0.4", + "it-merge": "^1.0.1", "it-multipart": "^2.0.0", "it-pipe": "^1.1.0", + "it-pushable": "^1.4.0", + "it-reduce": "^1.0.5", "it-tar": "^3.0.0", - "it-to-stream": "^1.0.0", - "iterable-ndjson": "^1.1.0", "joi": "^17.2.1", "just-safe-set": "^2.2.1", "multiaddr": "^9.0.1", @@ -64,7 +66,6 @@ "native-abort-controller": "^1.0.3", "parse-duration": "^1.0.0", "stream-to-it": "^0.2.2", - "streaming-iterables": "^5.0.2", "uint8arrays": "^2.1.3", "uri-to-multiaddr": "^5.0.0" }, diff --git a/packages/ipfs-http-server/src/api/resources/block.js b/packages/ipfs-http-server/src/api/resources/block.js index df8108da05..51704e1f04 100644 --- a/packages/ipfs-http-server/src/api/resources/block.js +++ b/packages/ipfs-http-server/src/api/resources/block.js @@ -8,9 +8,7 @@ const Boom = require('@hapi/boom') const { cidToString } = require('ipfs-core-utils/src/cid') const all = require('it-all') const { pipe } = require('it-pipe') -const { map } = require('streaming-iterables') -// @ts-ignore no types -const ndjson = require('iterable-ndjson') +const map = require('it-map') const streamResponse = require('../../utils/stream-response') exports.get = { @@ -234,8 +232,9 @@ exports.rm = { timeout, signal }), - map(({ cid, error }) => ({ Hash: cidToString(cid, { base: cidBase }), Error: error ? error.message : undefined })), - ndjson.stringify + async function * (source) { + yield * map(source, ({ cid, error }) => ({ Hash: cidToString(cid, { base: cidBase }), Error: error ? error.message : undefined })) + } )) } } diff --git a/packages/ipfs-http-server/src/api/resources/dht.js b/packages/ipfs-http-server/src/api/resources/dht.js index f498a3d319..feb58fcb67 100644 --- a/packages/ipfs-http-server/src/api/resources/dht.js +++ b/packages/ipfs-http-server/src/api/resources/dht.js @@ -3,18 +3,8 @@ const Joi = require('../../utils/joi') const Boom = require('@hapi/boom') const { pipe } = require('it-pipe') -// @ts-ignore no types -const ndjson = require('iterable-ndjson') -// @ts-ignore no types -const toStream = require('it-to-stream') -const { map } = require('streaming-iterables') -const { PassThrough } = require('stream') -// @ts-ignore no types -const toIterable = require('stream-to-it') -const debug = require('debug') -const log = Object.assign(debug('ipfs:http-api:dht'), { - error: debug('ipfs:http-api:dht:error') -}) +const map = require('it-map') +const streamResponse = require('../../utils/stream-response') exports.findPeer = { options: { @@ -123,51 +113,26 @@ exports.findProvs = { } } = request - let providersFound = false - const output = new PassThrough() - - pipe( - ipfs.dht.findProvs(cid, { - numProviders, - signal, - timeout - }), - map(({ id, addrs }) => { - providersFound = true - - return { - Responses: [{ - ID: id.toString(), - Addrs: (addrs || []).map((/** @type {import('multiaddr').Multiaddr} */ a) => a.toString()) - }], - Type: 4 - } - }), - ndjson.stringify, - toIterable.sink(output) - ) - .catch((/** @type {Error} */ err) => { - log.error(err) - - if (!providersFound && output.writable) { - output.write(' ') - } - - request.raw.res.addTrailers({ - 'X-Stream-Error': JSON.stringify({ - Message: err.message, - Code: 0 + return streamResponse(request, h, () => { + return pipe( + ipfs.dht.findProvs(cid, { + numProviders, + signal, + timeout + }), + async function * (source) { + yield * map(source, ({ id, addrs }) => { + return { + Responses: [{ + ID: id.toString(), + Addrs: (addrs || []).map(a => a.toString()) + }], + Type: 4 + } }) - }) - }) - .finally(() => { - output.end() - }) - - return h.response(output) - .header('x-chunked-output', '1') - .header('content-type', 'application/json') - .header('Trailer', 'X-Stream-Error') + } + ) + }) } } @@ -352,17 +317,16 @@ exports.query = { } } = request - const response = toStream.readable( - pipe( + return streamResponse(request, h, () => { + return pipe( ipfs.dht.query(peerId, { signal, timeout }), - map(({ id }) => ({ ID: id.toString() })), - ndjson.stringify + async function * (source) { + yield * map(source, ({ id }) => ({ ID: id.toString() })) + } ) - ) - - return h.response(response) + }) } } diff --git a/packages/ipfs-http-server/src/api/resources/files-regular.js b/packages/ipfs-http-server/src/api/resources/files-regular.js index 25e5de9e25..ea39d7481c 100644 --- a/packages/ipfs-http-server/src/api/resources/files-regular.js +++ b/packages/ipfs-http-server/src/api/resources/files-regular.js @@ -1,24 +1,17 @@ 'use strict' const multipart = require('../../utils/multipart-request-parser') -const debug = require('debug') // @ts-ignore no types const tar = require('it-tar') -const log = Object.assign(debug('ipfs:http-api:files'), { - error: debug('ipfs:http-api:files:error') -}) -// @ts-ignore no types -const toIterable = require('stream-to-it') const Joi = require('../../utils/joi') const Boom = require('@hapi/boom') -const { PassThrough } = require('stream') const { cidToString } = require('ipfs-core-utils/src/cid') const { pipe } = require('it-pipe') const all = require('it-all') -// @ts-ignore no types -const ndjson = require('iterable-ndjson') -const { map } = require('streaming-iterables') const streamResponse = require('../../utils/stream-response') +const merge = require('it-merge') +const { PassThrough } = require('stream') +const map = require('it-map') /** * @param {AsyncIterable} source @@ -260,25 +253,14 @@ exports.add = { } } = request - let filesParsed = false - const output = new PassThrough() - /** - * @type {import('ipfs-core-types/src/root').AddProgressFn} - */ - const progressHandler = (bytes, path) => { - // TODO: path should be passed as a second option - output.write(JSON.stringify({ - Name: path, - Bytes: bytes - }) + '\n') - } - - pipe( + return streamResponse(request, h, () => pipe( multipart(request.raw.req), /** * @param {AsyncIterable} source */ async function * (source) { + let filesParsed = false + for await (const entry of source) { if (entry.type === 'file') { filesParsed = true @@ -301,73 +283,69 @@ exports.add = { } } } + + if (!filesParsed) { + throw new Error("File argument 'data' is required.") + } }, /** * @param {import('ipfs-core-types/src/utils').ImportCandidateStream} source */ - function (source) { - return ipfs.addAll(source, { - cidVersion, - rawLeaves, - progress: progress ? progressHandler : () => {}, - onlyHash, - hashAlg, - wrapWithDirectory, - pin, - chunker, - trickle, - preload, - shardSplitThreshold, - - // this has to be hardcoded to 1 because we can only read one file - // at a time from a http request and we have to consume it completely - // before we can read the next file - fileImportConcurrency: 1, - blockWriteConcurrency, - signal, - timeout - }) - }, - map(file => { - return { - Name: file.path, - Hash: cidToString(file.cid, { base: cidBase }), - Size: file.size, - Mode: file.mode === undefined ? undefined : file.mode.toString(8).padStart(4, '0'), - Mtime: file.mtime ? file.mtime.secs : undefined, - MtimeNsecs: file.mtime ? file.mtime.nsecs : undefined - } - }), - ndjson.stringify, - toIterable.sink(output) - ) - .then(() => { - if (!filesParsed) { - throw new Error("File argument 'data' is required.") - } - }) - .catch((/** @type {Error} */ err) => { - log.error(err) - - if (!filesParsed && output.writable) { - output.write(' ') - } - - request.raw.res.addTrailers({ - 'X-Stream-Error': JSON.stringify({ - Message: err.message, - Code: 0 - }) + async function * (source) { + const progressStream = new PassThrough({ + objectMode: true }) - }) - .finally(() => { - output.end() - }) - return h.response(output) - .header('x-chunked-output', '1') - .header('content-type', 'application/json') - .header('Trailer', 'X-Stream-Error') + yield * merge( + progressStream, + pipe( + ipfs.addAll(source, { + cidVersion, + rawLeaves, + progress: progress + ? (bytes, path) => { + progressStream.write({ + Name: path, + Bytes: bytes + }) + } + : () => {}, + onlyHash, + hashAlg, + wrapWithDirectory, + pin, + chunker, + trickle, + preload, + shardSplitThreshold, + + // this has to be hardcoded to 1 because we can only read one file + // at a time from a http request and we have to consume it completely + // before we can read the next file + fileImportConcurrency: 1, + blockWriteConcurrency, + signal, + timeout + }), + async function * (source) { + yield * map(source, file => { + return { + Name: file.path, + Hash: cidToString(file.cid, { base: cidBase }), + Size: file.size, + Mode: file.mode === undefined ? undefined : file.mode.toString(8).padStart(4, '0'), + Mtime: file.mtime ? file.mtime.secs : undefined, + MtimeNsecs: file.mtime ? file.mtime.nsecs : undefined + } + }) + + // no more files, end the progress stream + progressStream.end() + } + ) + ) + } + )) } } @@ -475,8 +453,9 @@ exports.ls = { signal, timeout }), - map(link => ({ Objects: [{ Hash: path, Links: [mapLink(link)] }] })), - ndjson.stringify + async function * (source) { + yield * map(source, link => ({ Objects: [{ Hash: path, Links: [mapLink(link)] }] })) + } )) } } @@ -557,8 +536,9 @@ exports.refs = { signal, timeout }), - map(({ ref, err }) => ({ Ref: ref, Err: err })), - ndjson.stringify + async function * (source) { + yield * map(source, ({ ref, err }) => ({ Ref: ref, Err: err })) + } )) } } @@ -600,8 +580,9 @@ exports.refsLocal = { signal, timeout }), - map(({ ref, err }) => ({ Ref: ref, Err: err })), - ndjson.stringify + async function * (source) { + yield * map(source, ({ ref, err }) => ({ Ref: ref, Err: err })) + } )) } } diff --git a/packages/ipfs-http-server/src/api/resources/files/ls.js b/packages/ipfs-http-server/src/api/resources/files/ls.js index 22a6bd6642..44a4de10b6 100644 --- a/packages/ipfs-http-server/src/api/resources/files/ls.js +++ b/packages/ipfs-http-server/src/api/resources/files/ls.js @@ -74,8 +74,7 @@ const mfsLs = { signal, timeout }), - source => map(source, (entry) => mapEntry(entry, { cidBase, long })), - source => map(source, (entry) => JSON.stringify(entry) + '\n') + source => map(source, (entry) => mapEntry(entry, { cidBase, long })) )) } diff --git a/packages/ipfs-http-server/src/api/resources/name.js b/packages/ipfs-http-server/src/api/resources/name.js index 54fafee3cc..401387e1d5 100644 --- a/packages/ipfs-http-server/src/api/resources/name.js +++ b/packages/ipfs-http-server/src/api/resources/name.js @@ -2,10 +2,8 @@ const Joi = require('../../utils/joi') const { pipe } = require('it-pipe') -const { map } = require('streaming-iterables') +const map = require('it-map') const last = require('it-last') -// @ts-ignore no types -const ndjson = require('iterable-ndjson') const streamResponse = require('../../utils/stream-response') exports.resolve = { @@ -68,8 +66,9 @@ exports.resolve = { signal, timeout }), - map(value => ({ Path: value })), - ndjson.stringify + async function * (source) { + yield * map(source, value => ({ Path: value })) + } )) } } diff --git a/packages/ipfs-http-server/src/api/resources/pin.js b/packages/ipfs-http-server/src/api/resources/pin.js index 8b020d024a..2bb1407ee8 100644 --- a/packages/ipfs-http-server/src/api/resources/pin.js +++ b/packages/ipfs-http-server/src/api/resources/pin.js @@ -2,10 +2,9 @@ const Joi = require('../../utils/joi') const Boom = require('@hapi/boom') -const { map, reduce } = require('streaming-iterables') +const map = require('it-map') +const reduce = require('it-reduce') const { pipe } = require('it-pipe') -// @ts-ignore no types -const ndjson = require('iterable-ndjson') const { cidToString } = require('ipfs-core-utils/src/cid') const streamResponse = require('../../utils/stream-response') const all = require('it-all') @@ -94,10 +93,16 @@ exports.ls = { if (!stream) { const res = await pipe( source, - reduce((/** @type {{ Keys: Record }} */ res, { type, cid, metadata }) => { - res.Keys[cidToString(cid, { base: cidBase })] = toPin(type, undefined, metadata) - return res - }, { Keys: {} }) + function collectKeys (source) { + /** @type {{ Keys: Record }} */ + const init = { Keys: {} } + + return reduce(source, (res, { type, cid, metadata }) => { + res.Keys[cidToString(cid, { base: cidBase })] = toPin(type, undefined, metadata) + + return res + }, init) + } ) return h.response(res) @@ -105,8 +110,9 @@ exports.ls = { return streamResponse(request, h, () => pipe( source, - map(({ type, cid, metadata }) => toPin(type, cidToString(cid, { base: cidBase }), metadata)), - ndjson.stringify + async function * transform (source) { + yield * map(source, ({ type, cid, metadata }) => toPin(type, cidToString(cid, { base: cidBase }), metadata)) + } )) } } diff --git a/packages/ipfs-http-server/src/api/resources/ping.js b/packages/ipfs-http-server/src/api/resources/ping.js index 63ed42eb25..23b863cd05 100644 --- a/packages/ipfs-http-server/src/api/resources/ping.js +++ b/packages/ipfs-http-server/src/api/resources/ping.js @@ -2,9 +2,7 @@ const Joi = require('../../utils/joi') const { pipe } = require('it-pipe') -const { map } = require('streaming-iterables') -// @ts-ignore no types -const ndjson = require('iterable-ndjson') +const map = require('it-map') const streamResponse = require('../../utils/stream-response') module.exports = { @@ -51,13 +49,14 @@ module.exports = { } = request return streamResponse(request, h, () => pipe( - ipfs.ping(peerId, { + ipfs.ping(peerId.toString(), { count, signal, timeout }), - map(pong => ({ Success: pong.success, Time: pong.time, Text: pong.text })), - ndjson.stringify + async function * (source) { + yield * map(source, pong => ({ Success: pong.success, Time: pong.time, Text: pong.text })) + } )) } } diff --git a/packages/ipfs-http-server/src/api/resources/pubsub.js b/packages/ipfs-http-server/src/api/resources/pubsub.js index 5d62a3a45d..a7bec9ae20 100644 --- a/packages/ipfs-http-server/src/api/resources/pubsub.js +++ b/packages/ipfs-http-server/src/api/resources/pubsub.js @@ -1,12 +1,13 @@ 'use strict' const Joi = require('../../utils/joi') -const PassThrough = require('stream').PassThrough const all = require('it-all') const multipart = require('../../utils/multipart-request-parser') const Boom = require('@hapi/boom') const uint8ArrayToString = require('uint8arrays/to-string') const uint8ArrayFromString = require('uint8arrays/from-string') +const streamResponse = require('../../utils/stream-response') +const pushable = require('it-pushable') exports.subscribe = { options: { @@ -45,39 +46,43 @@ exports.subscribe = { topic } } = request - const res = new PassThrough({ highWaterMark: 1 }) - /** - * @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} - */ - const handler = (msg) => { - res.write(JSON.stringify({ - from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base58btc'), 'base64pad'), - data: uint8ArrayToString(msg.data, 'base64pad'), - seqno: uint8ArrayToString(msg.seqno, 'base64pad'), - topicIDs: msg.topicIDs - }) + '\n', 'utf8') - } + // request.raw.res.setHeader('x-chunked-output', '1') + request.raw.res.setHeader('content-type', 'identity') // stop gzip from buffering, see https://github.com/hapijs/hapi/issues/2975 + // request.raw.res.setHeader('Trailer', 'X-Stream-Error') - // js-ipfs-http-client needs a reply, and go-ipfs does the same thing - res.write('{}\n') + return streamResponse(request, h, () => { + const output = pushable() - const unsubscribe = () => { - ipfs.pubsub.unsubscribe(topic, handler) - res.end() - } + /** + * @type {import('ipfs-core-types/src/pubsub').MessageHandlerFn} + */ + const handler = (msg) => { + output.push({ + from: uint8ArrayToString(uint8ArrayFromString(msg.from, 'base58btc'), 'base64pad'), + data: uint8ArrayToString(msg.data, 'base64pad'), + seqno: uint8ArrayToString(msg.seqno, 'base64pad'), + topicIDs: msg.topicIDs + }) + } - request.events.once('disconnect', unsubscribe) - request.events.once('finish', unsubscribe) + // js-ipfs-http-client needs a reply, and go-ipfs does the same thing + output.push({}) - await ipfs.pubsub.subscribe(topic, handler, { - signal - }) + const unsubscribe = () => { + ipfs.pubsub.unsubscribe(topic, handler) + output.end() + } + + request.raw.res.once('close', unsubscribe) + + ipfs.pubsub.subscribe(topic, handler, { + signal + }) + .catch(err => output.end(err)) - return h.response(res) - .header('X-Chunked-Output', '1') - .header('content-encoding', 'identity') // stop gzip from buffering, see https://github.com/hapijs/hapi/issues/2975 - .header('content-type', 'application/json') + return output + }) } } diff --git a/packages/ipfs-http-server/src/api/resources/repo.js b/packages/ipfs-http-server/src/api/resources/repo.js index abb3f42e28..2de56459ce 100644 --- a/packages/ipfs-http-server/src/api/resources/repo.js +++ b/packages/ipfs-http-server/src/api/resources/repo.js @@ -1,10 +1,9 @@ 'use strict' const Joi = require('../../utils/joi') -const { map, filter } = require('streaming-iterables') +const map = require('it-map') +const filter = require('it-filter') const { pipe } = require('it-pipe') -// @ts-ignore no types -const ndjson = require('iterable-ndjson') const streamResponse = require('../../utils/stream-response') exports.gc = { @@ -49,12 +48,15 @@ exports.gc = { signal, timeout }), - filter(r => !r.err || streamErrors), - map(r => ({ - Error: (r.err && r.err.message) || undefined, - Key: (!r.err && { '/': r.cid.toString() }) || undefined - })), - ndjson.stringify + async function * filterErrors (source) { + yield * filter(source, r => !r.err || streamErrors) + }, + async function * transformGcOutput (source) { + yield * map(source, r => ({ + Error: (r.err && r.err.message) || undefined, + Key: (!r.err && { '/': r.cid.toString() }) || undefined + })) + } )) } } diff --git a/packages/ipfs-http-server/src/api/resources/stats.js b/packages/ipfs-http-server/src/api/resources/stats.js index 80628038c1..49245582f3 100644 --- a/packages/ipfs-http-server/src/api/resources/stats.js +++ b/packages/ipfs-http-server/src/api/resources/stats.js @@ -1,9 +1,7 @@ 'use strict' -const { map } = require('streaming-iterables') +const map = require('it-map') const { pipe } = require('it-pipe') -// @ts-ignore no types -const ndjson = require('iterable-ndjson') const streamResponse = require('../../utils/stream-response') const Joi = require('../../utils/joi') @@ -59,13 +57,14 @@ exports.bw = { signal, timeout }), - map(stat => ({ - TotalIn: stat.totalIn.toString(), - TotalOut: stat.totalOut.toString(), - RateIn: stat.rateIn.toString(), - RateOut: stat.rateOut.toString() - })), - ndjson.stringify + async function * (source) { + yield * map(source, stat => ({ + TotalIn: stat.totalIn.toString(), + TotalOut: stat.totalOut.toString(), + RateIn: stat.rateIn.toString(), + RateOut: stat.rateOut.toString() + })) + } )) } } diff --git a/packages/ipfs-http-server/src/utils/stream-response.js b/packages/ipfs-http-server/src/utils/stream-response.js index 18b392c885..1838ea1987 100644 --- a/packages/ipfs-http-server/src/utils/stream-response.js +++ b/packages/ipfs-http-server/src/utils/stream-response.js @@ -6,18 +6,16 @@ const log = require('debug')('ipfs:http-api:utils:stream-response') // @ts-ignore no types const toIterable = require('stream-to-it') -const errorTrailer = 'X-Stream-Error' +const ERROR_TRAILER = 'X-Stream-Error' /** * * @param {import('../types').Request} request * @param {import('@hapi/hapi').ResponseToolkit} h * @param {() => AsyncIterable} getSource - * @param {{ objectMode?: boolean, onError?: (error: Error) => void }} [options] + * @param {{ onError?: (error: Error) => void }} [options] */ async function streamResponse (request, h, getSource, options = {}) { - options.objectMode = options.objectMode !== false - // eslint-disable-next-line no-async-promise-executor const stream = await new Promise(async (resolve, reject) => { let started = false @@ -32,7 +30,12 @@ async function streamResponse (request, h, getSource, options = {}) { started = true resolve(stream) } - yield chunk + + if (chunk instanceof Uint8Array || typeof chunk === 'string') { + yield chunk + } else { + yield JSON.stringify(chunk) + '\n' + } } if (!started) { // Maybe it was an empty source? @@ -46,9 +49,9 @@ async function streamResponse (request, h, getSource, options = {}) { options.onError(err) } - if (started) { + if (request.raw.res.headersSent) { request.raw.res.addTrailers({ - [errorTrailer]: JSON.stringify({ + [ERROR_TRAILER]: JSON.stringify({ Message: err.message, Code: 0 }) @@ -66,9 +69,9 @@ async function streamResponse (request, h, getSource, options = {}) { }) return h.response(stream) - .header('x-chunked-output', '1') - .header('content-type', 'application/json') - .header('Trailer', errorTrailer) + .header('X-Chunked-Output', '1') + .header('Content-Type', 'application/json') + .header('Trailer', ERROR_TRAILER) } module.exports = streamResponse diff --git a/packages/ipfs-http-server/test/inject/mfs/ls.js b/packages/ipfs-http-server/test/inject/mfs/ls.js index 3c428c1bdc..7a1e6410f3 100644 --- a/packages/ipfs-http-server/test/inject/mfs/ls.js +++ b/packages/ipfs-http-server/test/inject/mfs/ls.js @@ -122,7 +122,7 @@ describe('/files/ls', () => { }) it('accepts a timeout when streaming', async () => { - ipfs.files.ls.withArgs({ + ipfs.files.ls.withArgs(path, { ...defaultOptions, timeout: 1000 }).returns([file]) diff --git a/packages/ipfs-http-server/test/inject/ping.js b/packages/ipfs-http-server/test/inject/ping.js index e77a7c679d..e5171747c3 100644 --- a/packages/ipfs-http-server/test/inject/ping.js +++ b/packages/ipfs-http-server/test/inject/ping.js @@ -8,7 +8,6 @@ const http = require('../utils/http') const sinon = require('sinon') const allNdjson = require('../utils/all-ndjson') const { AbortSignal } = require('native-abort-controller') -const CID = require('cids') const defaultOptions = { count: 10, @@ -48,8 +47,8 @@ describe('/ping', function () { expect(res).to.have.property('statusCode', 400) }) - it('returns 500 for incorrect Peer Id', async () => { - ipfs.ping.withArgs(new CID(peerId)).throws(new Error('derp')) + it('returns error for incorrect Peer Id', async () => { + ipfs.ping.withArgs(peerId).throws(new Error('derp')) const res = await http({ method: 'POST', @@ -60,7 +59,7 @@ describe('/ping', function () { }) it('pings with a count', async () => { - ipfs.ping.withArgs(new CID(peerId), { + ipfs.ping.withArgs(peerId, { ...defaultOptions, count: 5 }).returns([]) @@ -74,7 +73,7 @@ describe('/ping', function () { }) it('pings with a count as n', async () => { - ipfs.ping.withArgs(new CID(peerId), { + ipfs.ping.withArgs(peerId, { ...defaultOptions, count: 5 }).returns([]) @@ -88,7 +87,7 @@ describe('/ping', function () { }) it('pings a remote peer', async () => { - ipfs.ping.withArgs(new CID(peerId), defaultOptions).returns([{ + ipfs.ping.withArgs(peerId, defaultOptions).returns([{ success: true, time: 1, text: 'hello' @@ -116,7 +115,7 @@ describe('/ping', function () { }) it('accepts a timeout', async () => { - ipfs.ping.withArgs(new CID(peerId), { + ipfs.ping.withArgs(peerId, { ...defaultOptions, timeout: 1000 }).returns([])