Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort signal #169

Merged
merged 18 commits into from
May 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 47 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Options:
* `method`
* `body`, it can be a `String`, a `Buffer`, `Uint8Array` or a `stream.Readable`.
* `headers`, an object with header-value pairs.
* `signal`, either an `AbortController` or an `EventEmitter`.
* `idempotent`, whether the requests can be safely retried or not.
If `false` the request won't be sent until all preceeding
requests in the pipeline has completed.
Expand Down Expand Up @@ -129,45 +130,61 @@ client.request({
})
```

Abortion is supported by destroying the request or
response body.
Non-idempotent requests will not be pipelined in order
to avoid indirect failures.

```js
// Abort while sending request.
const body = new stream.Passthrough()
const promise = client.request({
path: '/',
method: 'POST',
body
})
body.destroy()
const { statusCode, headers } = await promise
```
Idempotent requests will be automatically retried if
they fail due to indirect failure from the request
at the head of the pipeline. This does not apply to
idempotent requests with a stream request body.

##### Aborting a request

A request can may be aborted using either an `AbortController` or an `EventEmitter`.
To use `AbortController`, you will need to `npm i abort-controller`.

```js
// Abort while reading response.
const { statusCode, headers, body } = await client.request({
const { AbortController } = require('abort-controller')
const { Client } = require('undici')

const client = new Client'http://localhost:3000')
const abortController = new AbortController()

client.request({
path: '/',
method: 'GET'
method: 'GET',
signal: abortController.signal
}, function (err, data) {
console.log(err) // RequestAbortedError
client.close()
})
body.destroy()

abortController.abort()
```

Promises and async await are supported as well!
Alternatively, any `EventEmitter` that emits an `'abort'` event may be used as an abort controller:

```js
const { statusCode, headers, body } = await client.request({
const EventEmitter = require('events')
const { Client } = require('undici')


const client = new Client'http://localhost:3000')
const ee = new EventEmitter()

client.request({
path: '/',
method: 'GET'
method: 'GET',
signal: ee
}, function (err, data) {
console.log(err) // RequestAbortedError
client.close()
})
```

Non-idempotent requests will not be pipelined in order
to avoid indirect failures.
ee.emit('abort')
```
delvedor marked this conversation as resolved.
Show resolved Hide resolved

Idempotent requests will be automatically retried if
they fail due to indirect failure from the request
at the head of the pipeline. This does not apply to
idempotent requests with a stream request body.
Destroying the request or response body will have the same effect.

<a name='stream'></a>
#### `client.stream(opts, factory(data), callback(err))`
Expand Down Expand Up @@ -254,9 +271,9 @@ The `data` parameter in `handler` is defined as follow:
either fully consume or destroy the body unless there is an error, or no further requests
will be processed.

`handler` should return a `Writable` to which the response will be
written to. Usually it should just return the `body` argument unless
some kind of transformation needs to be performed based on e.g.
`handler` should return a `Writable` to which the response will be
written to. Usually it should just return the `body` argument unless
some kind of transformation needs to be performed based on e.g.
`headers` or `statusCode`.

The `handler` should validate the response and save any
Expand Down
79 changes: 63 additions & 16 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ class Parser extends HTTPParser {
assert(!this.read)
assert(!this.body)

let body = request.callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
}, resumeSocket)
request.callback = null
request.opaque = null
let body

if (request.callback) {
body = request.callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
}, resumeSocket)
request.callback = null
request.opaque = null
}

