diff --git a/src/core/server/http/integration_tests/request.test.ts b/src/core/server/http/integration_tests/request.test.ts index a2560c2c39fad..83dffdc73468e 100644 --- a/src/core/server/http/integration_tests/request.test.ts +++ b/src/core/server/http/integration_tests/request.test.ts @@ -198,6 +198,42 @@ describe('KibanaRequest', () => { expect(nextSpy).toHaveBeenCalledTimes(1); }); + it('emits once and completes when request aborted after the payload has been consumed', async () => { + expect.assertions(1); + const { server: innerServer, createRouter } = await server.setup(setupDeps); + const router = createRouter('/'); + + const nextSpy = jest.fn(); + + const done = new Promise((resolve) => { + router.post( + { path: '/', validate: { body: schema.any() } }, + async (context, request, res) => { + request.events.aborted$.subscribe({ + next: nextSpy, + complete: resolve, + }); + + // prevents the server to respond + await delay(30000); + return res.ok({ body: 'ok' }); + } + ); + }); + + await server.start(); + + const incomingRequest = supertest(innerServer.listener) + .post('/') + .send({ hello: 'dolly' }) + // end required to send request + .end(); + + setTimeout(() => incomingRequest.abort(), 50); + await done; + expect(nextSpy).toHaveBeenCalledTimes(1); + }); + it('completes & does not emit when request handled', async () => { const { server: innerServer, createRouter } = await server.setup(setupDeps); const router = createRouter('/'); @@ -336,6 +372,41 @@ describe('KibanaRequest', () => { await done; expect(nextSpy).toHaveBeenCalledTimes(1); }); + + it('emits once and completes when response is aborted after the payload has been consumed', async () => { + expect.assertions(2); + const { server: innerServer, createRouter } = await server.setup(setupDeps); + const router = createRouter('/'); + + const nextSpy = jest.fn(); + + const done = new Promise((resolve) => { + router.post( + { path: '/', validate: { body: schema.any() } }, + async (context, req, res) => { + req.events.completed$.subscribe({ + next: nextSpy, + complete: resolve, + }); + + expect(nextSpy).not.toHaveBeenCalled(); + await delay(30000); + return res.ok({ body: 'ok' }); + } + ); + }); + + await server.start(); + + const incomingRequest = supertest(innerServer.listener) + .post('/') + .send({ foo: 'bar' }) + // end required to send request + .end(); + setTimeout(() => incomingRequest.abort(), 50); + await done; + expect(nextSpy).toHaveBeenCalledTimes(1); + }); }); }); diff --git a/src/core/server/http/router/request.ts b/src/core/server/http/router/request.ts index e53a30124a420..5a914fb800da9 100644 --- a/src/core/server/http/router/request.ts +++ b/src/core/server/http/router/request.ts @@ -9,8 +9,8 @@ import { URL } from 'url'; import uuid from 'uuid'; import { Request, RouteOptionsApp, RequestApplicationState, RouteOptions } from '@hapi/hapi'; -import { Observable, fromEvent, merge } from 'rxjs'; -import { shareReplay, first, takeUntil } from 'rxjs/operators'; +import { Observable, fromEvent } from 'rxjs'; +import { shareReplay, first, filter } from 'rxjs/operators'; import { RecursiveReadonly } from '@kbn/utility-types'; import { deepFreeze } from '@kbn/std'; @@ -127,6 +127,7 @@ export class KibanaRequest< const body = routeValidator.getBody(req.payload, 'request body'); return { query, params, body }; } + /** * A identifier to identify this request. * @@ -215,11 +216,9 @@ export class KibanaRequest< } private getEvents(request: Request): KibanaRequestEvents { - // the response is completed, or its underlying connection was terminated prematurely - const finish$ = fromEvent(request.raw.res, 'close').pipe(shareReplay(1), first()); - - const aborted$ = fromEvent(request.raw.req, 'aborted').pipe(first(), takeUntil(finish$)); - const completed$ = merge(finish$, aborted$).pipe(shareReplay(1), first()); + const completed$ = fromEvent(request.raw.res, 'close').pipe(shareReplay(1), first()); + // the response's underlying connection was terminated prematurely + const aborted$ = completed$.pipe(filter(() => !isCompleted(request))); return { aborted$, @@ -331,3 +330,7 @@ function isRequest(request: any): request is Request { export function isRealRequest(request: unknown): request is KibanaRequest | Request { return isKibanaRequest(request) || isRequest(request); } + +function isCompleted(request: Request) { + return request.raw.res.writableFinished; +}