Skip to content

Update events #932

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions docs/observability.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ client.on('response', (err, result) => {
The client emits the following events:
[cols=2*]
|===
|`prepare-request`
a|Emitted once the user invokes any API.
[source,js]
----
client.on('prepare-request', (err, result) => {
console.log(err, result)
})
----

|`request`
a|Emitted before sending the actual request to Elasticsearch _(emitted multiple times in case of retries)_.
[source,js]
Expand Down Expand Up @@ -86,8 +95,8 @@ meta: {
context: any;
name: string;
request: {
params: TransportRequestParams;
options: TransportRequestOptions;
params: TransportRequestParams; // serialized by the client
options: TransportRequestOptions; // serialized by the client
id: any;
};
connection: Connection;
Expand All @@ -100,6 +109,29 @@ meta: {
};
----

`result` value in `prepare-request` will be:
[source,ts]
----
params: TransportRequestParams; // as provided by the user
options: TransportRequestOptions; // as provided by the user
meta: {
context: any;
name: string;
request: {
params: null;
options: null;
id: any;
};
connection: null;
attempts: number;
aborted: boolean;
sniff?: {
hosts: any[];
reason: string;
};
};
----

While the `result` value in `resurrect` will be:
[source,ts]
----
Expand Down
3 changes: 3 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ConnectionOptions as TlsConnectionOptions } from 'tls';
import Transport, {
ApiResponse,
RequestEvent,
PrepareRequestEvent,
TransportRequestParams,
TransportRequestOptions,
nodeFilterFn,
Expand Down Expand Up @@ -621,6 +622,7 @@ declare class Client extends EventEmitter {

declare const events: {
RESPONSE: string;
PREPARE_REQUEST: string;
REQUEST: string;
SNIFF: string;
RESURRECT: string;
Expand All @@ -636,6 +638,7 @@ export {
errors,
ApiResponse,
RequestEvent,
PrepareRequestEvent,
ResurrectEvent,
RequestParams,
ClientOptions,
Expand Down
1 change: 1 addition & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ function getAuth (node) {

const events = {
RESPONSE: 'response',
PREPARE_REQUEST: 'prepare-request',
REQUEST: 'request',
SNIFF: 'sniff',
RESURRECT: 'resurrect'
Expand Down
21 changes: 21 additions & 0 deletions lib/Transport.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,27 @@ export interface RequestEvent<T = any, C = any> {
};
}

export interface PrepareRequestEvent<C = any> {
params: TransportRequestParams;
options: TransportRequestOptions;
meta: {
context: C;
name: string;
request: {
params: null;
options: null;
id: any;
};
connection: null;
attempts: number;
aborted: boolean;
sniff?: {
hosts: any[];
reason: string;
};
};
}

// ApiResponse and RequestEvent are the same thing
// we are doing this for have more clear names
export interface ApiResponse<T = any, C = any> extends RequestEvent<T, C> {}
Expand Down
31 changes: 22 additions & 9 deletions lib/Transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,8 @@ class Transport {
const compression = options.compression || this.compression
var request = { abort: noop }

const makeRequest = () => {
if (meta.aborted === true) return
meta.connection = this.getConnection({ requestId: meta.request.id })
if (meta.connection === null) {
return callback(new NoLivingConnectionsError('There are not living connections'), result)
}

const prepareRequest = () => {
this.emit('prepare-request', null, { params, options, meta })
// TODO: make this assignment FAST
const headers = Object.assign({}, this.headers, options.headers)

Expand All @@ -120,6 +115,7 @@ class Transport {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
this.emit('request', err, result)
return callback(err, result)
}
}
Expand All @@ -143,6 +139,7 @@ class Transport {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
this.emit('request', err, result)
return callback(err, result)
}
} else {
Expand Down Expand Up @@ -170,11 +167,27 @@ class Transport {

meta.request.params = params
meta.request.options = options
this.emit('request', null, result)

// handles request timeout
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
if (options.asStream === true) params.asStream = true
return makeRequest()
}

const makeRequest = () => {
if (meta.aborted === true) {
this.emit('request', null, result)
return
}

meta.connection = this.getConnection({ requestId: meta.request.id })
if (meta.connection === null) {
const err = new NoLivingConnectionsError('There are not living connections')
this.emit('request', err, result)
return callback(err, result)
}

this.emit('request', null, result)
// perform the actual http request
return meta.connection.request(params, onResponse)
}
Expand Down Expand Up @@ -298,7 +311,7 @@ class Transport {
})
}

request = makeRequest()
request = prepareRequest()

return {
abort: () => {
Expand Down
47 changes: 40 additions & 7 deletions test/behavior/observability.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ test('Request id', t => {
})

t.test('Custom generateRequestId', t => {
t.plan(7)
t.plan(9)

const options = { context: { winter: 'is coming' } }

Expand All @@ -38,6 +38,11 @@ test('Request id', t => {
}
})

client.on('prepare-request', (err, { meta }) => {
t.error(err)
t.strictEqual(meta.request.id, 'custom-id')
})

client.on('request', (err, { meta }) => {
t.error(err)
t.strictEqual(meta.request.id, 'custom-id')
Expand All @@ -52,13 +57,18 @@ test('Request id', t => {
})

t.test('Custom request id in method options', t => {
t.plan(5)
t.plan(7)

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

client.on('prepare-request', (err, { meta }) => {
t.error(err)
t.strictEqual(meta.request.id, 'custom-id')
})

client.on('request', (err, { meta }) => {
t.error(err)
t.strictEqual(meta.request.id, 'custom-id')
Expand Down Expand Up @@ -89,7 +99,7 @@ test('Request id', t => {
})

t.test('sniffOnConnectionFault - should use the request id', t => {
t.plan(5)
t.plan(7)

const client = new Client({
nodes: ['http://localhost:9200', 'http://localhost:9201'],
Expand All @@ -98,6 +108,10 @@ test('Request id', t => {
maxRetries: 0
})

client.on('prepare-request', (e, { meta }) => {
t.strictEqual(meta.request.id, 'custom')
})

client.on('request', (e, { meta }) => {
t.strictEqual(meta.request.id, 'custom')
})
Expand Down Expand Up @@ -151,13 +165,18 @@ test('Request id', t => {

test('Request context', t => {
t.test('no value', t => {
t.plan(5)
t.plan(7)

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

client.on('prepare-request', (err, { meta }) => {
t.error(err)
t.strictEqual(meta.context, null)
})

client.on('request', (err, { meta }) => {
t.error(err)
t.strictEqual(meta.context, null)
Expand All @@ -172,13 +191,18 @@ test('Request context', t => {
})

t.test('custom value', t => {
t.plan(5)
t.plan(7)

const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})

client.on('prepare-request', (err, { meta }) => {
t.error(err)
t.deepEqual(meta.context, { winter: 'is coming' })
})

client.on('request', (err, { meta }) => {
t.error(err)
t.deepEqual(meta.context, { winter: 'is coming' })
Expand Down Expand Up @@ -206,13 +230,18 @@ test('Client name', t => {
})

t.test('Is present in the event metadata', t => {
t.plan(6)
t.plan(8)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection,
name: 'cluster'
})

client.on('prepare-request', (err, { meta }) => {
t.error(err)
t.strictEqual(meta.name, 'cluster')
})

client.on('request', (err, { meta }) => {
t.error(err)
t.strictEqual(meta.name, 'cluster')
Expand Down Expand Up @@ -246,7 +275,7 @@ test('Client name', t => {
})

t.test('sniffOnConnectionFault', t => {
t.plan(5)
t.plan(7)

const client = new Client({
nodes: ['http://localhost:9200', 'http://localhost:9201'],
Expand All @@ -255,6 +284,10 @@ test('Client name', t => {
maxRetries: 0
})

client.on('prepare-request', (e, { meta }) => {
t.strictEqual(meta.name, 'elasticsearch-js')
})

client.on('request', (e, { meta }) => {
t.strictEqual(meta.name, 'elasticsearch-js')
})
Expand Down
Loading