Skip to content

Commit

Permalink
fix: stalling subscription on (node) http-client when daemon is stopp…
Browse files Browse the repository at this point in the history
…ed (#3468)

This change fixes #3465 by upgrading to a temporary fork of node-fetch with
node-fetch/node-fetch#1172 applied.

Co-authored-by: achingbrain <alex@achingbrain.net>
  • Loading branch information
Gozala and achingbrain authored Jun 1, 2021
1 parent 58052db commit bf88d98
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 8 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"ipfs-core-types": "^0.5.0",
"ipfs-core-utils": "^0.8.1",
"ipfs-unixfs": "^4.0.3",
"ipfs-utils": "^7.0.0",
"ipfs-utils": "^8.1.2",
"ipld-block": "^0.11.0",
"ipld-dag-cbor": "^1.0.0",
"ipld-dag-pb": "^0.22.1",
Expand Down
30 changes: 23 additions & 7 deletions src/pubsub/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ module.exports = configure((api, options) => {
return
}

readMessages(response.ndjson(), {
readMessages(response, {
onMessage: handler,
onEnd: () => subsTracker.unsubscribe(topic, handler),
onError: options.onError
Expand All @@ -78,17 +78,17 @@ module.exports = configure((api, options) => {
})

/**
* @param {*} msgStream
* @param {import('ipfs-utils/src/types').ExtendedResponse} response
* @param {object} options
* @param {(message: Message) => void} options.onMessage
* @param {() => void} options.onEnd
* @param {ErrorHandlerFn} [options.onError]
*/
async function readMessages (msgStream, { onMessage, onEnd, onError }) {
async function readMessages (response, { onMessage, onEnd, onError }) {
onError = onError || log

try {
for await (const msg of msgStream) {
for await (const msg of response.ndjson()) {
try {
if (!msg.from) {
continue
Expand All @@ -106,12 +106,28 @@ async function readMessages (msgStream, { onMessage, onEnd, onError }) {
}
}
} catch (err) {
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
// Temporarily use the name property instead.
if (err.type !== 'aborted' && err.name !== 'AbortError') {
if (!isAbortError(err)) {
onError(err, true) // Fatal
}
} finally {
onEnd()
}
}

/**
* @param {Error & {type?:string}} error
* @returns {boolean}
*/
const isAbortError = error => {
switch (error.type) {
case 'aborted':
return true
// It is `abort` in Electron instead of `aborted`
case 'abort':
return true
default:
// FIXME: In testing with Chrome, err.type is undefined (should not be!)
// Temporarily use the name property instead.
return error.name === 'AbortError'
}
}
84 changes: 84 additions & 0 deletions test/pubsub.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/* eslint-env mocha */
'use strict'

const { expect } = require('aegir/utils/chai')
const { AbortController } = require('native-abort-controller')

const f = require('./utils/factory')()

describe('.pubsub', function () {
this.timeout(20 * 1000)
describe('.subscribe', () => {
let ipfs
let ctl

beforeEach(async function () {
this.timeout(30 * 1000) // slow CI

ctl = await await f.spawn({
args: '--enable-pubsub-experiment'
})

ipfs = ctl.api
})

afterEach(() => f.clean())

it('.onError when connection is closed', async () => {
const topic = 'gossipboom'
let messageCount = 0
let onError
const error = new Promise(resolve => { onError = resolve })

await ipfs.pubsub.subscribe(topic, message => {
messageCount++

if (messageCount === 2) {
// Stop the daemon
ctl.stop().catch()
}
}, {
onError
})

await ipfs.pubsub.publish(topic, 'hello')
await ipfs.pubsub.publish(topic, 'bye')

await expect(error).to.eventually.be.fulfilled().and.to.be.instanceOf(Error)
})

it('does not call onError when aborted', async () => {
const controller = new AbortController()
const topic = 'gossipabort'
const messages = []
let onError
let onReceived

const received = new Promise(resolve => { onReceived = resolve })
const error = new Promise(resolve => { onError = resolve })

await ipfs.pubsub.subscribe(topic, message => {
messages.push(message)
if (messages.length === 2) {
onReceived()
}
}, {
onError,
signal: controller.signal
})

await ipfs.pubsub.publish(topic, 'hello')
await ipfs.pubsub.publish(topic, 'bye')

await received
controller.abort()

// Stop the daemon
await ctl.stop()
// Just to make sure no error is caused by above line
setTimeout(onError, 200, 'aborted')

await expect(error).to.eventually.be.fulfilled().and.to.equal('aborted')
})
})
})

0 comments on commit bf88d98

Please sign in to comment.