Skip to content

Commit

Permalink
Abort signal (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
delvedor authored May 20, 2020
1 parent 7e20074 commit a9b59eb
Show file tree
Hide file tree
Showing 6 changed files with 552 additions and 48 deletions.
77 changes: 47 additions & 30 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Options:
* `method`
* `body`, it can be a `String`, a `Buffer`, `Uint8Array` or a `stream.Readable`.
* `headers`, an object with header-value pairs.
* `signal`, either an `AbortController` or an `EventEmitter`.
* `idempotent`, whether the requests can be safely retried or not.
If `false` the request won't be sent until all preceeding
requests in the pipeline has completed.
Expand Down Expand Up @@ -129,45 +130,61 @@ client.request({
})
```

Abortion is supported by destroying the request or
response body.
Non-idempotent requests will not be pipelined in order
to avoid indirect failures.

```js
// Abort while sending request.
const body = new stream.Passthrough()
const promise = client.request({
path: '/',
method: 'POST',
body
})
body.destroy()
const { statusCode, headers } = await promise
```
Idempotent requests will be automatically retried if
they fail due to indirect failure from the request
at the head of the pipeline. This does not apply to
idempotent requests with a stream request body.

##### Aborting a request

A request can may be aborted using either an `AbortController` or an `EventEmitter`.
To use `AbortController`, you will need to `npm i abort-controller`.

```js
// Abort while reading response.
const { statusCode, headers, body } = await client.request({
const { AbortController } = require('abort-controller')
const { Client } = require('undici')

const client = new Client'http://localhost:3000')
const abortController = new AbortController()

client.request({
path: '/',
method: 'GET'
method: 'GET',
signal: abortController.signal
}, function (err, data) {
console.log(err) // RequestAbortedError
client.close()
})
body.destroy()

abortController.abort()
```

Promises and async await are supported as well!
Alternatively, any `EventEmitter` that emits an `'abort'` event may be used as an abort controller:

```js
const { statusCode, headers, body } = await client.request({
const EventEmitter = require('events')
const { Client } = require('undici')


const client = new Client'http://localhost:3000')
const ee = new EventEmitter()

client.request({
path: '/',
method: 'GET'
method: 'GET',
signal: ee
}, function (err, data) {
console.log(err) // RequestAbortedError
client.close()
})
```

Non-idempotent requests will not be pipelined in order
to avoid indirect failures.
ee.emit('abort')
```

Idempotent requests will be automatically retried if
they fail due to indirect failure from the request
at the head of the pipeline. This does not apply to
idempotent requests with a stream request body.
Destroying the request or response body will have the same effect.

<a name='stream'></a>
#### `client.stream(opts, factory(data), callback(err))`
Expand Down Expand Up @@ -254,9 +271,9 @@ The `data` parameter in `handler` is defined as follow:
either fully consume or destroy the body unless there is an error, or no further requests
will be processed.
`handler` should return a `Writable` to which the response will be
written to. Usually it should just return the `body` argument unless
some kind of transformation needs to be performed based on e.g.
`handler` should return a `Writable` to which the response will be
written to. Usually it should just return the `body` argument unless
some kind of transformation needs to be performed based on e.g.
`headers` or `statusCode`.
The `handler` should validate the response and save any
Expand Down
79 changes: 63 additions & 16 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,17 @@ class Parser extends HTTPParser {
assert(!this.read)
assert(!this.body)

let body = request.callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
}, resumeSocket)
request.callback = null
request.opaque = null
let body

if (request.callback) {
body = request.callback(null, {
statusCode,
headers: parseHeaders(headers),
opaque: request.opaque
}, resumeSocket)
request.callback = null
request.opaque = null
}

if (body && skipBody) {
body(null, null)
Expand All @@ -82,6 +86,17 @@ class Parser extends HTTPParser {

if (body) {
this.body = body

if (request.signal) {
const onAbort = () => {
body(new RequestAbortedError(), null)
}
if ('addEventListener' in request.signal) {
request.signal.addEventListener('abort', onAbort)
} else {
request.signal.once('abort', onAbort)
}
}
} else {
this.next()
}
Expand Down Expand Up @@ -157,7 +172,9 @@ class Parser extends HTTPParser {
const retry = []
for (const request of client[kQueue].slice(client[kComplete], client[kInflight])) {
const { idempotent, body, callback } = request
if (idempotent && (!body || typeof body.pipe !== 'function')) {
if (!callback) {
// Aborted
} else if (idempotent && (!body || typeof body.pipe !== 'function')) {
retry.push(request)
} else {
process.nextTick(callback, err, null)
Expand Down Expand Up @@ -541,7 +558,7 @@ class Client extends EventEmitter {

try {
this[kQueue].push(new Request(opts, callback))
resume(this)
process.nextTick(resume, this)
} catch (err) {
process.nextTick(callback, err, null)
}
Expand Down Expand Up @@ -668,7 +685,9 @@ function connect (client) {
function resume (client) {
if (client[kDestroyed]) {
for (const { callback } of client[kQueue].splice(client[kInflight])) {
callback(new ClientDestroyedError(), null)
if (callback) {
callback(new ClientDestroyedError(), null)
}
}
return
}
Expand Down Expand Up @@ -714,9 +733,18 @@ function resume (client) {
body,
chunked,
rawHeaders,
idempotent
idempotent,
callback,
signal
} = client[kQueue][client[kInflight]]

if (!callback) {
// Request was aborted.
client[kQueue].splice(client[kInflight], 1)
resume(client)
return
}

if (!idempotent && client.running) {
// Non-idempotent request cannot be retried.
// Ensure that no other requests are inflight and
Expand Down Expand Up @@ -779,19 +807,30 @@ function resume (client) {
const onDrain = () => {
body.resume()
}
const onClose = () => {
const onAbort = () => {
onFinished(new RequestAbortedError())
}
const onSocketClose = () => {
onFinished(new SocketError('other side closed'))
}
const onFinished = (err) => {
if (signal) {
if ('removeEventListener' in signal) {
signal.removeEventListener('abort', onAbort)
} else {
signal.removeListener('abort', onAbort)
}
}

socket
.removeListener('drain', onDrain)
.removeListener('error', onFinished)
.removeListener('close', onClose)
.removeListener('close', onSocketClose)
body
.removeListener('data', onData)
.removeListener('end', onFinished)
.removeListener('error', onFinished)
.removeListener('close', onClose)
.removeListener('close', onAbort)
.on('error', nop)

if (err) {
Expand All @@ -817,16 +856,24 @@ function resume (client) {
}
}

if (signal) {
if ('addEventListener' in signal) {
signal.addEventListener('abort', onAbort)
} else {
signal.on('abort', onAbort)
}
}

body
.on('data', onData)
.on('end', onFinished)
.on('error', onFinished)
.on('close', onClose)
.on('close', onAbort)

socket
.on('drain', onDrain)
.on('error', onFinished)
.on('close', onClose)
.on('close', onSocketClose)
.uncork()

client[kWriting] = true
Expand Down
38 changes: 36 additions & 2 deletions lib/request.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
'use strict'

const { AsyncResource } = require('async_hooks')
const { InvalidArgumentError } = require('./errors')
const {
InvalidArgumentError,
RequestAbortedError
} = require('./errors')

const methods = [
'ACL',
Expand Down Expand Up @@ -54,7 +57,11 @@ class Request extends AsyncResource {
constructor (opts, callback) {
super('UNDICI_REQ')

const { path, method, body, headers, idempotent, opaque } = opts
if (!opts) {
throw new InvalidArgumentError('no options passed')
}

const { path, method, body, headers, idempotent, opaque, signal } = opts

if (!(typeof path === 'string' && path[0] === '/')) {
throw new InvalidArgumentError('path must be a valid path')
Expand All @@ -64,10 +71,16 @@ class Request extends AsyncResource {
throw new InvalidArgumentError('method must be a valid method')
}

if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
throw new InvalidArgumentError('signal must implement .on(name, callback)')
}

if (!isValidBody(body)) {
throw new InvalidArgumentError('body must be a string, a Buffer or a Readable stream')
}

this.signal = signal

this.method = method

this.path = path
Expand All @@ -88,6 +101,27 @@ class Request extends AsyncResource {

this.rawHeaders = ''

if (this.signal) {
const onAbort = () => {
const { callback } = this

if (!callback) {
return
}

this.callback = null
this.opaque = null

callback(new RequestAbortedError(), null)
}

if ('addEventListener' in this.signal) {
this.signal.addEventListener('abort', onAbort)
} else {
this.signal.once('abort', onAbort)
}
}

if (headers) {
const headerNames = Object.keys(headers)
for (let i = 0; i < headerNames.length; i++) {
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
},
"homepage": "https://github.com/mcollina/undici#readme",
"devDependencies": {
"abort-controller": "^3.0.0",
"benchmark": "^2.1.4",
"https-pem": "^2.0.0",
"pre-commit": "^1.2.2",
Expand Down
Loading

0 comments on commit a9b59eb

Please sign in to comment.