Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort signal #169

Merged
merged 18 commits into from
May 20, 2020
56 changes: 43 additions & 13 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ class Parser extends HTTPParser {
assert(!this.read)
assert(!this.body)

let body = request.callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
}, resumeSocket)
request.callback = null
request.opaque = null
let body

if (request.callback) {
body = request.callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
}, resumeSocket)
request.callback = null
request.opaque = null
}

if (body && skipBody) {
body(null, null)
Expand All @@ -82,6 +86,12 @@ class Parser extends HTTPParser {

if (body) {
this.body = body

if (request.signal) {
request.signal.once('abort', () => {
body(new RequestAbortedError(), null)
})
}
} else {
this.next()
}
Expand Down Expand Up @@ -702,9 +712,18 @@ function resume (client) {
body,
chunked,
rawHeaders,
idempotent
idempotent,
callback,
signal
} = client[kQueue][client[kInflight]]

if (!callback) {
// Request was aborted.
client[kQueue].splice(client[kInflight], 1)
resume(client)
return
}

if (!idempotent && client.running) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
Expand Down Expand Up @@ -767,19 +786,26 @@ function resume (client) {
const onDrain = () => {
body.resume()
}
const onClose = () => {
const onAbort = () => {
onFinished(new RequestAbortedError())
}
const onSocketClose = () => {
onFinished(new SocketError('other side close'))
delvedor marked this conversation as resolved.
Show resolved Hide resolved
}
const onFinished = (err) => {
if (signal) {
signal.removeListener('abort', onAbort)
}

socket
.removeListener('drain', onDrain)
.removeListener('error', onFinished)
.removeListener('close', onClose)
.removeListener('close', onSocketClose)
body
.removeListener('data', onData)
.removeListener('end', onFinished)
.removeListener('error', onFinished)
.removeListener('close', onClose)
.removeListener('close', onAbort)
.on('error', nop)

if (err) {
Expand All @@ -805,16 +831,20 @@ function resume (client) {
}
}

if (signal) {
signal.on('abort', onAbort)
}

body
.on('data', onData)
.on('end', onFinished)
.on('error', onFinished)
.on('close', onClose)
.on('close', onAbort)

socket
.on('drain', onDrain)
.on('error', onFinished)
.on('close', onClose)
.on('close', onSocketClose)
.uncork()

client[kWriting] = true
Expand Down
32 changes: 30 additions & 2 deletions lib/request.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
'use strict'

const { AsyncResource } = require('async_hooks')
const { InvalidArgumentError } = require('./errors')
const {
InvalidArgumentError,
RequestAbortedError
} = require('./errors')

const methods = [
'ACL',
Expand Down Expand Up @@ -54,7 +57,11 @@ class Request extends AsyncResource {
constructor (opts, callback) {
super('UNDICI_REQ')

const { path, method, body, headers, idempotent, opaque } = opts
if (!opts) {
throw new InvalidArgumentError('no options passed')
}

const { path, method, body, headers, idempotent, opaque, signal } = opts

if (!(typeof path === 'string' && path[0] === '/')) {
throw new InvalidArgumentError('path must be a valid path')
Expand All @@ -64,10 +71,16 @@ class Request extends AsyncResource {
throw new InvalidArgumentError('method must be a valid method')
}

if (signal && typeof signal.on !== 'function') {
throw new InvalidArgumentError('signal must implement .on(name, callback)')
}

if (!isValidBody(body)) {
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream')
}

this.signal = signal

this.method = method

this.path = path
Expand All @@ -88,6 +101,21 @@ class Request extends AsyncResource {

this.rawHeaders = ''

if (this.signal) {
this.signal.once('abort', () => {
const { callback } = this

if (!callback) {
return
}

this.callback = null
this.opaque = null

callback(new RequestAbortedError(), null)
})
}

if (headers) {
const headerNames = Object.keys(headers)
for (let i = 0; i < headerNames.length; i++) {
Expand Down
106 changes: 106 additions & 0 deletions test/abort.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
'use strict'

const { test } = require('tap')
const EventEmitter = require('events')
const { Client, errors } = require('..')
const { createServer } = require('http')

test('Abort while sending request - event emitter (no body)', { skip: 'never ending' }, (t) => {
t.plan(1)

const server = createServer((req, res) => {
t.fail('The requets should be aborted')
})
t.teardown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const ee = new EventEmitter()
t.teardown(client.close.bind(client))

client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
t.ok(err instanceof errors.RequestAbortedError)
})

ee.emit('abort')
})
})

test('Abort while waiting response - event emitter (no body)', (t) => {
t.plan(1)

const server = createServer((req, res) => {
setTimeout(() => {
res.setHeader('content-type', 'text/plain')
res.end('hello world')
}, 1000)
})
t.teardown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const ee = new EventEmitter()
t.teardown(client.close.bind(client))

client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
t.ok(err instanceof errors.RequestAbortedError)
})

setTimeout(() => {
ee.emit('abort')
}, 500)
})
})

test('Abort while waiting response - event emitter (write headers started) (no body)', (t) => {
t.plan(1)

const server = createServer((req, res) => {
res.writeHead(200, { 'content-type': 'text/plain' })
setTimeout(() => {
res.end('hello world')
}, 1000)
})
t.teardown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const ee = new EventEmitter()
t.teardown(client.close.bind(client))

client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
t.ok(err instanceof errors.RequestAbortedError)
})

setTimeout(() => {
ee.emit('abort')
}, 500)
})
})

test('Abort while waiting response - event emitter (write headers (write body started) (no body)', (t) => {
t.plan(1)

const server = createServer((req, res) => {
res.writeHead(200, { 'content-type': 'text/plain' })
res.write('hello')
setTimeout(() => {
res.end('world')
}, 1000)
})
t.teardown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
const ee = new EventEmitter()
t.teardown(client.close.bind(client))

client.request({ path: '/', method: 'GET', signal: ee }, (err, response) => {
t.ok(err instanceof errors.RequestAbortedError)
delvedor marked this conversation as resolved.
Show resolved Hide resolved
})

setTimeout(() => {
ee.emit('abort')
}, 500)
delvedor marked this conversation as resolved.
Show resolved Hide resolved
})
})