Skip to content

Commit

Permalink
fix: 🐛 fix tests and inject fetchStreaming() method as dep
Browse files Browse the repository at this point in the history
  • Loading branch information
streamich committed Dec 12, 2019
1 parent 377b6d0 commit c0d2027
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,35 @@
*/

import { batchedFetch, Request } from './batched_fetch';
import { defer } from '../../../../../plugins/kibana_utils/public';
import { Subject } from 'rxjs';

const serialize = (o: any) => JSON.stringify(o);

const ajaxStream = jest.fn(async ({ body, onResponse }) => {
const fetchStreaming = jest.fn(({ body }) => {
const { functions } = JSON.parse(body);
functions.map(({ id, functionName, context, args }: Request) =>
onResponse({
id,
statusCode: context,
result: Number(context) >= 400 ? { err: {} } : `${functionName}${context}${args}`,
})
);
});
const { promise, resolve } = defer<void>();
const stream = new Subject<any>();

setTimeout(() => {
functions.map(({ id, functionName, context, args }: Request) =>
stream.next(
JSON.stringify({
id,
statusCode: context,
result: Number(context) >= 400 ? { err: {} } : `${functionName}${context}${args}`,
}) + '\n'
)
);
resolve();
}, 1);

return { promise, stream };
}) as any;

describe('batchedFetch', () => {
it('resolves the correct promise', async () => {
const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 });

const result = await Promise.all([
ajax({ functionName: 'a', context: 1, args: 'aaa' }),
Expand All @@ -45,7 +57,7 @@ describe('batchedFetch', () => {
});

it('dedupes duplicate calls', async () => {
const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 });

const result = await Promise.all([
ajax({ functionName: 'a', context: 1, args: 'aaa' }),
Expand All @@ -55,11 +67,11 @@ describe('batchedFetch', () => {
]);

expect(result).toEqual(['a1aaa', 'b2bbb', 'a1aaa', 'a1aaa']);
expect(ajaxStream).toHaveBeenCalledTimes(2);
expect(fetchStreaming).toHaveBeenCalledTimes(2);
});

it('rejects responses whose statusCode is >= 300', async () => {
const ajax = batchedFetch({ ajaxStream, serialize, ms: 1 });
const ajax = batchedFetch({ fetchStreaming, serialize, ms: 1 });

const result = await Promise.all([
ajax({ functionName: 'a', context: 500, args: 'aaa' }).catch(() => 'fail'),
Expand Down
17 changes: 10 additions & 7 deletions src/legacy/core_plugins/interpreter/public/canvas/batched_fetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@
*/

import _ from 'lodash';
import { npStart } from 'ui/new_platform';
import { filter, map } from 'rxjs/operators';
// eslint-disable-next-line
import { split } from '../../../../../plugins/bfetch/public/streaming';
import { BfetchPublicApi } from '../../../../../plugins/bfetch/public';
import { defer } from '../../../../../plugins/kibana_utils/public';
import { FUNCTIONS_URL } from './consts';

export interface Options {
fetchStreaming: BfetchPublicApi['fetchStreaming'];
serialize: any;
ms?: number;
}
Expand All @@ -48,7 +49,7 @@ export interface Request {
* Create a function which executes an Expression function on the
* server as part of a larger batch of executions.
*/
export function batchedFetch({ serialize, ms = 10 }: Options) {
export function batchedFetch({ fetchStreaming, serialize, ms = 10 }: Options) {
// Uniquely identifies each function call in a batch operation
// so that the appropriate promise can be resolved / rejected later.
let id = 0;
Expand All @@ -67,7 +68,7 @@ export function batchedFetch({ serialize, ms = 10 }: Options) {
};

const runBatch = () => {
processBatch(batch);
processBatch(fetchStreaming, batch);
reset();
};

Expand Down Expand Up @@ -110,8 +111,8 @@ export function batchedFetch({ serialize, ms = 10 }: Options) {
* Runs the specified batch of functions on the server, then resolves
* the related promises.
*/
async function processBatch(batch: Batch) {
const { stream, promise } = npStart.plugins.bfetch.fetchStreaming({
async function processBatch(fetchStreaming: BfetchPublicApi['fetchStreaming'], batch: Batch) {
const { stream, promise } = fetchStreaming({
url: FUNCTIONS_URL,
body: JSON.stringify({
functions: Object.values(batch).map(({ request }) => request),
Expand All @@ -135,9 +136,11 @@ async function processBatch(batch: Batch) {
}
});

promise.catch(error => {
try {
await promise;
} catch (error) {
Object.values(batch).forEach(({ future }) => {
future.reject(error);
});
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

import { get, identity } from 'lodash';
// @ts-ignore
import { npSetup } from 'ui/new_platform';
import { npSetup, npStart } from 'ui/new_platform';
import { FUNCTIONS_URL } from './consts';
import { batchedFetch } from './batched_fetch';

Expand Down Expand Up @@ -67,7 +67,10 @@ export const loadLegacyServerFunctionWrappers = async () => {
const serverFunctionList = await npSetup.core.http.get(FUNCTIONS_URL);
const types = npSetup.plugins.expressions.__LEGACY.types.toJS();
const { serialize } = serializeProvider(types);
const batch = batchedFetch({ serialize });
const batch = batchedFetch({
fetchStreaming: npStart.plugins.bfetch.fetchStreaming,
serialize,
});

// For every sever-side function, register a client-side
// function that matches its definition, but which simply
Expand Down
8 changes: 6 additions & 2 deletions src/plugins/bfetch/public/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,16 @@ export type Setup = jest.Mocked<BfetchPublicSetup>;
export type Start = jest.Mocked<BfetchPublicStart>;

const createSetupContract = (): Setup => {
const setupContract: Setup = {};
const setupContract: Setup = {
fetchStreaming: jest.fn(),
};
return setupContract;
};

const createStartContract = (): Start => {
const startContract: Start = {};
const startContract: Start = {
fetchStreaming: jest.fn(),
};

return startContract;
};
Expand Down
1 change: 0 additions & 1 deletion src/plugins/bfetch/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/

import { CoreStart, PluginInitializerContext, CoreSetup, Plugin } from 'src/core/server';
import { ReplaySubject } from 'rxjs';
import { schema } from '@kbn/config-schema';
import { StreamingResponseHandler, removeLeadingSlash } from '../common';
import { createStreamingResponseStream } from './streaming';
Expand Down

0 comments on commit c0d2027

Please sign in to comment.