Skip to content

Commit

Permalink
[ML] Improve sampling and normalization of population chart. (#24402)
Browse files Browse the repository at this point in the history
This optimizes how contextual data is fetched for the population analysis chart.
  • Loading branch information
walterra authored Oct 24, 2018
1 parent f53541d commit 2c7caee
Showing 1 changed file with 52 additions and 23 deletions.
75 changes: 52 additions & 23 deletions x-pack/plugins/ml/public/services/results_service.js
Original file line number Diff line number Diff line change
Expand Up @@ -1398,7 +1398,7 @@ function getEventRateData(
// Extra query object can be supplied, or pass null if no additional query.
// Returned response contains a results property, which is an object
// of document counts against time (epoch millis).
const SAMPLER_TOP_TERMS_SHARD_SIZE = 200;
const SAMPLER_TOP_TERMS_SHARD_SIZE = 50000;
const ENTITY_AGGREGATION_SIZE = 10;
const AGGREGATION_MIN_DOC_COUNT = 1;
function getEventDistributionData(
Expand All @@ -1422,7 +1422,6 @@ function getEventDistributionData(
// Add criteria for the types, time range, entity fields,
// plus any additional supplied query.
const mustCriteria = [];
const shouldCriteria = [];

if (types && types.length) {
mustCriteria.push({ terms: { _type: types } });
Expand Down Expand Up @@ -1452,25 +1451,41 @@ function getEventDistributionData(

const body = {
query: {
bool: {
must: mustCriteria
// using function_score and random_score to get a random sample of documents.
// otherwise all documents would have the same score and the sampler aggregation
// would pick the first N documents instead of a random set.
function_score: {
query: {
bool: {
must: mustCriteria
}
},
functions: [
{
random_score: {
// static seed to get same randomized results on every request
seed: 10,
field: '_seq_no'
}
}
]
}
},
size: 0,
_source: {
excludes: []
},
aggs: {
byTime: {
date_histogram: {
field: timeFieldName,
interval: interval,
min_doc_count: AGGREGATION_MIN_DOC_COUNT
sample: {
sampler: {
shard_size: SAMPLER_TOP_TERMS_SHARD_SIZE
},
aggs: {
sample: {
sampler: {
shard_size: SAMPLER_TOP_TERMS_SHARD_SIZE
byTime: {
date_histogram: {
field: timeFieldName,
interval: interval,
min_doc_count: AGGREGATION_MIN_DOC_COUNT
},
aggs: {
entities: {
Expand All @@ -1487,13 +1502,8 @@ function getEventDistributionData(
}
};

if (shouldCriteria.length > 0) {
body.query.bool.should = shouldCriteria;
body.query.bool.minimum_should_match = shouldCriteria.length / 2;
}

if (metricFieldName !== undefined && metricFieldName !== '') {
body.aggs.byTime.aggs.sample.aggs.entities.aggs = {};
body.aggs.sample.aggs.byTime.aggs.entities.aggs = {};

const metricAgg = {
[metricFunction]: {
Expand All @@ -1504,21 +1514,40 @@ function getEventDistributionData(
if (metricFunction === 'percentiles') {
metricAgg[metricFunction].percents = [ML_MEDIAN_PERCENTS];
}
body.aggs.byTime.aggs.sample.aggs.entities.aggs.metric = metricAgg;
body.aggs.sample.aggs.byTime.aggs.entities.aggs.metric = metricAgg;
}

ml.esSearch({
index,
body
})
.then((resp) => {
// normalize data
const dataByTime = _.get(resp, ['aggregations', 'byTime', 'buckets'], []);
// Because of the sampling, results of metricFunctions which use sum or count
// can be significantly skewed. Taking into account totalHits we calculate
// a factor to normalize results for these metricFunctions.
const totalHits = _.get(resp, ['hits', 'total'], 0);
const successfulShards = _.get(resp, ['_shards', 'successful'], 0);

let normalizeFactor = 1;
if (totalHits > (successfulShards * SAMPLER_TOP_TERMS_SHARD_SIZE)) {
normalizeFactor = totalHits / (successfulShards * SAMPLER_TOP_TERMS_SHARD_SIZE);
}

const dataByTime = _.get(resp, ['aggregations', 'sample', 'byTime', 'buckets'], []);
const data = dataByTime.reduce((d, dataForTime) => {
const date = +dataForTime.key;
const entities = _.get(dataForTime, ['sample', 'entities', 'buckets'], []);
const entities = _.get(dataForTime, ['entities', 'buckets'], []);
entities.forEach((entity) => {
const value = (metricFunction === 'count') ? entity.doc_count : entity.metric.value;
let value = (metricFunction === 'count') ? entity.doc_count : entity.metric.value;

if (
metricFunction === 'count'
|| metricFunction === 'distinct_count'
|| metricFunction === 'sum'
) {
value = value * normalizeFactor;
}

d.push({
date,
entity: entity.key,
Expand Down

0 comments on commit 2c7caee

Please sign in to comment.