Skip to content

Commit

Permalink
Stop using aborted event for KibanaRequest.events.aborted$ (elast…
Browse files Browse the repository at this point in the history
…ic#126184)

* Stop using `aborted` event for `KibanaRequest.events.aborted$`

* add another test, just in case

* use a single `fromEvent`

* add replay effect to aborted$

* improve impl

* remove useless bottom-stream replay

* yup, that's simpler
  • Loading branch information
pgayvallet authored and lucasfcosta committed Mar 2, 2022
1 parent f8de190 commit a1a8c2d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 7 deletions.
71 changes: 71 additions & 0 deletions src/core/server/http/integration_tests/request.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,42 @@ describe('KibanaRequest', () => {
expect(nextSpy).toHaveBeenCalledTimes(1);
});

it('emits once and completes when request aborted after the payload has been consumed', async () => {
expect.assertions(1);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');

const nextSpy = jest.fn();

const done = new Promise<void>((resolve) => {
router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, request, res) => {
request.events.aborted$.subscribe({
next: nextSpy,
complete: resolve,
});

// prevents the server to respond
await delay(30000);
return res.ok({ body: 'ok' });
}
);
});

await server.start();

const incomingRequest = supertest(innerServer.listener)
.post('/')
.send({ hello: 'dolly' })
// end required to send request
.end();

setTimeout(() => incomingRequest.abort(), 50);
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});

it('completes & does not emit when request handled', async () => {
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');
Expand Down Expand Up @@ -336,6 +372,41 @@ describe('KibanaRequest', () => {
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});

it('emits once and completes when response is aborted after the payload has been consumed', async () => {
expect.assertions(2);
const { server: innerServer, createRouter } = await server.setup(setupDeps);
const router = createRouter('/');

const nextSpy = jest.fn();

const done = new Promise<void>((resolve) => {
router.post(
{ path: '/', validate: { body: schema.any() } },
async (context, req, res) => {
req.events.completed$.subscribe({
next: nextSpy,
complete: resolve,
});

expect(nextSpy).not.toHaveBeenCalled();
await delay(30000);
return res.ok({ body: 'ok' });
}
);
});

await server.start();

const incomingRequest = supertest(innerServer.listener)
.post('/')
.send({ foo: 'bar' })
// end required to send request
.end();
setTimeout(() => incomingRequest.abort(), 50);
await done;
expect(nextSpy).toHaveBeenCalledTimes(1);
});
});
});

Expand Down
17 changes: 10 additions & 7 deletions src/core/server/http/router/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import { URL } from 'url';
import uuid from 'uuid';
import { Request, RouteOptionsApp, RequestApplicationState, RouteOptions } from '@hapi/hapi';
import { Observable, fromEvent, merge } from 'rxjs';
import { shareReplay, first, takeUntil } from 'rxjs/operators';
import { Observable, fromEvent } from 'rxjs';
import { shareReplay, first, filter } from 'rxjs/operators';
import { RecursiveReadonly } from '@kbn/utility-types';
import { deepFreeze } from '@kbn/std';

Expand Down Expand Up @@ -127,6 +127,7 @@ export class KibanaRequest<
const body = routeValidator.getBody(req.payload, 'request body');
return { query, params, body };
}

/**
* A identifier to identify this request.
*
Expand Down Expand Up @@ -215,11 +216,9 @@ export class KibanaRequest<
}

private getEvents(request: Request): KibanaRequestEvents {
// the response is completed, or its underlying connection was terminated prematurely
const finish$ = fromEvent(request.raw.res, 'close').pipe(shareReplay(1), first());

const aborted$ = fromEvent<void>(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$));
const completed$ = merge<void, void>(finish$, aborted$).pipe(shareReplay(1), first());
const completed$ = fromEvent<void>(request.raw.res, 'close').pipe(shareReplay(1), first());
// the response's underlying connection was terminated prematurely
const aborted$ = completed$.pipe(filter(() => !isCompleted(request)));

return {
aborted$,
Expand Down Expand Up @@ -331,3 +330,7 @@ function isRequest(request: any): request is Request {
export function isRealRequest(request: unknown): request is KibanaRequest | Request {
return isKibanaRequest(request) || isRequest(request);
}

function isCompleted(request: Request) {
return request.raw.res.writableFinished;
}

0 comments on commit a1a8c2d

Please sign in to comment.