if (body && skipBody) {
body(null, null)
Expand All @@ -82,6 +86,17 @@ class Parser extends HTTPParser {

if (body) {
this.body = body

if (request.signal) {
const onAbort = () => {
body(new RequestAbortedError(), null)
}
if ('addEventListener' in request.signal) {
request.signal.addEventListener('abort', onAbort)
} else {
request.signal.once('abort', onAbort)
}
}
} else {
this.next()
}
Expand Down Expand Up @@ -157,7 +172,9 @@ class Parser extends HTTPParser {
const retry = []
for (const request of client[kQueue].slice(client[kComplete], client[kInflight])) {
const { idempotent, body, callback } = request
if (idempotent && (!body || typeof body.pipe !== 'function')) {
if (!callback) {
// Aborted
} else if (idempotent && (!body || typeof body.pipe !== 'function')) {
retry.push(request)
} else {
process.nextTick(callback, err, null)
Expand Down Expand Up @@ -541,7 +558,7 @@ class Client extends EventEmitter {

try {
this[kQueue].push(new Request(opts, callback))
resume(this)
process.nextTick(resume, this)
} catch (err) {
process.nextTick(callback, err, null)
}
Expand Down Expand Up @@ -668,7 +685,9 @@ function connect (client) {
function resume (client) {
if (client[kDestroyed]) {
for (const { callback } of client[kQueue].splice(client[kInflight])) {
callback(new ClientDestroyedError(), null)
if (callback) {
callback(new ClientDestroyedError(), null)
}
}
return
}
Expand Down Expand Up @@ -714,9 +733,18 @@ function resume (client) {
body,
chunked,
rawHeaders,
idempotent
idempotent,
callback,
signal
} = client[kQueue][client[kInflight]]

if (!callback) {
// Request was aborted.
client[kQueue].splice(client[kInflight], 1)
resume(client)
return
}

if (!idempotent && client.running) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
Expand Down Expand Up @@ -779,19 +807,30 @@ function resume (client) {
const onDrain = () => {
body.resume()
}
const onClose = () => {
const onAbort = () => {
onFinished(new RequestAbortedError())
}
const onSocketClose = () => {
onFinished(new SocketError('other side closed'))
}
const onFinished = (err) => {
if (signal) {
if ('removeEventListener' in signal) {
signal.removeEventListener('abort', onAbort)
} else {
signal.removeListener('abort', onAbort)
}
}

socket
.removeListener('drain', onDrain)
.removeListener('error', onFinished)
.removeListener('close', onClose)
.removeListener('close', onSocketClose)
body
.removeListener('data', onData)
.removeListener('end', onFinished)
.removeListener('error', onFinished)
.removeListener('close', onClose)
.removeListener('close', onAbort)
.on('error', nop)

if (err) {
Expand All @@ -817,16 +856,24 @@ function resume (client) {
}
}

if (signal) {
if ('addEventListener' in signal) {
signal.addEventListener('abort', onAbort)
} else {
signal.on('abort', onAbort)
}
}

body
.on('data', onData)
.on('end', onFinished)
.on('error', onFinished)
.on('close', onClose)
.on('close', onAbort)

socket
.on('drain', onDrain)
.on('error', onFinished)
.on('close', onClose)
.on('close', onSocketClose)
.uncork()

client[kWriting] = true
Expand Down
38 changes: 36 additions & 2 deletions lib/request.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
'use strict'

const { AsyncResource } = require('async_hooks')
const { InvalidArgumentError } = require('./errors')
const {
InvalidArgumentError,
RequestAbortedError
} = require('./errors')

const methods = [
'ACL',
Expand Down Expand Up @@ -54,7 +57,11 @@ class Request extends AsyncResource {
constructor (opts, callback) {
super('UNDICI_REQ')

const { path, method, body, headers, idempotent, opaque } = opts
if (!opts) {
throw new InvalidArgumentError('no options passed')
}

const { path, method, body, headers, idempotent, opaque, signal } = opts

if (!(typeof path === 'string' && path[0] === '/')) {
throw new InvalidArgumentError('path must be a valid path')
Expand All @@ -64,10 +71,16 @@ class Request extends AsyncResource {
throw new InvalidArgumentError('method must be a valid method')
}

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must implement .on(name, callback)')
}

if (!isValidBody(body)) {
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream')
}

this.signal = signal

this.method = method

this.path = path
Expand All @@ -88,6 +101,27 @@ class Request extends AsyncResource {

this.rawHeaders = ''

if (this.signal) {
const onAbort = () => {
const { callback } = this

if (!callback) {
return
}

this.callback = null
this.opaque = null

callback(new RequestAbortedError(), null)
}

if ('addEventListener' in this.signal) {
this.signal.addEventListener('abort', onAbort)
} else {
this.signal.once('abort', onAbort)
}
}

if (headers) {
const headerNames = Object.keys(headers)
for (let i = 0; i < headerNames.length; i++) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
},
"homepage": "https://github.com/mcollina/undici#readme",
"devDependencies": {
"abort-controller": "^3.0.0",
"benchmark": "^2.1.4",
"https-pem": "^2.0.0",
"pre-commit": "^1.2.2",
Expand Down
Loading