Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort signal #169

Merged
merged 18 commits into from
May 20, 2020
Merged
69 changes: 56 additions & 13 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ class Parser extends HTTPParser {
assert(!this.read)
assert(!this.body)

let body = request.callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
}, resumeSocket)
request.callback = null
request.opaque = null
let body

if (request.callback) {
body = request.callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
}, resumeSocket)
request.callback = null
request.opaque = null
}

if (body && skipBody) {
body(null, null)
Expand All @@ -82,6 +86,17 @@ class Parser extends HTTPParser {

if (body) {
this.body = body

if (request.signal) {
const onAbort = () => {
body(new RequestAbortedError(), null)
}
if ('addEventListener' in request.signal) {
request.signal.addEventListener('abort', onAbort)
} else {
request.signal.once('abort', onAbort)
}
}
} else {
this.next()
}
Expand Down Expand Up @@ -702,9 +717,18 @@ function resume (client) {
body,
chunked,
rawHeaders,
idempotent
idempotent,
callback,
signal
} = client[kQueue][client[kInflight]]

if (!callback) {
// Request was aborted.
client[kQueue].splice(client[kInflight], 1)
resume(client)
return
}

if (!idempotent && client.running) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
Expand Down Expand Up @@ -767,19 +791,30 @@ function resume (client) {
const onDrain = () => {
body.resume()
}
const onClose = () => {
const onAbort = () => {
onFinished(new RequestAbortedError())
}
const onSocketClose = () => {
onFinished(new SocketError('other side close'))
delvedor marked this conversation as resolved.
Show resolved Hide resolved
}
const onFinished = (err) => {
if (signal) {
if ('removeEventListener' in signal) {
signal.removeEventListener('abort', onAbort)
} else {
signal.removeListener('abort', onAbort)
}
}

socket
.removeListener('drain', onDrain)
.removeListener('error', onFinished)
.removeListener('close', onClose)
.removeListener('close', onSocketClose)
body
.removeListener('data', onData)
.removeListener('end', onFinished)
.removeListener('error', onFinished)
.removeListener('close', onClose)
.removeListener('close', onAbort)
.on('error', nop)

if (err) {
Expand All @@ -805,16 +840,24 @@ function resume (client) {
}
}

if (signal) {
if ('addEventListener' in signal) {
signal.addEventListener('abort', onAbort)
} else {
signal.on('abort', onAbort)
}
}

body
.on('data', onData)
.on('end', onFinished)
.on('error', onFinished)
.on('close', onClose)
.on('close', onAbort)

socket
.on('drain', onDrain)
.on('error', onFinished)
.on('close', onClose)
.on('close', onSocketClose)
.uncork()

client[kWriting] = true
Expand Down
38 changes: 36 additions & 2 deletions lib/request.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
'use strict'

const { AsyncResource } = require('async_hooks')
const { InvalidArgumentError } = require('./errors')
const {
InvalidArgumentError,
RequestAbortedError
} = require('./errors')

const methods = [
'ACL',
Expand Down Expand Up @@ -54,7 +57,11 @@ class Request extends AsyncResource {
constructor (opts, callback) {
super('UNDICI_REQ')

const { path, method, body, headers, idempotent, opaque } = opts
if (!opts) {
throw new InvalidArgumentError('no options passed')
}

const { path, method, body, headers, idempotent, opaque, signal } = opts

if (!(typeof path === 'string' && path[0] === '/')) {
throw new InvalidArgumentError('path must be a valid path')
Expand All @@ -64,10 +71,16 @@ class Request extends AsyncResource {
throw new InvalidArgumentError('method must be a valid method')
}

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must implement .on(name, callback)')
}

if (!isValidBody(body)) {
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream')
}

this.signal = signal

this.method = method

this.path = path
Expand All @@ -88,6 +101,27 @@ class Request extends AsyncResource {

this.rawHeaders = ''

if (this.signal) {
const onAbort = () => {
const { callback } = this

if (!callback) {
return
}

this.callback = null
this.opaque = null

callback(new RequestAbortedError(), null)
}

if ('addEventListener' in this.signal) {
this.signal.addEventListener('abort', onAbort)
} else {
this.signal.once('abort', onAbort)
}
}

if (headers) {
const headerNames = Object.keys(headers)
for (let i = 0; i < headerNames.length; i++) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
},
"homepage": "https://github.com/mcollina/undici#readme",
"devDependencies": {
"abort-controller": "^3.0.0",
"benchmark": "^2.1.4",
"https-pem": "^2.0.0",
"pre-commit": "^1.2.2",
Expand Down
Loading