diff --git a/packages/ipfs-http-client/package.json b/packages/ipfs-http-client/package.json index 00ccbac4e5..31fe7d8b76 100644 --- a/packages/ipfs-http-client/package.json +++ b/packages/ipfs-http-client/package.json @@ -64,6 +64,7 @@ "multiformats": "^9.4.1", "nanoid": "^3.1.12", "native-abort-controller": "^1.0.3", + "p-defer": "^3.0.0", "parse-duration": "^1.0.0", "stream-to-it": "^0.2.2", "uint8arrays": "^2.1.6" @@ -77,7 +78,6 @@ "it-concat": "^2.0.0", "it-first": "^1.0.4", "nock": "^13.0.2", - "p-defer": "^3.0.0", "rimraf": "^3.0.2" }, "engines": { diff --git a/packages/ipfs-http-client/src/pubsub/subscribe.js b/packages/ipfs-http-client/src/pubsub/subscribe.js index db99bf128f..d265ea6a6b 100644 --- a/packages/ipfs-http-client/src/pubsub/subscribe.js +++ b/packages/ipfs-http-client/src/pubsub/subscribe.js @@ -5,6 +5,7 @@ const uint8ArrayToString = require('uint8arrays/to-string') const log = require('debug')('ipfs-http-client:pubsub:subscribe') const configure = require('../lib/configure') const toUrlSearchParams = require('../lib/to-url-search-params') +const defer = require('p-defer') /** * @typedef {import('../types').HTTPClientExtraOptions} HTTPClientExtraOptions @@ -24,7 +25,9 @@ module.exports = (options, subsTracker) => { * @type {PubsubAPI["subscribe"]} */ async function subscribe (topic, handler, options = {}) { // eslint-disable-line require-await - options.signal = subsTracker.subscribe(topic, handler, options.signal) + const end = defer() + + options.signal = subsTracker.subscribe(topic, handler, end.promise, options.signal) /** @type {(value?: any) => void} */ let done @@ -73,6 +76,9 @@ module.exports = (options, subsTracker) => { done() }) + .finally(() => { + end.resolve() + }) }, 0) return result diff --git a/packages/ipfs-http-client/src/pubsub/subscription-tracker.js b/packages/ipfs-http-client/src/pubsub/subscription-tracker.js index faf15ad6c4..fc5777d069 100644 --- a/packages/ipfs-http-client/src/pubsub/subscription-tracker.js +++ b/packages/ipfs-http-client/src/pubsub/subscription-tracker.js @@ -8,6 +8,7 @@ const { AbortController } = require('native-abort-controller') * @typedef {Object} Subscription * @property {MessageHandlerFn} handler * @property {AbortController} controller + * @property {Promise} end */ class SubscriptionTracker { @@ -19,9 +20,10 @@ class SubscriptionTracker { /** * @param {string} topic * @param {MessageHandlerFn} handler + * @param {Promise} end * @param {AbortSignal} [signal] */ - subscribe (topic, handler, signal) { + subscribe (topic, handler, end, signal) { const topicSubs = this._subs.get(topic) || [] if (topicSubs.find(s => s.handler === handler)) { @@ -31,7 +33,7 @@ class SubscriptionTracker { // Create controller so a call to unsubscribe can cancel the request const controller = new AbortController() - this._subs.set(topic, [{ handler, controller }].concat(topicSubs)) + this._subs.set(topic, [{ handler, controller, end }].concat(topicSubs)) // If there is an external signal, forward the abort event if (signal) { @@ -45,7 +47,7 @@ class SubscriptionTracker { * @param {string} topic * @param {MessageHandlerFn} [handler] */ - unsubscribe (topic, handler) { + async unsubscribe (topic, handler) { const subs = this._subs.get(topic) || [] let unsubs @@ -62,6 +64,10 @@ class SubscriptionTracker { } unsubs.forEach(s => s.controller.abort()) + + await Promise.all( + unsubs.map(sub => sub.end) + ) } } diff --git a/packages/ipfs-http-client/src/pubsub/unsubscribe.js b/packages/ipfs-http-client/src/pubsub/unsubscribe.js index 79c40eb3ba..1b00f88117 100644 --- a/packages/ipfs-http-client/src/pubsub/unsubscribe.js +++ b/packages/ipfs-http-client/src/pubsub/unsubscribe.js @@ -15,7 +15,7 @@ module.exports = (options, subsTracker) => { * @type {PubsubAPI["unsubscribe"]} */ async function unsubscribe (topic, handler) { - subsTracker.unsubscribe(topic, handler) + await subsTracker.unsubscribe(topic, handler) } return unsubscribe }