Skip to content

Commit

Permalink
Optimize Canvas REST calls via batching (#29847) (#30380)
Browse files Browse the repository at this point in the history
Optimize the expression interpreter by introducing batching of remote function calls.
  • Loading branch information
chrisdavies authored Feb 7, 2019
1 parent 8b5ec47 commit 659e0b0
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 40 deletions.
111 changes: 111 additions & 0 deletions packages/kbn-interpreter/src/public/batched_fetch.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { FUNCTIONS_URL } from './consts';

/**
* Create a function which executes an Expression function on the
* server as part of a larger batch of executions.
*/
export function batchedFetch({ kfetch, serialize, ms = 10 }) {
// Uniquely identifies each function call in a batch operation
// so that the appropriate promise can be resolved / rejected later.
let id = 0;

// A map like { id: { future, request } }, which is used to
// track all of the function calls in a batch operation.
let batch = {};
let timeout;

const nextId = () => ++id;

const reset = () => {
id = 0;
batch = {};
timeout = undefined;
};

const runBatch = () => {
processBatch(kfetch, batch);
reset();
};

return ({ functionName, context, args }) => {
if (!timeout) {
timeout = setTimeout(runBatch, ms);
}

const id = nextId();
const future = createFuture();

batch[id] = {
future,
request: { id, functionName, args, context: serialize(context) },
};

return future.promise;
};
}

/**
* An externally resolvable / rejectable promise, used to make sure
* individual batch responses go to the correct caller.
*/
function createFuture() {
let resolve;
let reject;

return {
resolve(val) { return resolve(val); },
reject(val) { return reject(val); },
promise: new Promise((res, rej) => {
resolve = res;
reject = rej;
}),
};
}

/**
* Runs the specified batch of functions on the server, then resolves
* the related promises.
*/
async function processBatch(kfetch, batch) {
try {
const { results } = await kfetch({
pathname: FUNCTIONS_URL,
method: 'POST',
body: JSON.stringify({
functions: Object.values(batch).map(({ request }) => request),
}),
});

results.forEach(({ id, result }) => {
const { future } = batch[id];
if (result.statusCode && result.err) {
future.reject(result);
} else {
future.resolve(result);
}
});
} catch (err) {
Object.values(batch).forEach(({ future }) => {
future.reject(err);
});
}
}
21 changes: 21 additions & 0 deletions packages/kbn-interpreter/src/public/consts.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

// The server endpoint for retrieiving and running Canvas functions.
export const FUNCTIONS_URL = '/api/canvas/fns';
22 changes: 6 additions & 16 deletions packages/kbn-interpreter/src/public/interpreter.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,22 @@
import { interpreterProvider } from '../common/interpreter/interpret';
import { serializeProvider } from '../common/lib/serialize';
import { createHandlers } from './create_handlers';

export const FUNCTIONS_URL = '/api/canvas/fns';
import { batchedFetch } from './batched_fetch';
import { FUNCTIONS_URL } from './consts';

export async function initializeInterpreter(kfetch, typesRegistry, functionsRegistry) {
const serverFunctionList = await kfetch({ pathname: FUNCTIONS_URL });
const types = typesRegistry.toJS();
const { serialize } = serializeProvider(types);
const batch = batchedFetch({ kfetch, serialize });

// For every sever-side function, register a client-side
// function that matches its definition, but which simply
// calls the server-side function endpoint.
Object.keys(serverFunctionList).forEach(functionName => {
functionsRegistry.register(() => ({
...serverFunctionList[functionName],
async fn(context, args) {
const types = typesRegistry.toJS();
const { serialize } = serializeProvider(types);
const result = await kfetch({
pathname: `${FUNCTIONS_URL}/${functionName}`,
method: 'POST',
body: JSON.stringify({
args,
context: serialize(context),
}),
});

return result;
},
fn: (context, args) => batch({ functionName, args, context }),
}));
});

Expand Down
39 changes: 31 additions & 8 deletions packages/kbn-interpreter/src/public/interpreter.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
* under the License.
*/

import { initializeInterpreter, FUNCTIONS_URL } from './interpreter';
import { FUNCTIONS_URL } from './consts';
import { initializeInterpreter } from './interpreter';

jest.mock('../common/interpreter/interpret', () => ({
interpreterProvider: () => () => ({}),
Expand All @@ -42,10 +43,23 @@ describe('kbn-interpreter/interpreter', () => {
});

it('registers client-side functions that pass through to the server', async () => {
const kfetch = jest.fn(async () => ({
hello: { name: 'hello' },
world: { name: 'world' },
}));
const kfetch = jest.fn(async ({ method }) => {
if (method === 'POST') {
return {
results: [{
id: 1,
result: {
hello: 'world',
},
}],
};
}

return {
hello: { name: 'hello' },
world: { name: 'world' },
};
});

const register = jest.fn();

Expand All @@ -63,12 +77,21 @@ describe('kbn-interpreter/interpreter', () => {
const context = {};
const args = { quote: 'All we have to decide is what to do with the time that is given us.' };

await hello.fn(context, args);
const result = await hello.fn(context, args);

expect(result).toEqual({ hello: 'world' });

expect(kfetch).toHaveBeenCalledWith({
pathname: `${FUNCTIONS_URL}/hello`,
pathname: FUNCTIONS_URL,
method: 'POST',
body: JSON.stringify({ args, context }),
body: JSON.stringify({
functions: [{
id: 1,
functionName: 'hello',
args,
context,
}]
}),
});
});

Expand Down
1 change: 0 additions & 1 deletion src/dev/license_checker/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ export const DEV_ONLY_LICENSE_WHITELIST = [

// Globally overrides a license for a given package@version
export const LICENSE_OVERRIDES = {
'scriptjs@2.5.8': ['MIT'], // license header appended in the dist
'react-lib-adler32@1.0.1': ['BSD'], // adler32 extracted from react source,
'cycle@1.0.3': ['CC0-1.0'], // conversion to a public-domain like license
'jsts@1.1.2': ['Eclipse Distribution License - v 1.0'], //cf. https://github.com/bjornharrtell/jsts
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,69 @@ import Boom from 'boom';
import { serializeProvider } from '@kbn/interpreter/common';
import { API_ROUTE } from '../../common/constants';
import { createHandlers } from '../lib/create_handlers';
import Joi from 'joi';

/**
* Register the Canvas function endopints.
*
* @param {*} server - The Kibana server
*/
export function registerServerFunctions(server) {
// Execute functions, kind of RPC like.
getServerFunctions(server);
runServerFunctions(server);
}

/**
* Register the endpoint that executes a batch of functions, and sends the result back as a single response.
*
* @param {*} server - The Kibana server
*/
function runServerFunctions(server) {
server.route({
method: 'POST',
path: `${API_ROUTE}/fns/{functionName}`,
path: `${API_ROUTE}/fns`,
options: {
validate: {
payload: Joi.object({
functions: Joi.array().items(
Joi.object()
.keys({
id: Joi.number().required(),
functionName: Joi.string().required(),
args: Joi.object().default({}),
context: Joi.object().default({}),
}),
).required(),
}).required(),
},
},
async handler(req) {
const types = server.plugins.interpreter.types.toJS();
const { deserialize } = serializeProvider(types);
const { functionName } = req.params;
const { args, context } = req.payload;
const fnDef = server.plugins.interpreter.serverFunctions.toJS()[functionName];

if (!fnDef) {
throw Boom.notFound(`Function "${functionName}" could not be found.`);
}

const handlers = await createHandlers(req, server);
const result = await fnDef.fn(deserialize(context), args, handlers);
const { functions } = req.payload;

return result;
// Process each function individually, and bundle up respones / errors into
// the format expected by the front-end batcher.
const results = await Promise.all(functions.map(async ({ id, ...fnCall }) => {
const result = await runFunction(server, handlers, fnCall)
.catch(err => {
if (Boom.isBoom(err)) {
return { err, statusCode: err.statusCode, message: err.output.payload };
}
return { err: 'Internal Server Error', statusCode: 500, message: 'See server logs for details.' };
});
return { id, result };
}));

return { results };
},
});
}

// Give the client the list of server-functions.
/**
* Register the endpoint that returns the list of server-only functions.
* @param {*} server - The Kibana server
*/
function getServerFunctions(server) {
server.route({
method: 'GET',
path: `${API_ROUTE}/fns`,
Expand All @@ -54,3 +92,23 @@ export function registerServerFunctions(server) {
},
});
}

/**
* Run a single Canvas function.
*
* @param {*} server - The Kibana server object
* @param {*} handlers - The Canvas handlers
* @param {*} fnCall - Describes the function being run `{ functionName, args, context }`
*/
async function runFunction(server, handlers, fnCall) {
const { functionName, args, context } = fnCall;
const types = server.plugins.interpreter.types.toJS();
const { deserialize } = serializeProvider(types);
const fnDef = server.plugins.interpreter.serverFunctions.toJS()[functionName];

if (!fnDef) {
throw Boom.notFound(`Function "${functionName}" could not be found.`);
}

return fnDef.fn(deserialize(context), args, handlers);
}

0 comments on commit 659e0b0

Please sign in to comment.