From 2dc728d8d4828587ac25d306df059f3063a801d8 Mon Sep 17 00:00:00 2001 From: CJ Cenizal Date: Tue, 7 Aug 2018 16:29:09 -0700 Subject: [PATCH] Add support for multiple rollup searches. (#21755) --- .../default_search_strategy.js | 7 +- .../public/search/rollup_search_strategy.js | 76 +++++++++++++++---- .../rollup/server/routes/api/search.js | 12 +-- 3 files changed, 70 insertions(+), 25 deletions(-) diff --git a/src/ui/public/courier/search_strategy/default_search_strategy.js b/src/ui/public/courier/search_strategy/default_search_strategy.js index f5bd529b35c0b..564056518fbf6 100644 --- a/src/ui/public/courier/search_strategy/default_search_strategy.js +++ b/src/ui/public/courier/search_strategy/default_search_strategy.js @@ -33,24 +33,23 @@ function getAllFetchParams(searchRequests, Promise) { } async function serializeAllFetchParams(fetchParams, searchRequests, serializeFetchParams) { - const searcRequestsWithFetchParams = []; + const searchRequestsWithFetchParams = []; const failedSearchRequests = []; // Gather the fetch param responses from all the successful requests. fetchParams.forEach((result, index) => { if (result.resolved) { - searcRequestsWithFetchParams.push(result.resolved); + searchRequestsWithFetchParams.push(result.resolved); } else { const searchRequest = searchRequests[index]; - // TODO: All strategies will need to implement this. searchRequest.handleFailure(result.rejected); failedSearchRequests.push(searchRequest); } }); return { - serializedFetchParams: await serializeFetchParams(searcRequestsWithFetchParams), + serializedFetchParams: await serializeFetchParams(searchRequestsWithFetchParams), failedSearchRequests, }; } diff --git a/x-pack/plugins/rollup/public/search/rollup_search_strategy.js b/x-pack/plugins/rollup/public/search/rollup_search_strategy.js index 2ea87622b90f3..df99c7b41f34a 100644 --- a/x-pack/plugins/rollup/public/search/rollup_search_strategy.js +++ b/x-pack/plugins/rollup/public/search/rollup_search_strategy.js @@ -7,21 +7,52 @@ import { kfetchAbortable } from 'ui/kfetch'; import { SearchError } from 'ui/courier'; -export const rollupSearchStrategy = { - id: 'rollup', +function getAllFetchParams(searchRequests, Promise) { + return Promise.map(searchRequests, (searchRequest) => { + return Promise.try(searchRequest.getFetchParams, void 0, searchRequest) + .then((fetchParams) => { + return (searchRequest.fetchParams = fetchParams); + }) + .then(value => ({ resolved: value })) + .catch(error => ({ rejected: error })); + }); +} - search: async ({ searchRequests, Promise }) => { - // TODO: Batch together requests to hit a bulk rollup search endpoint. - const searchRequest = searchRequests[0]; - const searchParams = await searchRequest.getFetchParams(); - const indexPattern = searchParams.index.title || searchParams.index; +async function serializeAllFetchParams(fetchParams, searchRequests) { + const searchRequestsWithFetchParams = []; + const failedSearchRequests = []; + + // Gather the fetch param responses from all the successful requests. + fetchParams.forEach((result, index) => { + if (result.resolved) { + searchRequestsWithFetchParams.push(result.resolved); + } else { + const searchRequest = searchRequests[index]; + + searchRequest.handleFailure(result.rejected); + failedSearchRequests.push(searchRequest); + } + }); + + const serializedFetchParams = serializeFetchParams(searchRequestsWithFetchParams); + + return { + serializedFetchParams, + failedSearchRequests, + }; +} + +function serializeFetchParams(searchRequestsWithFetchParams) { + return JSON.stringify(searchRequestsWithFetchParams.map(searchRequestWithFetchParams => { + const indexPattern = searchRequestWithFetchParams.index.title || searchRequestWithFetchParams.index; const { body: { size, aggs, query: _query, }, - } = searchParams; + index, + } = searchRequestWithFetchParams; // TODO: Temporarily automatically assign same timezone and interval as what's defined by // the rollup job. This should be done by the visualization itself. @@ -30,7 +61,7 @@ export const rollupSearchStrategy = { Object.keys(subAggs).forEach(subAggName => { if (subAggName === 'date_histogram') { - const dateHistogramAgg = searchRequest.source.getField('index').typeMeta.aggs.date_histogram; + const dateHistogramAgg = index.typeMeta.aggs.date_histogram; const subAgg = subAggs[subAggName]; const field = subAgg.field; subAgg.time_zone = dateHistogramAgg[field].time_zone; @@ -39,30 +70,43 @@ export const rollupSearchStrategy = { }); }); - const index = indexPattern; const query = { 'size': size, 'aggregations': aggs, 'query': _query, }; + return { index: indexPattern, query }; + })); +} + +export const rollupSearchStrategy = { + id: 'rollup', + + search: async ({ searchRequests, Promise }) => { + // Flatten the searchSource within each searchRequest to get the fetch params, + // e.g. body, filters, index pattern, query. + const allFetchParams = await getAllFetchParams(searchRequests, Promise); + + // Serialize the fetch params into a format suitable for the body of an ES query. + const { + serializedFetchParams, + failedSearchRequests, + } = await serializeAllFetchParams(allFetchParams, searchRequests); + const { fetching, abort, } = kfetchAbortable({ pathname: '../api/rollup/search', method: 'POST', - body: JSON.stringify({ index, query }), + body: serializedFetchParams, }); - // TODO: Implement this. Search requests which can't be sent. - const failedSearchRequests = []; - return { - // Munge data into shape expected by consumer. searching: new Promise((resolve, reject) => { fetching.then(result => { - resolve([ result ]); + resolve(result); }).catch(error => { const { body: { statusText, error: title, message }, diff --git a/x-pack/plugins/rollup/server/routes/api/search.js b/x-pack/plugins/rollup/server/routes/api/search.js index a77040daad78a..ca7c8b4e63036 100644 --- a/x-pack/plugins/rollup/server/routes/api/search.js +++ b/x-pack/plugins/rollup/server/routes/api/search.js @@ -14,15 +14,17 @@ export function registerSearchRoute(server) { path: '/api/rollup/search', method: 'POST', handler: async (request, reply) => { - const { index, query } = request.payload; const callWithRequest = callWithRequestFactory(server, request); try { - const results = await callWithRequest('rollup.search', { - index, - body: query, - }); + const requests = request.payload.map(({ index, query }) => ( + callWithRequest('rollup.search', { + index, + body: query, + }) + )); + const results = await Promise.all(requests); reply(results); } catch(err) { if (isEsError(err)) {