From 9befdbd2f0576eabd1108e67a7040386757c56d9 Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Sat, 14 Dec 2019 01:39:04 -0800 Subject: [PATCH] bfetch (#52888) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: 🎸 add bfetch plugin stub * feat: 🎸 add sample routes * feat: 🎸 implement streaming response * feat: 🎸 add Defer class * refactor: 💡 move Defer inot /common folder * feat: 🎸 add fromStreamingXhr() method * feat: 🎸 add split method * feat: 🎸 add fetchStreaming() function * test: 💍 fix test after refactor * test: 💍 add tests for fetStreaming() method * refactor: 💡 move removeLeadingSlash() to /common folder * feat: 🎸 expor stateful fetchStreaming() throuh plugin contract * chore: 🤖 clean up bfetch * chore: 🤖 prepare to replace ajax_stream by bfetch * Change ajax_stream to use new-line delimited JSON * refactor: 💡 move batched_fetch to use bfetch service * refactor: 💡 make use of defer() utility from kibana_utils * chore: 🤖 remove ajax_stream library * fix: 🐛 fix tests and inject fetchStreaming() method as dep * refactor: 💡 make split() operator more readable * refactor: 💡 improvee PR according to feedback * docs: ✏️ add fetchStreaming() reference * refactor: 💡 use NP logger, rename to createNDJSONStream() * chore: 🤖 adress Luke's review comments * chore: 🤖 add missing type --- .../canvas/ajax_stream/ajax_stream.test.ts | 201 -------------- .../public/canvas/ajax_stream/ajax_stream.ts | 152 ----------- .../public/canvas/batched_fetch.test.ts | 38 ++- .../public/canvas/batched_fetch.ts | 79 +++--- .../load_legacy_server_function_wrappers.ts | 8 +- .../ui/public/new_platform/new_platform.ts | 3 + src/plugins/bfetch/README.md | 9 + src/plugins/bfetch/common/index.ts | 21 ++ src/plugins/bfetch/common/streaming/index.ts | 20 ++ src/plugins/bfetch/common/streaming/types.ts | 24 ++ src/plugins/bfetch/common/types.ts | 20 ++ src/plugins/bfetch/common/util/index.ts | 20 ++ .../common/util/remove_leading_slash.ts | 20 ++ src/plugins/bfetch/docs/browser/reference.md | 15 ++ src/plugins/bfetch/kibana.json | 6 + .../bfetch/public}/index.ts | 15 +- src/plugins/bfetch/public/mocks.ts | 63 +++++ src/plugins/bfetch/public/plugin.ts | 81 ++++++ .../public/streaming/fetch_streaming.test.ts | 248 ++++++++++++++++++ .../public/streaming/fetch_streaming.ts | 65 +++++ .../streaming/from_streaming_xhr.test.ts | 217 +++++++++++++++ .../public/streaming/from_streaming_xhr.ts | 62 +++++ src/plugins/bfetch/public/streaming/index.ts | 22 ++ .../bfetch/public/streaming/split.test.ts | 71 +++++ src/plugins/bfetch/public/streaming/split.ts | 59 +++++ src/plugins/bfetch/public/test_helpers/xhr.ts | 73 ++++++ src/plugins/bfetch/public/types.ts | 18 ++ src/plugins/bfetch/server/index.ts | 27 ++ src/plugins/bfetch/server/mocks.ts | 61 +++++ src/plugins/bfetch/server/plugin.ts | 92 +++++++ .../server/streaming/create_ndjson_stream.ts | 52 ++++ src/plugins/bfetch/server/streaming/index.ts | 20 ++ src/plugins/kibana_utils/common/defer.test.ts | 69 +++++ src/plugins/kibana_utils/common/defer.ts | 41 +++ src/plugins/kibana_utils/common/index.ts | 20 ++ src/plugins/kibana_utils/public/index.ts | 2 +- 36 files changed, 1590 insertions(+), 424 deletions(-) delete mode 100644 src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts delete mode 100644 src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts create mode 100644 src/plugins/bfetch/README.md create mode 100644 src/plugins/bfetch/common/index.ts create mode 100644 src/plugins/bfetch/common/streaming/index.ts create mode 100644 src/plugins/bfetch/common/streaming/types.ts create mode 100644 src/plugins/bfetch/common/types.ts create mode 100644 src/plugins/bfetch/common/util/index.ts create mode 100644 src/plugins/bfetch/common/util/remove_leading_slash.ts create mode 100644 src/plugins/bfetch/docs/browser/reference.md create mode 100644 src/plugins/bfetch/kibana.json rename src/{legacy/core_plugins/interpreter/public/canvas/ajax_stream => plugins/bfetch/public}/index.ts (70%) create mode 100644 src/plugins/bfetch/public/mocks.ts create mode 100644 src/plugins/bfetch/public/plugin.ts create mode 100644 src/plugins/bfetch/public/streaming/fetch_streaming.test.ts create mode 100644 src/plugins/bfetch/public/streaming/fetch_streaming.ts create mode 100644 src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts create mode 100644 src/plugins/bfetch/public/streaming/from_streaming_xhr.ts create mode 100644 src/plugins/bfetch/public/streaming/index.ts create mode 100644 src/plugins/bfetch/public/streaming/split.test.ts create mode 100644 src/plugins/bfetch/public/streaming/split.ts create mode 100644 src/plugins/bfetch/public/test_helpers/xhr.ts create mode 100644 src/plugins/bfetch/public/types.ts create mode 100644 src/plugins/bfetch/server/index.ts create mode 100644 src/plugins/bfetch/server/mocks.ts create mode 100644 src/plugins/bfetch/server/plugin.ts create mode 100644 src/plugins/bfetch/server/streaming/create_ndjson_stream.ts create mode 100644 src/plugins/bfetch/server/streaming/index.ts create mode 100644 src/plugins/kibana_utils/common/defer.test.ts create mode 100644 src/plugins/kibana_utils/common/defer.ts create mode 100644 src/plugins/kibana_utils/common/index.ts diff --git a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts b/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts deleted file mode 100644 index 4463758e30bd6..0000000000000 --- a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.test.ts +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { ajaxStream, XMLHttpRequestLike } from './ajax_stream'; - -// eslint-disable-next-line no-empty -function noop() {} - -describe('ajaxStream', () => { - it('pulls items from the stream and calls the handler', async () => { - const handler = jest.fn(() => ({})); - const { req, sendText, done } = mockRequest(); - const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n']; - - const promise = ajaxStream('', {}, req, { - url: '/test/endpoint', - onResponse: handler, - }); - - sendText(messages[0]); - sendText(messages[1]); - done(); - - await promise; - expect(handler).toHaveBeenCalledTimes(2); - expect(handler).toHaveBeenCalledWith({ hello: 'world' }); - expect(handler).toHaveBeenCalledWith({ tis: 'fate' }); - }); - - it('handles newlines in values', async () => { - const handler = jest.fn(() => ({})); - const { req, sendText, done } = mockRequest(); - const messages = [ - JSON.stringify({ hello: 'wo\nrld' }), - '\n', - JSON.stringify({ tis: 'fa\nte' }), - '\n', - ]; - - const promise = ajaxStream('', {}, req, { - url: '/test/endpoint', - onResponse: handler, - }); - - messages.forEach(sendText); - done(); - - await promise; - expect(handler).toHaveBeenCalledTimes(2); - expect(handler).toHaveBeenCalledWith({ hello: 'wo\nrld' }); - expect(handler).toHaveBeenCalledWith({ tis: 'fa\nte' }); - }); - - it('handles partial messages', async () => { - const handler = jest.fn(() => ({})); - const { req, sendText, done } = mockRequest(); - const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join(''); - - const promise = ajaxStream('', {}, req, { - url: '/test/endpoint', - onResponse: handler, - }); - - for (const s of messages) { - sendText(s); - } - done(); - - await promise; - expect(handler).toHaveBeenCalledTimes(2); - expect(handler).toHaveBeenCalledWith({ hello: 'world' }); - expect(handler).toHaveBeenCalledWith({ tis: 'fate' }); - }); - - it('sends the request', async () => { - const handler = jest.fn(() => ({})); - const { req, done } = mockRequest(); - - const promise = ajaxStream('mehBasePath', { a: 'b' }, req, { - url: '/test/endpoint', - onResponse: handler, - body: 'whatup', - headers: { foo: 'bar' }, - }); - - done(); - - await promise; - expect(req.open).toHaveBeenCalledWith('POST', 'mehBasePath/test/endpoint'); - expect(req.setRequestHeader).toHaveBeenCalledWith('foo', 'bar'); - expect(req.setRequestHeader).toHaveBeenCalledWith('a', 'b'); - expect(req.send).toHaveBeenCalledWith('whatup'); - }); - - it('rejects if network failure', async () => { - const handler = jest.fn(() => ({})); - const { req, done } = mockRequest(); - - const promise = ajaxStream('', {}, req, { - url: '/test/endpoint', - onResponse: handler, - body: 'whatup', - }); - - done(0); - expect(await promise.then(() => true).catch(() => false)).toBeFalsy(); - }); - - it('rejects if http status error', async () => { - const handler = jest.fn(() => ({})); - const { req, done } = mockRequest(); - - const promise = ajaxStream('', {}, req, { - url: '/test/endpoint', - onResponse: handler, - body: 'whatup', - }); - - done(400); - expect(await promise.then(() => true).catch(() => false)).toBeFalsy(); - }); - - it('rejects if the payload contains invalid JSON', async () => { - const handler = jest.fn(() => ({})); - const { req, sendText, done } = mockRequest(); - const messages = ['{ waut? }\n'].join(''); - - const promise = ajaxStream('', {}, req, { - url: '/test/endpoint', - onResponse: handler, - }); - - sendText(messages); - done(); - - expect(await promise.then(() => true).catch(() => false)).toBeFalsy(); - }); - - it('rejects if the handler throws', async () => { - const handler = jest.fn(() => { - throw new Error('DOH!'); - }); - const { req, sendText, done } = mockRequest(); - const messages = ['{ "hello": "world" }\n', '{ "tis": "fate" }\n'].join(''); - - const promise = ajaxStream('', {}, req, { - url: '/test/endpoint', - onResponse: handler, - }); - - sendText(messages); - done(); - - expect(await promise.then(() => true).catch(({ message }) => message)).toMatch(/doh!/i); - }); -}); - -function mockRequest() { - const req: XMLHttpRequestLike = { - onprogress: noop, - onreadystatechange: noop, - open: jest.fn(), - readyState: 0, - responseText: '', - send: jest.fn(), - setRequestHeader: jest.fn(), - abort: jest.fn(), - status: 0, - withCredentials: false, - }; - - return { - req, - sendText(text: string) { - req.responseText += text; - req.onreadystatechange(); - req.onprogress(); - }, - done(status = 200) { - req.status = status; - req.readyState = 4; - req.onreadystatechange(); - }, - }; -} diff --git a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts b/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts deleted file mode 100644 index 867581081f82f..0000000000000 --- a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/ajax_stream.ts +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to Elasticsearch B.V. under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch B.V. licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import { once } from 'lodash'; - -/** - * This file contains the client-side logic for processing a streaming AJAX response. - * This allows things like request batching to process individual batch item results - * as soon as the server sends them, instead of waiting for the entire response before - * client-side processing can begin. - * - * The server sends responses in this format: {length}:{json}, for example: - * - * 18:{"hello":"world"}\n16:{"hello":"you"}\n - */ - -// T is the response payload (the JSON), and we don't really -// care what it's type / shape is. -export type BatchResponseHandler = (result: T) => void; - -export interface BatchOpts { - url: string; - onResponse: BatchResponseHandler; - method?: string; - body?: string; - headers?: { [k: string]: string }; -} - -// The subset of XMLHttpRequest that we use -export interface XMLHttpRequestLike { - abort: () => void; - onreadystatechange: any; - onprogress: any; - open: (method: string, url: string) => void; - readyState: number; - responseText: string; - send: (body?: string) => void; - setRequestHeader: (header: string, value: string) => void; - status: number; - withCredentials: boolean; -} - -// Create a function which, when successively passed streaming response text, -// calls a handler callback with each response in the batch. -function processBatchResponseStream(handler: BatchResponseHandler) { - let index = 0; - - return (text: string) => { - // While there's text to process... - while (index < text.length) { - // We're using new line-delimited JSON. - const delim = '\n'; - const delimIndex = text.indexOf(delim, index); - - // We've got an incomplete batch length - if (delimIndex < 0) { - return; - } - - const payload = JSON.parse(text.slice(index, delimIndex)); - handler(payload); - - index = delimIndex + 1; - } - }; -} - -/** - * Sends an AJAX request to the server, and processes the result as a - * streaming HTTP/1 response. - * - * @param basePath - The Kibana basepath - * @param defaultHeaders - The default HTTP headers to be sent with each request - * @param req - The XMLHttpRequest - * @param opts - The request options - * @returns A promise which resolves when the entire batch response has been processed. - */ -export function ajaxStream( - basePath: string, - defaultHeaders: { [k: string]: string }, - req: XMLHttpRequestLike, - opts: BatchOpts -) { - return new Promise((resolve, reject) => { - const { url, method, headers } = opts; - - // There are several paths by which the promise may resolve or reject. We wrap this - // in "once" as a safeguard against cases where we attempt more than one call. (e.g. - // a batch handler fails, so we reject the promise, but then new data comes in for - // a subsequent batch item) - const complete = once((err: Error | undefined = undefined) => - err ? reject(err) : resolve(req) - ); - - // Begin the request - req.open(method || 'POST', `${basePath}/${url.replace(/^\//, '')}`); - req.withCredentials = true; - - // Set the HTTP headers - Object.entries(Object.assign({}, defaultHeaders, headers)).forEach(([k, v]) => - req.setRequestHeader(k, v) - ); - - const batchHandler = processBatchResponseStream(opts.onResponse); - const processBatch = () => { - try { - batchHandler(req.responseText); - } catch (err) { - req.abort(); - complete(err); - } - }; - - req.onprogress = processBatch; - - req.onreadystatechange = () => { - // Older browsers don't support onprogress, so we need - // to call this here, too. It's safe to call this multiple - // times even for the same progress event. - processBatch(); - - // 4 is the magic number that means the request is done - if (req.readyState === 4) { - // 0 indicates a network failure. 400+ messages are considered server errors - if (req.status === 0 || req.status >= 400) { - complete(new Error(`Batch request failed with status ${req.status}`)); - } else { - complete(); - } - } - }; - - // Send the payload to the server - req.send(opts.body); - }); -} diff --git a/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.test.ts b/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.test.ts index f5b1960b608a3..3da15cf54cda0 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.test.ts +++ b/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.test.ts @@ -18,23 +18,35 @@ */ import { batchedFetch, Request } from './batched_fetch'; +import { defer } from '../../../../../plugins/kibana_utils/public'; +import { Subject } from 'rxjs'; const serialize = (o: any) => JSON.stringify(o); -const ajaxStream = jest.fn(async ({ body, onResponse }) => { +const fetchStreaming = jest.fn(({ body }) => { const { functions } = JSON.parse(body); - functions.map(({ id, functionName, context, args }: Request) => - onResponse({ - id, - statusCode: context, - result: Number(context) >= 400 ? { err: {} } : `${functionName}${context}${args}`, - }) - ); -}); + const { promise, resolve } = defer(); + const stream = new Subject(); + + setTimeout(() => { + functions.map(({ id, functionName, context, args }: Request) => + stream.next( + JSON.stringify({ + id, + statusCode: context, + result: Number(context) >= 400 ? { err: {} } : `${functionName}${context}${args}`, + }) + '\n' + ) + ); + resolve(); + }, 1); + + return { promise, stream }; +}) as any; describe('batchedFetch', () => { it('resolves the correct promise', async () => { - const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 }); + const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 }); const result = await Promise.all([ ajax({ functionName: 'a', context: 1, args: 'aaa' }), @@ -45,7 +57,7 @@ describe('batchedFetch', () => { }); it('dedupes duplicate calls', async () => { - const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 }); + const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 }); const result = await Promise.all([ ajax({ functionName: 'a', context: 1, args: 'aaa' }), @@ -55,11 +67,11 @@ describe('batchedFetch', () => { ]); expect(result).toEqual(['a1aaa', 'b2bbb', 'a1aaa', 'a1aaa']); - expect(ajaxStream).toHaveBeenCalledTimes(2); + expect(fetchStreaming).toHaveBeenCalledTimes(2); }); it('rejects responses whose statusCode is >= 300', async () => { - const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 }); + const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 }); const result = await Promise.all([ ajax({ functionName: 'a', context: 500, args: 'aaa' }).catch(() => 'fail'), diff --git a/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.ts b/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.ts index cba70fe1da8fc..717a87fc90f9f 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.ts +++ b/src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.ts @@ -18,13 +18,15 @@ */ import _ from 'lodash'; +import { filter, map } from 'rxjs/operators'; +// eslint-disable-next-line +import { split } from '../../../../../plugins/bfetch/public/streaming'; +import { BfetchPublicApi } from '../../../../../plugins/bfetch/public'; +import { defer } from '../../../../../plugins/kibana_utils/public'; import { FUNCTIONS_URL } from './consts'; -// TODO: Import this type from kibana_util. -type AjaxStream = any; - export interface Options { - ajaxStream: any; + fetchStreaming: BfetchPublicApi['fetchStreaming']; serialize: any; ms?: number; } @@ -47,7 +49,7 @@ export interface Request { * Create a function which executes an Expression function on the * server as part of a larger batch of executions. */ -export function batchedFetch({ ajaxStream, serialize, ms = 10 }: Options) { +export function batchedFetch({ fetchStreaming, serialize, ms = 10 }: Options) { // Uniquely identifies each function call in a batch operation // so that the appropriate promise can be resolved / rejected later. let id = 0; @@ -66,7 +68,7 @@ export function batchedFetch({ ajaxStream, serialize, ms = 10 }: Options) { }; const runBatch = () => { - processBatch(ajaxStream, batch); + processBatch(fetchStreaming, batch); reset(); }; @@ -92,7 +94,7 @@ export function batchedFetch({ ajaxStream, serialize, ms = 10 }: Options) { } // If not, create a new promise, id, and add it to the batched collection. - const future = createFuture(); + const future = defer(); const newId = nextId(); request.id = newId; @@ -106,48 +108,39 @@ export function batchedFetch({ ajaxStream, serialize, ms = 10 }: Options) { } /** - * An externally resolvable / rejectable promise, used to make sure - * individual batch responses go to the correct caller. + * Runs the specified batch of functions on the server, then resolves + * the related promises. */ -function createFuture() { - let resolve; - let reject; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; +async function processBatch(fetchStreaming: BfetchPublicApi['fetchStreaming'], batch: Batch) { + const { stream, promise } = fetchStreaming({ + url: FUNCTIONS_URL, + body: JSON.stringify({ + functions: Object.values(batch).map(({ request }) => request), + }), }); - return { - resolve, - reject, - promise, - }; -} + stream + .pipe( + split('\n'), + filter(Boolean), + map((json: string) => JSON.parse(json)) + ) + .subscribe((message: any) => { + const { id, statusCode, result } = message; + const { future } = batch[id]; + + if (statusCode >= 400) { + future.reject(result); + } else { + future.resolve(result); + } + }); -/** - * Runs the specified batch of functions on the server, then resolves - * the related promises. - */ -async function processBatch(ajaxStream: AjaxStream, batch: Batch) { try { - await ajaxStream({ - url: FUNCTIONS_URL, - body: JSON.stringify({ - functions: Object.values(batch).map(({ request }) => request), - }), - onResponse({ id, statusCode, result }: any) { - const { future } = batch[id]; - - if (statusCode >= 400) { - future.reject(result); - } else { - future.resolve(result); - } - }, - }); - } catch (err) { + await promise; + } catch (error) { Object.values(batch).forEach(({ future }) => { - future.reject(err); + future.reject(error); }); } } diff --git a/src/legacy/core_plugins/interpreter/public/canvas/load_legacy_server_function_wrappers.ts b/src/legacy/core_plugins/interpreter/public/canvas/load_legacy_server_function_wrappers.ts index 1a67c14185def..2c2f79b3d6f51 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/load_legacy_server_function_wrappers.ts +++ b/src/legacy/core_plugins/interpreter/public/canvas/load_legacy_server_function_wrappers.ts @@ -30,9 +30,8 @@ import { get, identity } from 'lodash'; // @ts-ignore -import { npSetup } from 'ui/new_platform'; +import { npSetup, npStart } from 'ui/new_platform'; import { FUNCTIONS_URL } from './consts'; -import { ajaxStream } from './ajax_stream'; import { batchedFetch } from './batched_fetch'; export function getType(node: any) { @@ -69,10 +68,7 @@ export const loadLegacyServerFunctionWrappers = async () => { const types = npSetup.plugins.expressions.__LEGACY.types.toJS(); const { serialize } = serializeProvider(types); const batch = batchedFetch({ - ajaxStream: ajaxStream( - npSetup.core.injectedMetadata.getKibanaVersion(), - npSetup.core.injectedMetadata.getBasePath() - ), + fetchStreaming: npStart.plugins.bfetch.fetchStreaming, serialize, }); diff --git a/src/legacy/ui/public/new_platform/new_platform.ts b/src/legacy/ui/public/new_platform/new_platform.ts index f88d5be1328ca..f245cc063024d 100644 --- a/src/legacy/ui/public/new_platform/new_platform.ts +++ b/src/legacy/ui/public/new_platform/new_platform.ts @@ -33,9 +33,11 @@ import { KibanaLegacySetup, KibanaLegacyStart } from '../../../../plugins/kibana import { HomePublicPluginSetup, HomePublicPluginStart } from '../../../../plugins/home/public'; import { SharePluginSetup, SharePluginStart } from '../../../../plugins/share/public'; import { LicensingPluginSetup } from '../../../../../x-pack/plugins/licensing/common/types'; +import { BfetchPublicSetup, BfetchPublicStart } from '../../../../plugins/bfetch/public'; import { UsageCollectionSetup } from '../../../../plugins/usage_collection/public'; export interface PluginsSetup { + bfetch: BfetchPublicSetup; data: ReturnType; embeddable: IEmbeddableSetup; expressions: ReturnType; @@ -50,6 +52,7 @@ export interface PluginsSetup { } export interface PluginsStart { + bfetch: BfetchPublicStart; data: ReturnType; embeddable: IEmbeddableStart; eui_utils: EuiUtilsStart; diff --git a/src/plugins/bfetch/README.md b/src/plugins/bfetch/README.md new file mode 100644 index 0000000000000..9c18720e30d96 --- /dev/null +++ b/src/plugins/bfetch/README.md @@ -0,0 +1,9 @@ +# `bfetch` plugin + +`bfetch` allows to batch HTTP requests and streams responses back. + + +## Reference + +- [Browser](./docs/browser/reference.md) +- Server diff --git a/src/plugins/bfetch/common/index.ts b/src/plugins/bfetch/common/index.ts new file mode 100644 index 0000000000000..afa73ade80084 --- /dev/null +++ b/src/plugins/bfetch/common/index.ts @@ -0,0 +1,21 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from './util'; +export * from './streaming'; diff --git a/src/plugins/bfetch/common/streaming/index.ts b/src/plugins/bfetch/common/streaming/index.ts new file mode 100644 index 0000000000000..d8f7b5091eb8f --- /dev/null +++ b/src/plugins/bfetch/common/streaming/index.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from './types'; diff --git a/src/plugins/bfetch/common/streaming/types.ts b/src/plugins/bfetch/common/streaming/types.ts new file mode 100644 index 0000000000000..1ee92edbc89ff --- /dev/null +++ b/src/plugins/bfetch/common/streaming/types.ts @@ -0,0 +1,24 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Observable } from 'rxjs'; + +export interface StreamingResponseHandler { + onRequest(payload: Payload): Observable; +} diff --git a/src/plugins/bfetch/common/types.ts b/src/plugins/bfetch/common/types.ts new file mode 100644 index 0000000000000..132f0e06e2c26 --- /dev/null +++ b/src/plugins/bfetch/common/types.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from './streaming/types'; diff --git a/src/plugins/bfetch/common/util/index.ts b/src/plugins/bfetch/common/util/index.ts new file mode 100644 index 0000000000000..02843af9b4350 --- /dev/null +++ b/src/plugins/bfetch/common/util/index.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from './remove_leading_slash'; diff --git a/src/plugins/bfetch/common/util/remove_leading_slash.ts b/src/plugins/bfetch/common/util/remove_leading_slash.ts new file mode 100644 index 0000000000000..99b185a3d4848 --- /dev/null +++ b/src/plugins/bfetch/common/util/remove_leading_slash.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export const removeLeadingSlash = (text: string) => (text[0] === '/' ? text.substr(1) : text); diff --git a/src/plugins/bfetch/docs/browser/reference.md b/src/plugins/bfetch/docs/browser/reference.md new file mode 100644 index 0000000000000..47a67c08a4c1f --- /dev/null +++ b/src/plugins/bfetch/docs/browser/reference.md @@ -0,0 +1,15 @@ +# `bfetch` browser reference + +- [`fetchStreaming`](#fetchStreaming) + + +## `fetchStreaming` + +Executes an HTTP request and expects that server streams back results using +HTTP/1 `Transfer-Encoding: chunked`. + +```ts +const { stream } = bfetch.fetchStreaming({ url: 'http://elastic.co' }); + +stream.subscribe(value => {}); +``` \ No newline at end of file diff --git a/src/plugins/bfetch/kibana.json b/src/plugins/bfetch/kibana.json new file mode 100644 index 0000000000000..462d2f4b8bb7d --- /dev/null +++ b/src/plugins/bfetch/kibana.json @@ -0,0 +1,6 @@ +{ + "id": "bfetch", + "version": "kibana", + "server": true, + "ui": true +} diff --git a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/index.ts b/src/plugins/bfetch/public/index.ts similarity index 70% rename from src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/index.ts rename to src/plugins/bfetch/public/index.ts index 5a58414499594..a57dd77fe7e67 100644 --- a/src/legacy/core_plugins/interpreter/public/canvas/ajax_stream/index.ts +++ b/src/plugins/bfetch/public/index.ts @@ -17,12 +17,11 @@ * under the License. */ -import { ajaxStream as ajax, BatchOpts } from './ajax_stream'; +import { PluginInitializerContext } from '../../../core/public'; +import { BfetchPublicPlugin } from './plugin'; -export const ajaxStream = (version: string, basePath: string) => (opts: BatchOpts) => { - const defaultHeaders = { - 'Content-Type': 'application/json', - 'kbn-version': version, - }; - return ajax(basePath, defaultHeaders, new XMLHttpRequest(), opts); -}; +export { BfetchPublicSetup, BfetchPublicStart, BfetchPublicApi } from './plugin'; + +export function plugin(initializerContext: PluginInitializerContext) { + return new BfetchPublicPlugin(initializerContext); +} diff --git a/src/plugins/bfetch/public/mocks.ts b/src/plugins/bfetch/public/mocks.ts new file mode 100644 index 0000000000000..e8caf5c9cb739 --- /dev/null +++ b/src/plugins/bfetch/public/mocks.ts @@ -0,0 +1,63 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { BfetchPublicSetup, BfetchPublicStart } from '.'; +import { plugin as pluginInitializer } from '.'; +import { coreMock } from '../../../core/public/mocks'; + +export type Setup = jest.Mocked; +export type Start = jest.Mocked; + +const createSetupContract = (): Setup => { + const setupContract: Setup = { + fetchStreaming: jest.fn(), + }; + return setupContract; +}; + +const createStartContract = (): Start => { + const startContract: Start = { + fetchStreaming: jest.fn(), + }; + + return startContract; +}; + +const createPlugin = async () => { + const pluginInitializerContext = coreMock.createPluginInitializerContext(); + const coreSetup = coreMock.createSetup(); + const coreStart = coreMock.createStart(); + const plugin = pluginInitializer(pluginInitializerContext); + const setup = await plugin.setup(coreSetup, {}); + + return { + pluginInitializerContext, + coreSetup, + coreStart, + plugin, + setup, + doStart: async () => await plugin.start(coreStart, {}), + }; +}; + +export const uiActionsPluginMock = { + createSetupContract, + createStartContract, + createPlugin, +}; diff --git a/src/plugins/bfetch/public/plugin.ts b/src/plugins/bfetch/public/plugin.ts new file mode 100644 index 0000000000000..db18a15afa1e7 --- /dev/null +++ b/src/plugins/bfetch/public/plugin.ts @@ -0,0 +1,81 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/public'; +import { fetchStreaming as fetchStreamingStatic, FetchStreamingParams } from './streaming'; +import { removeLeadingSlash } from '../common'; + +// eslint-disable-next-line +export interface BfetchPublicSetupDependencies {} + +// eslint-disable-next-line +export interface BfetchPublicStartDependencies {} + +export interface BfetchPublicApi { + fetchStreaming: (params: FetchStreamingParams) => ReturnType; +} + +export type BfetchPublicSetup = BfetchPublicApi; +export type BfetchPublicStart = BfetchPublicApi; + +export class BfetchPublicPlugin + implements + Plugin< + BfetchPublicSetup, + BfetchPublicStart, + BfetchPublicSetupDependencies, + BfetchPublicStartDependencies + > { + private api!: BfetchPublicApi; + + constructor(private readonly initializerContext: PluginInitializerContext) {} + + public setup(core: CoreSetup, plugins: BfetchPublicSetupDependencies): BfetchPublicSetup { + const { version } = this.initializerContext.env.packageInfo; + const basePath = core.http.basePath.get(); + + const fetchStreaming = this.fetchStreaming(version, basePath); + + this.api = { + fetchStreaming, + }; + + return this.api; + } + + public start(core: CoreStart, plugins: BfetchPublicStartDependencies): BfetchPublicStart { + return this.api; + } + + public stop() {} + + private fetchStreaming = ( + version: string, + basePath: string + ): BfetchPublicSetup['fetchStreaming'] => params => + fetchStreamingStatic({ + ...params, + url: `${basePath}/${removeLeadingSlash(params.url)}`, + headers: { + 'Content-Type': 'application/json', + 'kbn-version': version, + ...(params.headers || {}), + }, + }); +} diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts new file mode 100644 index 0000000000000..e59af71cb76bc --- /dev/null +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.test.ts @@ -0,0 +1,248 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { fetchStreaming } from './fetch_streaming'; +import { mockXMLHttpRequest } from '../test_helpers/xhr'; + +const tick = () => new Promise(resolve => setTimeout(resolve, 1)); + +const setup = () => { + const { xhr, XMLHttpRequest } = mockXMLHttpRequest(); + window.XMLHttpRequest = XMLHttpRequest; + return { xhr }; +}; + +test('returns XHR request', () => { + setup(); + const { xhr } = fetchStreaming({ + url: 'http://example.com', + }); + expect(typeof xhr.readyState).toBe('number'); +}); + +test('returns promise', () => { + setup(); + const { promise } = fetchStreaming({ + url: 'http://example.com', + }); + expect(typeof promise.then).toBe('function'); +}); + +test('returns stream', () => { + setup(); + const { stream } = fetchStreaming({ + url: 'http://example.com', + }); + expect(typeof stream.subscribe).toBe('function'); +}); + +test('promise resolves when request completes', async () => { + const env = setup(); + const { promise } = fetchStreaming({ + url: 'http://example.com', + }); + + let resolved = false; + promise.then(() => (resolved = true)); + + await tick(); + expect(resolved).toBe(false); + + (env.xhr as any).responseText = 'foo'; + env.xhr.onprogress!({} as any); + + await tick(); + expect(resolved).toBe(false); + + (env.xhr as any).responseText = 'foo\nbar'; + env.xhr.onprogress!({} as any); + + await tick(); + expect(resolved).toBe(false); + + (env.xhr as any).readyState = 4; + (env.xhr as any).status = 200; + env.xhr.onreadystatechange!({} as any); + + await tick(); + expect(resolved).toBe(true); +}); + +test('streams incoming text as it comes through', async () => { + const env = setup(); + const { stream } = fetchStreaming({ + url: 'http://example.com', + }); + + const spy = jest.fn(); + stream.subscribe(spy); + + await tick(); + expect(spy).toHaveBeenCalledTimes(0); + + (env.xhr as any).responseText = 'foo'; + env.xhr.onprogress!({} as any); + + await tick(); + expect(spy).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledWith('foo'); + + (env.xhr as any).responseText = 'foo\nbar'; + env.xhr.onprogress!({} as any); + + await tick(); + expect(spy).toHaveBeenCalledTimes(2); + expect(spy).toHaveBeenCalledWith('\nbar'); + + (env.xhr as any).readyState = 4; + (env.xhr as any).status = 200; + env.xhr.onreadystatechange!({} as any); + + await tick(); + expect(spy).toHaveBeenCalledTimes(2); +}); + +test('completes stream observable when request finishes', async () => { + const env = setup(); + const { stream } = fetchStreaming({ + url: 'http://example.com', + }); + + const spy = jest.fn(); + stream.subscribe({ + complete: spy, + }); + + expect(spy).toHaveBeenCalledTimes(0); + + (env.xhr as any).responseText = 'foo'; + env.xhr.onprogress!({} as any); + (env.xhr as any).readyState = 4; + (env.xhr as any).status = 200; + env.xhr.onreadystatechange!({} as any); + + expect(spy).toHaveBeenCalledTimes(1); +}); + +test('promise throws when request errors', async () => { + const env = setup(); + const { promise } = fetchStreaming({ + url: 'http://example.com', + }); + + const spy = jest.fn(); + promise.catch(spy); + + await tick(); + expect(spy).toHaveBeenCalledTimes(0); + + (env.xhr as any).responseText = 'foo'; + env.xhr.onprogress!({} as any); + (env.xhr as any).readyState = 4; + (env.xhr as any).status = 400; + env.xhr.onreadystatechange!({} as any); + + await tick(); + expect(spy).toHaveBeenCalledTimes(1); + expect(spy.mock.calls[0][0]).toBeInstanceOf(Error); + expect(spy.mock.calls[0][0].message).toMatchInlineSnapshot( + `"Batch request failed with status 400"` + ); +}); + +test('stream observable errors when request errors', async () => { + const env = setup(); + const { promise, stream } = fetchStreaming({ + url: 'http://example.com', + }); + + const spy = jest.fn(); + promise.catch(() => {}); + stream.subscribe({ + error: spy, + }); + + await tick(); + expect(spy).toHaveBeenCalledTimes(0); + + (env.xhr as any).responseText = 'foo'; + env.xhr.onprogress!({} as any); + (env.xhr as any).readyState = 4; + (env.xhr as any).status = 400; + env.xhr.onreadystatechange!({} as any); + + await tick(); + expect(spy).toHaveBeenCalledTimes(1); + expect(spy.mock.calls[0][0]).toBeInstanceOf(Error); + expect(spy.mock.calls[0][0].message).toMatchInlineSnapshot( + `"Batch request failed with status 400"` + ); +}); + +test('sets custom headers', async () => { + const env = setup(); + fetchStreaming({ + url: 'http://example.com', + headers: { + 'Content-Type': 'text/plain', + Authorization: 'Bearer 123', + }, + }); + + expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Content-Type', 'text/plain'); + expect(env.xhr.setRequestHeader).toHaveBeenCalledWith('Authorization', 'Bearer 123'); +}); + +test('uses credentials', async () => { + const env = setup(); + + expect(env.xhr.withCredentials).toBe(false); + + fetchStreaming({ + url: 'http://example.com', + }); + + expect(env.xhr.withCredentials).toBe(true); +}); + +test('opens XHR request and sends specified body', async () => { + const env = setup(); + + expect(env.xhr.open).toHaveBeenCalledTimes(0); + expect(env.xhr.send).toHaveBeenCalledTimes(0); + + fetchStreaming({ + url: 'http://elastic.co', + method: 'GET', + body: 'foobar', + }); + + expect(env.xhr.open).toHaveBeenCalledTimes(1); + expect(env.xhr.send).toHaveBeenCalledTimes(1); + expect(env.xhr.open).toHaveBeenCalledWith('GET', 'http://elastic.co'); + expect(env.xhr.send).toHaveBeenCalledWith('foobar'); +}); + +test('uses POST request method by default', async () => { + const env = setup(); + fetchStreaming({ + url: 'http://elastic.co', + }); + expect(env.xhr.open).toHaveBeenCalledWith('POST', 'http://elastic.co'); +}); diff --git a/src/plugins/bfetch/public/streaming/fetch_streaming.ts b/src/plugins/bfetch/public/streaming/fetch_streaming.ts new file mode 100644 index 0000000000000..44a3693e7010b --- /dev/null +++ b/src/plugins/bfetch/public/streaming/fetch_streaming.ts @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { defer } from '../../../kibana_utils/common'; +import { fromStreamingXhr } from './from_streaming_xhr'; + +export interface FetchStreamingParams { + url: string; + headers?: Record; + method?: 'GET' | 'POST'; + body?: string; +} + +/** + * Sends an AJAX request to the server, and processes the result as a + * streaming HTTP/1 response. Streams data as text through observable. + */ +export function fetchStreaming({ + url, + headers = {}, + method = 'POST', + body = '', +}: FetchStreamingParams) { + const xhr = new window.XMLHttpRequest(); + const { promise, resolve, reject } = defer(); + + // Begin the request + xhr.open(method, url); + xhr.withCredentials = true; + + // Set the HTTP headers + Object.entries(headers).forEach(([k, v]) => xhr.setRequestHeader(k, v)); + + const stream = fromStreamingXhr(xhr); + + stream.subscribe({ + complete: () => resolve(), + error: error => reject(error), + }); + + // Send the payload to the server + xhr.send(body); + + return { + xhr, + promise, + stream, + }; +} diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts new file mode 100644 index 0000000000000..40eb3d5e2556b --- /dev/null +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.test.ts @@ -0,0 +1,217 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { fromStreamingXhr } from './from_streaming_xhr'; + +const createXhr = (): XMLHttpRequest => + (({ + onprogress: () => {}, + onreadystatechange: () => {}, + readyState: 0, + responseText: '', + status: 0, + } as unknown) as XMLHttpRequest); + +test('returns observable', () => { + const xhr = createXhr(); + const observable = fromStreamingXhr(xhr); + expect(typeof observable.subscribe).toBe('function'); +}); + +test('emits an event to observable', () => { + const xhr = createXhr(); + const observable = fromStreamingXhr(xhr); + + const spy = jest.fn(); + observable.subscribe(spy); + + expect(spy).toHaveBeenCalledTimes(0); + + (xhr as any).responseText = 'foo'; + xhr.onprogress!({} as any); + + expect(spy).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledWith('foo'); +}); + +test('streams multiple events to observable', () => { + const xhr = createXhr(); + const observable = fromStreamingXhr(xhr); + + const spy = jest.fn(); + observable.subscribe(spy); + + expect(spy).toHaveBeenCalledTimes(0); + + (xhr as any).responseText = '1'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '12'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '123'; + xhr.onprogress!({} as any); + + expect(spy).toHaveBeenCalledTimes(3); + expect(spy.mock.calls[0][0]).toBe('1'); + expect(spy.mock.calls[1][0]).toBe('2'); + expect(spy.mock.calls[2][0]).toBe('3'); +}); + +test('completes observable when request reaches end state', () => { + const xhr = createXhr(); + const observable = fromStreamingXhr(xhr); + + const next = jest.fn(); + const complete = jest.fn(); + observable.subscribe({ + next, + complete, + }); + + (xhr as any).responseText = '1'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '2'; + xhr.onprogress!({} as any); + + expect(complete).toHaveBeenCalledTimes(0); + + (xhr as any).readyState = 4; + (xhr as any).status = 200; + xhr.onreadystatechange!({} as any); + + expect(complete).toHaveBeenCalledTimes(1); +}); + +test('errors observable if request returns with error', () => { + const xhr = createXhr(); + const observable = fromStreamingXhr(xhr); + + const next = jest.fn(); + const complete = jest.fn(); + const error = jest.fn(); + observable.subscribe({ + next, + complete, + error, + }); + + (xhr as any).responseText = '1'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '2'; + xhr.onprogress!({} as any); + + expect(complete).toHaveBeenCalledTimes(0); + + (xhr as any).readyState = 4; + (xhr as any).status = 400; + xhr.onreadystatechange!({} as any); + + expect(complete).toHaveBeenCalledTimes(0); + expect(error).toHaveBeenCalledTimes(1); + expect(error.mock.calls[0][0]).toBeInstanceOf(Error); + expect(error.mock.calls[0][0].message).toMatchInlineSnapshot( + `"Batch request failed with status 400"` + ); +}); + +test('when .onprogress called multiple times with same text, does not create new observable events', () => { + const xhr = createXhr(); + const observable = fromStreamingXhr(xhr); + + const spy = jest.fn(); + observable.subscribe(spy); + + expect(spy).toHaveBeenCalledTimes(0); + + (xhr as any).responseText = '1'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '1'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '12'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '12'; + xhr.onprogress!({} as any); + + (xhr as any).responseText = '123'; + xhr.onprogress!({} as any); + + expect(spy).toHaveBeenCalledTimes(3); + expect(spy.mock.calls[0][0]).toBe('1'); + expect(spy.mock.calls[1][0]).toBe('2'); + expect(spy.mock.calls[2][0]).toBe('3'); +}); + +test('generates new observable events on .onreadystatechange', () => { + const xhr = createXhr(); + const observable = fromStreamingXhr(xhr); + + const spy = jest.fn(); + observable.subscribe(spy); + + expect(spy).toHaveBeenCalledTimes(0); + + (xhr as any).responseText = '{"foo":"bar"}'; + xhr.onreadystatechange!({} as any); + + (xhr as any).responseText = '{"foo":"bar"}\n'; + xhr.onreadystatechange!({} as any); + + (xhr as any).responseText = '{"foo":"bar"}\n123'; + xhr.onreadystatechange!({} as any); + + expect(spy).toHaveBeenCalledTimes(3); + expect(spy.mock.calls[0][0]).toBe('{"foo":"bar"}'); + expect(spy.mock.calls[1][0]).toBe('\n'); + expect(spy.mock.calls[2][0]).toBe('123'); +}); + +test('.onreadystatechange and .onprogress can be called in any order', () => { + const xhr = createXhr(); + const observable = fromStreamingXhr(xhr); + + const spy = jest.fn(); + observable.subscribe(spy); + + expect(spy).toHaveBeenCalledTimes(0); + + (xhr as any).responseText = '{"foo":"bar"}'; + xhr.onreadystatechange!({} as any); + xhr.onprogress!({} as any); + + (xhr as any).responseText = '{"foo":"bar"}\n'; + xhr.onprogress!({} as any); + xhr.onreadystatechange!({} as any); + + (xhr as any).responseText = '{"foo":"bar"}\n123'; + xhr.onreadystatechange!({} as any); + xhr.onprogress!({} as any); + xhr.onreadystatechange!({} as any); + xhr.onprogress!({} as any); + + expect(spy).toHaveBeenCalledTimes(3); + expect(spy.mock.calls[0][0]).toBe('{"foo":"bar"}'); + expect(spy.mock.calls[1][0]).toBe('\n'); + expect(spy.mock.calls[2][0]).toBe('123'); +}); diff --git a/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts new file mode 100644 index 0000000000000..bba8151958492 --- /dev/null +++ b/src/plugins/bfetch/public/streaming/from_streaming_xhr.ts @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Observable, Subject } from 'rxjs'; + +/** + * Creates observable from streaming XMLHttpRequest, where each event + * corresponds to a streamed chunk. + */ +export const fromStreamingXhr = ( + xhr: Pick< + XMLHttpRequest, + 'onprogress' | 'onreadystatechange' | 'readyState' | 'status' | 'responseText' + > +): Observable => { + const subject = new Subject(); + let index = 0; + + const processBatch = () => { + const { responseText } = xhr; + if (index >= responseText.length) return; + subject.next(responseText.substr(index)); + index = responseText.length; + }; + + xhr.onprogress = processBatch; + + xhr.onreadystatechange = () => { + // Older browsers don't support onprogress, so we need + // to call this here, too. It's safe to call this multiple + // times even for the same progress event. + processBatch(); + + // 4 is the magic number that means the request is done + if (xhr.readyState === 4) { + // 0 indicates a network failure. 400+ messages are considered server errors + if (xhr.status === 0 || xhr.status >= 400) { + subject.error(new Error(`Batch request failed with status ${xhr.status}`)); + } else { + subject.complete(); + } + } + }; + + return subject; +}; diff --git a/src/plugins/bfetch/public/streaming/index.ts b/src/plugins/bfetch/public/streaming/index.ts new file mode 100644 index 0000000000000..6368c0d1ac697 --- /dev/null +++ b/src/plugins/bfetch/public/streaming/index.ts @@ -0,0 +1,22 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from './split'; +export * from './from_streaming_xhr'; +export * from './fetch_streaming'; diff --git a/src/plugins/bfetch/public/streaming/split.test.ts b/src/plugins/bfetch/public/streaming/split.test.ts new file mode 100644 index 0000000000000..6eb3e27ad8598 --- /dev/null +++ b/src/plugins/bfetch/public/streaming/split.test.ts @@ -0,0 +1,71 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { split } from './split'; +import { Subject } from 'rxjs'; + +test('splits a single IP address', () => { + const ip = '127.0.0.1'; + const list: string[] = []; + const subject = new Subject(); + const splitted = split('.')(subject); + + splitted.subscribe(value => list.push(value)); + + subject.next(ip); + subject.complete(); + expect(list).toEqual(['127', '0', '0', '1']); +}); + +const streams = [ + 'adsf.asdf.asdf', + 'single.dot', + 'empty..split', + 'trailingdot.', + '.leadingdot', + '.', + '....', + 'no_delimiter', + '1.2.3.4.5', + '1.2.3.4.5.', + '.1.2.3.4.5.', + '.1.2.3.4.5', +]; + +for (const stream of streams) { + test(`splits stream by delimiter correctly "${stream}"`, () => { + const correctResult = stream.split('.').filter(Boolean); + + for (let j = 0; j < 100; j++) { + const list: string[] = []; + const subject = new Subject(); + const splitted = split('.')(subject); + splitted.subscribe(value => list.push(value)); + let i = 0; + while (i < stream.length) { + const len = Math.round(Math.random() * 10); + const chunk = stream.substr(i, len); + subject.next(chunk); + i += len; + } + subject.complete(); + expect(list).toEqual(correctResult); + } + }); +} diff --git a/src/plugins/bfetch/public/streaming/split.ts b/src/plugins/bfetch/public/streaming/split.ts new file mode 100644 index 0000000000000..665411f472ac3 --- /dev/null +++ b/src/plugins/bfetch/public/streaming/split.ts @@ -0,0 +1,59 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Observable, Subject } from 'rxjs'; +import { filter } from 'rxjs/operators'; + +/** + * Receives observable that emits strings, and returns a new observable + * that also returns strings separated by delimiter. + * + * Input stream: + * + * asdf.f -> df..aaa. -> dfsdf + * + * Output stream, assuming "." is used as delimiter: + * + * asdf -> fdf -> aaa -> dfsdf + * + */ +export const split = (delimiter: string = '\n') => ( + in$: Observable +): Observable => { + const out$ = new Subject(); + let startingText = ''; + + in$.subscribe( + chunk => { + const messages = (startingText + chunk).split(delimiter); + + // We don't want to send the last message here, since it may or + // may not be a partial message. + messages.slice(0, -1).forEach(out$.next.bind(out$)); + startingText = messages.length ? messages[messages.length - 1] : ''; + }, + out$.error.bind(out$), + () => { + out$.next(startingText); + out$.complete(); + } + ); + + return out$.pipe(filter(Boolean)); +}; diff --git a/src/plugins/bfetch/public/test_helpers/xhr.ts b/src/plugins/bfetch/public/test_helpers/xhr.ts new file mode 100644 index 0000000000000..d7046e9a89458 --- /dev/null +++ b/src/plugins/bfetch/public/test_helpers/xhr.ts @@ -0,0 +1,73 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* eslint-disable max-classes-per-file */ + +export const mockXMLHttpRequest = (): { + xhr: XMLHttpRequest; + XMLHttpRequest: typeof window.XMLHttpRequest; +} => { + class MockXMLHttpRequest implements XMLHttpRequest { + DONE = 0; + HEADERS_RECEIVED = 0; + LOADING = 0; + OPENED = 0; + UNSENT = 0; + abort = jest.fn(); + addEventListener = jest.fn(); + dispatchEvent = jest.fn(); + getAllResponseHeaders = jest.fn(); + getResponseHeader = jest.fn(); + onabort = jest.fn(); + onerror = jest.fn(); + onload = jest.fn(); + onloadend = jest.fn(); + onloadstart = jest.fn(); + onprogress = jest.fn(); + onreadystatechange = jest.fn(); + ontimeout = jest.fn(); + open = jest.fn(); + overrideMimeType = jest.fn(); + readyState = 0; + removeEventListener = jest.fn(); + response = null; + responseText = ''; + responseType = null as any; + responseURL = ''; + responseXML = null; + send = jest.fn(); + setRequestHeader = jest.fn(); + status = 0; + statusText = ''; + timeout = 0; + upload = null as any; + withCredentials = false; + } + + const xhr = new MockXMLHttpRequest(); + + return { + xhr, + XMLHttpRequest: class { + constructor() { + return xhr; + } + } as any, + }; +}; diff --git a/src/plugins/bfetch/public/types.ts b/src/plugins/bfetch/public/types.ts new file mode 100644 index 0000000000000..9880b336e76e5 --- /dev/null +++ b/src/plugins/bfetch/public/types.ts @@ -0,0 +1,18 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ diff --git a/src/plugins/bfetch/server/index.ts b/src/plugins/bfetch/server/index.ts new file mode 100644 index 0000000000000..f1a3f7fd44cf6 --- /dev/null +++ b/src/plugins/bfetch/server/index.ts @@ -0,0 +1,27 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { PluginInitializerContext } from '../../../core/server'; +import { BfetchServerPlugin } from './plugin'; + +export { BfetchServerSetup, BfetchServerStart } from './plugin'; + +export function plugin(initializerContext: PluginInitializerContext) { + return new BfetchServerPlugin(initializerContext); +} diff --git a/src/plugins/bfetch/server/mocks.ts b/src/plugins/bfetch/server/mocks.ts new file mode 100644 index 0000000000000..8ec68650a60dc --- /dev/null +++ b/src/plugins/bfetch/server/mocks.ts @@ -0,0 +1,61 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { BfetchServerSetup, BfetchServerStart } from '.'; +import { plugin as pluginInitializer } from '.'; +import { coreMock } from '../../../core/server/mocks'; + +export type Setup = jest.Mocked; +export type Start = jest.Mocked; + +const createSetupContract = (): Setup => { + const setupContract: Setup = { + addStreamingResponseRoute: jest.fn(), + }; + return setupContract; +}; + +const createStartContract = (): Start => { + const startContract: Start = {}; + + return startContract; +}; + +const createPlugin = async () => { + const pluginInitializerContext = coreMock.createPluginInitializerContext(); + const coreSetup = coreMock.createSetup(); + const coreStart = coreMock.createStart(); + const plugin = pluginInitializer(pluginInitializerContext); + const setup = await plugin.setup(coreSetup, {}); + + return { + pluginInitializerContext, + coreSetup, + coreStart, + plugin, + setup, + doStart: async () => await plugin.start(coreStart, {}), + }; +}; + +export const uiActionsPluginMock = { + createSetupContract, + createStartContract, + createPlugin, +}; diff --git a/src/plugins/bfetch/server/plugin.ts b/src/plugins/bfetch/server/plugin.ts new file mode 100644 index 0000000000000..75baeafc17669 --- /dev/null +++ b/src/plugins/bfetch/server/plugin.ts @@ -0,0 +1,92 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { CoreStart, PluginInitializerContext, CoreSetup, Plugin, Logger } from 'src/core/server'; +import { schema } from '@kbn/config-schema'; +import { StreamingResponseHandler, removeLeadingSlash } from '../common'; +import { createNDJSONStream } from './streaming'; + +// eslint-disable-next-line +export interface BfetchServerSetupDependencies {} + +// eslint-disable-next-line +export interface BfetchServerStartDependencies {} + +export interface BfetchServerSetup { + addStreamingResponseRoute: (path: string, handler: StreamingResponseHandler) => void; +} + +// eslint-disable-next-line +export interface BfetchServerStart {} + +export class BfetchServerPlugin + implements + Plugin< + BfetchServerSetup, + BfetchServerStart, + BfetchServerSetupDependencies, + BfetchServerStartDependencies + > { + constructor(private readonly initializerContext: PluginInitializerContext) {} + + public setup(core: CoreSetup, plugins: BfetchServerSetupDependencies): BfetchServerSetup { + const logger = this.initializerContext.logger.get(); + const router = core.http.createRouter(); + const addStreamingResponseRoute = this.addStreamingResponseRoute({ router, logger }); + + return { + addStreamingResponseRoute, + }; + } + + public start(core: CoreStart, plugins: BfetchServerStartDependencies): BfetchServerStart { + return {}; + } + + public stop() {} + + private addStreamingResponseRoute = ({ + router, + logger, + }: { + router: ReturnType; + logger: Logger; + }): BfetchServerSetup['addStreamingResponseRoute'] => (path, handler) => { + router.post( + { + path: `/${removeLeadingSlash(path)}`, + validate: { + body: schema.any(), + }, + }, + async (context, request, response) => { + const data = request.body; + return response.ok({ + headers: { + 'Content-Type': 'application/x-ndjson', + Connection: 'keep-alive', + 'Transfer-Encoding': 'chunked', + 'Cache-Control': 'no-cache', + }, + body: createNDJSONStream(data, handler, logger), + }); + } + ); + }; +} diff --git a/src/plugins/bfetch/server/streaming/create_ndjson_stream.ts b/src/plugins/bfetch/server/streaming/create_ndjson_stream.ts new file mode 100644 index 0000000000000..b1f39f4acbcb5 --- /dev/null +++ b/src/plugins/bfetch/server/streaming/create_ndjson_stream.ts @@ -0,0 +1,52 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Logger } from 'src/core/server'; +import { Stream, PassThrough } from 'stream'; +import { StreamingResponseHandler } from '../../common/types'; + +const delimiter = '\n'; + +export const createNDJSONStream = ( + payload: Payload, + handler: StreamingResponseHandler, + logger: Logger +): Stream => { + const stream = new PassThrough(); + const results = handler.onRequest(payload); + + results.subscribe({ + next: (message: Response) => { + try { + const line = JSON.stringify(message); + stream.write(`${line}${delimiter}`); + } catch (error) { + logger.error('Could not serialize or stream a message.'); + logger.error(error); + } + }, + error: error => { + stream.end(); + logger.error(error); + }, + complete: () => stream.end(), + }); + + return stream; +}; diff --git a/src/plugins/bfetch/server/streaming/index.ts b/src/plugins/bfetch/server/streaming/index.ts new file mode 100644 index 0000000000000..5c9904a75786a --- /dev/null +++ b/src/plugins/bfetch/server/streaming/index.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from './create_ndjson_stream'; diff --git a/src/plugins/kibana_utils/common/defer.test.ts b/src/plugins/kibana_utils/common/defer.test.ts new file mode 100644 index 0000000000000..e05727847f247 --- /dev/null +++ b/src/plugins/kibana_utils/common/defer.test.ts @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { Defer } from './defer'; + +const tick = () => new Promise(resolve => setTimeout(resolve, 1)); + +describe('new Defer()', () => { + test('has .promise Promise object', () => { + expect(new Defer().promise).toBeInstanceOf(Promise); + }); + + test('has .resolve() method', () => { + expect(typeof new Defer().resolve).toBe('function'); + }); + + test('has .reject() method', () => { + expect(typeof new Defer().reject).toBe('function'); + }); + + test('resolves promise when .reject() is called', async () => { + const defer = new Defer(); + const then = jest.fn(); + defer.promise.then(then); + + await tick(); + expect(then).toHaveBeenCalledTimes(0); + + defer.resolve(123); + + await tick(); + expect(then).toHaveBeenCalledTimes(1); + expect(then).toHaveBeenCalledWith(123); + }); + + test('rejects promise when .reject() is called', async () => { + const defer = new Defer(); + const then = jest.fn(); + const spy = jest.fn(); + defer.promise.then(then).catch(spy); + + await tick(); + expect(then).toHaveBeenCalledTimes(0); + expect(spy).toHaveBeenCalledTimes(0); + + defer.reject('oops'); + + await tick(); + expect(then).toHaveBeenCalledTimes(0); + expect(spy).toHaveBeenCalledTimes(1); + expect(spy).toHaveBeenCalledWith('oops'); + }); +}); diff --git a/src/plugins/kibana_utils/common/defer.ts b/src/plugins/kibana_utils/common/defer.ts new file mode 100644 index 0000000000000..bf8fa836ed172 --- /dev/null +++ b/src/plugins/kibana_utils/common/defer.ts @@ -0,0 +1,41 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * An externally resolvable/rejectable "promise". Use it to resolve/reject + * promise at any time. + * + * ```ts + * const future = new Defer(); + * + * future.promise.then(value => console.log(value)); + * + * future.resolve(123); + * ``` + */ +export class Defer { + public readonly resolve!: (data: T) => void; + public readonly reject!: (error: any) => void; + public readonly promise: Promise = new Promise((resolve, reject) => { + (this as any).resolve = resolve; + (this as any).reject = reject; + }); +} + +export const defer = () => new Defer(); diff --git a/src/plugins/kibana_utils/common/index.ts b/src/plugins/kibana_utils/common/index.ts new file mode 100644 index 0000000000000..d13a250cedf2e --- /dev/null +++ b/src/plugins/kibana_utils/common/index.ts @@ -0,0 +1,20 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export * from './defer'; diff --git a/src/plugins/kibana_utils/public/index.ts b/src/plugins/kibana_utils/public/index.ts index 3f5aeebac54d8..6e6b5c582b0eb 100644 --- a/src/plugins/kibana_utils/public/index.ts +++ b/src/plugins/kibana_utils/public/index.ts @@ -17,9 +17,9 @@ * under the License. */ +export { defer } from '../common'; export * from './core'; export * from './errors'; -export * from './errors'; export * from './field_mapping'; export * from './parse'; export * from './render_complete';