From 0b4bef4489f161553ba3bcb3a69ac87eb7751873 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 25 Jun 2018 14:37:31 +0200 Subject: [PATCH 01/11] add ForecastStats to job stats --- .../core/ml/action/GetJobsStatsAction.java | 14 ++- .../core/ml/action/util/ForecastStats.java | 114 ++++++++++++++++++ .../GetJobStatsActionResponseTests.java | 2 +- .../ml/action/util/ForecastStatsTest.java | 27 +++++ .../xpack/ml/MachineLearningFeatureSet.java | 2 +- .../action/TransportGetJobsStatsAction.java | 46 ++++--- .../xpack/ml/job/persistence/JobProvider.java | 47 ++++++++ .../xpack/ml/stats/CountAccumulator.java | 36 ++++++ .../ml/{utils => stats}/StatsAccumulator.java | 20 ++- .../StatsAccumulatorTests.java | 3 +- 10 files changed, 288 insertions(+), 23 deletions(-) create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStats.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStatsTest.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/CountAccumulator.java rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/{utils => stats}/StatsAccumulator.java (70%) rename x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/{utils => stats}/StatsAccumulatorTests.java (95%) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java index ad34f5611383f..f95b752cefa17 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; +import org.elasticsearch.xpack.core.ml.action.util.ForecastStats; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -154,6 +155,8 @@ public static class JobStats implements ToXContentObject, Writeable { @Nullable private ModelSizeStats modelSizeStats; @Nullable + private ForecastStats forecastStats; + @Nullable private TimeValue openTime; private JobState state; @Nullable @@ -161,11 +164,12 @@ public static class JobStats implements ToXContentObject, Writeable { @Nullable private String assignmentExplanation; - public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, JobState state, + public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, @Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node, @Nullable String assignmentExplanation, @Nullable TimeValue opentime) { this.jobId = Objects.requireNonNull(jobId); this.dataCounts = Objects.requireNonNull(dataCounts); this.modelSizeStats = modelSizeStats; + this.forecastStats = forecastStats; this.state = Objects.requireNonNull(state); this.node = node; this.assignmentExplanation = assignmentExplanation; @@ -193,6 +197,10 @@ public DataCounts getDataCounts() { public ModelSizeStats getModelSizeStats() { return modelSizeStats; } + + public ForecastStats getForecastStats() { + return forecastStats; + } public JobState getState() { return state; @@ -226,6 +234,10 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc if (modelSizeStats != null) { builder.field(MODEL_SIZE_STATS, modelSizeStats); } + if (forecastStats != null) { + builder.field("forecast", forecastStats); + } + builder.field(STATE, state.toString()); if (node != null) { builder.startObject(NODE); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStats.java new file mode 100644 index 0000000000000..e54e78c2fe330 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStats.java @@ -0,0 +1,114 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.action.util; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * A class to hold statistics about forecasts. + */ +public class ForecastStats implements ToXContentObject, Writeable { + + public static class Fields { + public static final String total = "total"; + public static final String memory = "memory"; + public static final String runtime = "runtime"; + public static final String records = "records"; + public static final String byStatus = "status"; + } + + private long total; + private final Map memoryStats; + private final Map recordStats; + private final Map runtimeStats; + private final Map statusCounts; + + public ForecastStats() { + this.total = 0; + this.memoryStats = new HashMap(); + this.recordStats = new HashMap(); + this.runtimeStats = new HashMap(); + this.statusCounts = new HashMap(); + } + + public ForecastStats(long total, Map memoryStats, Map recordStats, Map runtimeStats, + Map statusCounts) { + this.total = total; + this.memoryStats = Objects.requireNonNull(memoryStats); + this.recordStats = Objects.requireNonNull(recordStats); + this.runtimeStats = Objects.requireNonNull(runtimeStats); + this.statusCounts = Objects.requireNonNull(statusCounts); + } + + public ForecastStats(StreamInput in) throws IOException { + this.total = in.readLong(); + this.memoryStats = in.readMap(StreamInput::readString, StreamInput::readDouble); + this.recordStats = in.readMap(StreamInput::readString, StreamInput::readDouble); + this.runtimeStats = in.readMap(StreamInput::readString, StreamInput::readDouble); + this.statusCounts = in.readMap(StreamInput::readString, StreamInput::readLong); + } + + public void combine(ForecastStats otherStats) { + this.total += otherStats.total; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + doXContentBody(builder, params); + return builder.endObject(); + } + + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.total, total); + if (total > 0) { + builder.field(Fields.memory, memoryStats); + builder.field(Fields.records, recordStats); + builder.field(Fields.runtime, runtimeStats); + builder.field(Fields.byStatus, statusCounts); + } + return builder; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(total); + out.writeMap(memoryStats, StreamOutput::writeString, StreamOutput::writeDouble); + out.writeMap(recordStats, StreamOutput::writeString, StreamOutput::writeDouble); + out.writeMap(runtimeStats, StreamOutput::writeString, StreamOutput::writeDouble); + out.writeMap(statusCounts, StreamOutput::writeString, StreamOutput::writeLong); + } + + @Override + public int hashCode() { + return Objects.hash(total, memoryStats, recordStats, runtimeStats, statusCounts); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + ForecastStats other = (ForecastStats) obj; + return Objects.equals(total, other.total) && Objects.equals(memoryStats, other.memoryStats) + && Objects.equals(recordStats, other.recordStats) && Objects.equals(runtimeStats, other.runtimeStats) + && Objects.equals(statusCounts, other.statusCounts); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java index ff979a8570aba..14c7dd02a6b7c 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java @@ -56,7 +56,7 @@ protected Response createTestInstance() { if (randomBoolean()) { openTime = parseTimeValue(randomPositiveTimeValue(), "open_time-Test"); } - Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, jobState, node, explanation, openTime); + Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, null, jobState, node, explanation, openTime); jobStatsList.add(jobStats); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStatsTest.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStatsTest.java new file mode 100644 index 0000000000000..7f92d4ecd1fc7 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStatsTest.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.action.util; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ml.action.util.ForecastStats; + + + +public class ForecastStatsTest extends AbstractWireSerializingTestCase { + + @Override + protected ForecastStats createTestInstance() { + + return null; + } + + @Override + protected Reader instanceReader() { + return null; + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index 05af1ffee17a4..fa4845028fb59 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -32,8 +32,8 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.NativeController; import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; +import org.elasticsearch.xpack.ml.stats.StatsAccumulator; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; -import org.elasticsearch.xpack.ml.utils.StatsAccumulator; import java.io.IOException; import java.util.Arrays; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index 1182953dfc31e..fda8d6bfc6e95 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; +import org.elasticsearch.xpack.core.ml.action.util.ForecastStats; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; @@ -106,9 +107,12 @@ protected void taskOperation(GetJobsStatsAction.Request request, TransportOpenJo JobState jobState = MlMetadata.getJobState(jobId, tasks); String assignmentExplanation = pTask.getAssignment().getExplanation(); TimeValue openTime = durationToTimeValue(processManager.jobOpenTime(task)); - GetJobsStatsAction.Response.JobStats jobStats = new GetJobsStatsAction.Response.JobStats(jobId, stats.get().v1(), - stats.get().v2(), jobState, node, assignmentExplanation, openTime); - listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD)); + gatherForecastStats(jobId, forecastStats -> { + GetJobsStatsAction.Response.JobStats jobStats = new GetJobsStatsAction.Response.JobStats(jobId, stats.get().v1(), + stats.get().v2(), forecastStats, jobState, node, assignmentExplanation, openTime); + listener.onResponse(new QueryPage<>(Collections.singletonList(jobStats), 1, Job.RESULTS_FIELD)); + }, listener::onFailure); + } else { listener.onResponse(new QueryPage<>(Collections.emptyList(), 0, Job.RESULTS_FIELD)); } @@ -131,25 +135,31 @@ void gatherStatsForClosedJobs(MlMetadata mlMetadata, GetJobsStatsAction.Request for (int i = 0; i < jobIds.size(); i++) { int slot = i; String jobId = jobIds.get(i); - gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> { - JobState jobState = MlMetadata.getJobState(jobId, tasks); - PersistentTasksCustomMetaData.PersistentTask pTask = MlMetadata.getJobTask(jobId, tasks); - String assignmentExplanation = null; - if (pTask != null) { - assignmentExplanation = pTask.getAssignment().getExplanation(); - } - jobStats.set(slot, new GetJobsStatsAction.Response.JobStats(jobId, dataCounts, modelSizeStats, jobState, null, - assignmentExplanation, null)); - if (counter.decrementAndGet() == 0) { - List results = response.getResponse().results(); - results.addAll(jobStats.asList()); - listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), - new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); - } + gatherForecastStats(jobId, forecastStats -> { + gatherDataCountsAndModelSizeStats(jobId, (dataCounts, modelSizeStats) -> { + JobState jobState = MlMetadata.getJobState(jobId, tasks); + PersistentTasksCustomMetaData.PersistentTask pTask = MlMetadata.getJobTask(jobId, tasks); + String assignmentExplanation = null; + if (pTask != null) { + assignmentExplanation = pTask.getAssignment().getExplanation(); + } + jobStats.set(slot, new GetJobsStatsAction.Response.JobStats(jobId, dataCounts, modelSizeStats, forecastStats, jobState, + null, assignmentExplanation, null)); + if (counter.decrementAndGet() == 0) { + List results = response.getResponse().results(); + results.addAll(jobStats.asList()); + listener.onResponse(new GetJobsStatsAction.Response(response.getTaskFailures(), response.getNodeFailures(), + new QueryPage<>(results, results.size(), Job.RESULTS_FIELD))); + } + }, listener::onFailure); }, listener::onFailure); } } + void gatherForecastStats(String jobId, Consumer handler, Consumer errorHandler) { + jobProvider.getForecastStats(jobId, handler, errorHandler); + } + void gatherDataCountsAndModelSizeStats(String jobId, BiConsumer handler, Consumer errorHandler) { jobProvider.dataCounts(jobId, dataCounts -> { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index 578ddd1efc78a..afca4606170eb 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -63,6 +63,8 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.FieldSortBuilder; @@ -73,6 +75,7 @@ import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction; import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; +import org.elasticsearch.xpack.core.ml.action.util.ForecastStats; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; @@ -99,6 +102,8 @@ import org.elasticsearch.xpack.ml.job.categorization.GrokPatternCreator; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; +import org.elasticsearch.xpack.ml.stats.CountAccumulator; +import org.elasticsearch.xpack.ml.stats.StatsAccumulator; import java.io.IOException; import java.io.InputStream; @@ -1112,6 +1117,48 @@ public void getForecastRequestStats(String jobId, String forecastId, Consumer handler.accept(result.result), errorHandler, () -> null); } + public void getForecastStats(String jobId, Consumer handler, Consumer errorHandler) { + String indexName = AnomalyDetectorsIndex.jobResultsAliasedName(jobId); + + QueryBuilder termQuery = new TermsQueryBuilder(Result.RESULT_TYPE.getPreferredName(), ForecastRequestStats.RESULT_TYPE_VALUE); + QueryBuilder jobQuery = new TermsQueryBuilder(Job.ID.getPreferredName(), jobId); + QueryBuilder finalQuery = new BoolQueryBuilder().filter(termQuery).filter(jobQuery); + + SearchRequest searchRequest = new SearchRequest(indexName); + searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(searchRequest.indicesOptions())); + SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); + sourceBuilder.query(finalQuery); + sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.memory).field(ForecastRequestStats.MEMORY_USAGE.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.records).field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.runtime).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.terms(ForecastStats.Fields.byStatus).field(ForecastRequestStats.STATUS.getPreferredName())); + sourceBuilder.size(0); + + searchRequest.source(sourceBuilder); + + executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, + ActionListener.wrap(searchResponse -> { + + long totalHits = searchResponse.getHits().getTotalHits(); + Map aggregations = searchResponse.getAggregations().asMap(); + + StatsAccumulator memoryStats = StatsAccumulator + .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.memory)); + StatsAccumulator recordStats = StatsAccumulator + .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.records)); + StatsAccumulator runtimeStats = StatsAccumulator + .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.runtime)); + CountAccumulator statusCount = CountAccumulator + .fromTermsAggregation((StringTerms) aggregations.get(ForecastStats.Fields.byStatus)); + + ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats.asMap(), recordStats.asMap(), + runtimeStats.asMap(), statusCount.asMap()); + + handler.accept(forecastStats); + }, errorHandler), client::search); + + } + public void updateCalendar(String calendarId, Set jobIdsToAdd, Set jobIdsToRemove, Consumer handler, Consumer errorHandler) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/CountAccumulator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/CountAccumulator.java new file mode 100644 index 0000000000000..6e10b53d6f3ec --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/CountAccumulator.java @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.ml.stats; + +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; + +import java.util.HashMap; +import java.util.Map; +import java.util.stream.Collectors; + +public class CountAccumulator { + + private final Map counts; + + public CountAccumulator() { + this.counts = new HashMap(); + } + + private CountAccumulator(Map counts) { + this.counts = counts; + } + + public Map asMap() { + return counts; + } + + public static CountAccumulator fromTermsAggregation(StringTerms termsAggregation) { + return new CountAccumulator(termsAggregation.getBuckets().stream() + .collect(Collectors.toMap(bucket -> bucket.getKeyAsString(), bucket -> bucket.getDocCount()))); + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/StatsAccumulator.java similarity index 70% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/StatsAccumulator.java index 1f1df147d80a1..ac13f95790e36 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/utils/StatsAccumulator.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/StatsAccumulator.java @@ -3,7 +3,9 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.utils; +package org.elasticsearch.xpack.ml.stats; + +import org.elasticsearch.search.aggregations.metrics.stats.Stats; import java.util.HashMap; import java.util.Map; @@ -22,6 +24,9 @@ public class StatsAccumulator { private double total; private Double min; private Double max; + + public StatsAccumulator() { + } public void add(double value) { count++; @@ -54,4 +59,17 @@ public Map asMap() { map.put(TOTAL, getTotal()); return map; } + + private StatsAccumulator(long count, double total, double min, double max) { + this.count = count; + this.total = total; + this.min = min; + this.max = max; + } + + public static StatsAccumulator fromStatsAggregation(Stats statsAggregation) { + return new StatsAccumulator(statsAggregation.getCount(), statsAggregation.getSum(), statsAggregation.getMin(), + statsAggregation.getMax()); + } } + diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/stats/StatsAccumulatorTests.java similarity index 95% rename from x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java rename to x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/stats/StatsAccumulatorTests.java index ae9b6a7360c13..70733e0cc573e 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/utils/StatsAccumulatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/stats/StatsAccumulatorTests.java @@ -3,9 +3,10 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.utils; +package org.elasticsearch.xpack.ml.stats; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.ml.stats.StatsAccumulator; import java.util.HashMap; import java.util.Map; From 3eceac9e473afba915cde99df832850060c6f689 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 28 Jun 2018 11:19:28 +0200 Subject: [PATCH 02/11] implement stats collection of forecasts, extending the getjobstats api as well as xpack/_usage. --- .../ml/MachineLearningFeatureSetUsage.java | 1 + .../core/ml/action/GetJobsStatsAction.java | 5 +- .../core/ml/action/util/ForecastStats.java | 114 ------------ .../xpack/core/ml/stats/CountAccumulator.java | 82 +++++++++ .../xpack/core/ml/stats/ForecastStats.java | 153 +++++++++++++++++ .../xpack/core/ml/stats/StatsAccumulator.java | 126 ++++++++++++++ .../GetJobStatsActionResponseTests.java | 11 +- .../ml/action/util/ForecastStatsTest.java | 27 --- .../core/ml/stats/CountAccumulatorTests.java | 100 +++++++++++ .../core/ml/stats/ForecastStatsTests.java | 162 ++++++++++++++++++ .../core/ml/stats/StatsAccumulatorTests.java | 160 +++++++++++++++++ .../xpack/ml/MachineLearningFeatureSet.java | 17 +- .../action/TransportGetJobsStatsAction.java | 2 +- .../xpack/ml/job/persistence/JobProvider.java | 26 +-- .../xpack/ml/stats/CountAccumulator.java | 36 ---- .../xpack/ml/stats/StatsAccumulator.java | 75 -------- .../xpack/ml/stats/StatsAccumulatorTests.java | 64 ------- 17 files changed, 824 insertions(+), 337 deletions(-) delete mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStats.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulator.java delete mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStatsTest.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java delete mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/CountAccumulator.java delete mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/StatsAccumulator.java delete mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/stats/StatsAccumulatorTests.java diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java index 1779ca703a5d7..ebcaab8495eba 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/MachineLearningFeatureSetUsage.java @@ -22,6 +22,7 @@ public class MachineLearningFeatureSetUsage extends XPackFeatureSet.Usage { public static final String DATAFEEDS_FIELD = "datafeeds"; public static final String COUNT = "count"; public static final String DETECTORS = "detectors"; + public static final String FORECASTS = "forecasts"; public static final String MODEL_SIZE = "model_size"; private final Map jobsUsage; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java index f95b752cefa17..75160a2fd5ce0 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java @@ -26,12 +26,12 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.tasks.Task; -import org.elasticsearch.xpack.core.ml.action.util.ForecastStats; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import java.io.IOException; @@ -47,6 +47,7 @@ public class GetJobsStatsAction extends Action { private static final String DATA_COUNTS = "data_counts"; private static final String MODEL_SIZE_STATS = "model_size_stats"; + private static final String FORECASTS_STATS = "forecasts_stats"; private static final String STATE = "state"; private static final String NODE = "node"; @@ -235,7 +236,7 @@ public XContentBuilder toUnwrappedXContent(XContentBuilder builder) throws IOExc builder.field(MODEL_SIZE_STATS, modelSizeStats); } if (forecastStats != null) { - builder.field("forecast", forecastStats); + builder.field(FORECASTS_STATS, forecastStats); } builder.field(STATE, state.toString()); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStats.java deleted file mode 100644 index e54e78c2fe330..0000000000000 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStats.java +++ /dev/null @@ -1,114 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.core.ml.action.util; - -import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.io.stream.Writeable; -import org.elasticsearch.common.xcontent.ToXContentObject; -import org.elasticsearch.common.xcontent.XContentBuilder; -import java.io.IOException; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; - -/** - * A class to hold statistics about forecasts. - */ -public class ForecastStats implements ToXContentObject, Writeable { - - public static class Fields { - public static final String total = "total"; - public static final String memory = "memory"; - public static final String runtime = "runtime"; - public static final String records = "records"; - public static final String byStatus = "status"; - } - - private long total; - private final Map memoryStats; - private final Map recordStats; - private final Map runtimeStats; - private final Map statusCounts; - - public ForecastStats() { - this.total = 0; - this.memoryStats = new HashMap(); - this.recordStats = new HashMap(); - this.runtimeStats = new HashMap(); - this.statusCounts = new HashMap(); - } - - public ForecastStats(long total, Map memoryStats, Map recordStats, Map runtimeStats, - Map statusCounts) { - this.total = total; - this.memoryStats = Objects.requireNonNull(memoryStats); - this.recordStats = Objects.requireNonNull(recordStats); - this.runtimeStats = Objects.requireNonNull(runtimeStats); - this.statusCounts = Objects.requireNonNull(statusCounts); - } - - public ForecastStats(StreamInput in) throws IOException { - this.total = in.readLong(); - this.memoryStats = in.readMap(StreamInput::readString, StreamInput::readDouble); - this.recordStats = in.readMap(StreamInput::readString, StreamInput::readDouble); - this.runtimeStats = in.readMap(StreamInput::readString, StreamInput::readDouble); - this.statusCounts = in.readMap(StreamInput::readString, StreamInput::readLong); - } - - public void combine(ForecastStats otherStats) { - this.total += otherStats.total; - } - - @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - doXContentBody(builder, params); - return builder.endObject(); - } - - public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { - builder.field(Fields.total, total); - if (total > 0) { - builder.field(Fields.memory, memoryStats); - builder.field(Fields.records, recordStats); - builder.field(Fields.runtime, runtimeStats); - builder.field(Fields.byStatus, statusCounts); - } - return builder; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - out.writeLong(total); - out.writeMap(memoryStats, StreamOutput::writeString, StreamOutput::writeDouble); - out.writeMap(recordStats, StreamOutput::writeString, StreamOutput::writeDouble); - out.writeMap(runtimeStats, StreamOutput::writeString, StreamOutput::writeDouble); - out.writeMap(statusCounts, StreamOutput::writeString, StreamOutput::writeLong); - } - - @Override - public int hashCode() { - return Objects.hash(total, memoryStats, recordStats, runtimeStats, statusCounts); - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - - if (getClass() != obj.getClass()) { - return false; - } - - ForecastStats other = (ForecastStats) obj; - return Objects.equals(total, other.total) && Objects.equals(memoryStats, other.memoryStats) - && Objects.equals(recordStats, other.recordStats) && Objects.equals(runtimeStats, other.runtimeStats) - && Objects.equals(statusCounts, other.statusCounts); - } -} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java new file mode 100644 index 0000000000000..5da402f50fd4c --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java @@ -0,0 +1,82 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Map.Entry; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * An accumulator for simple counts where statistical measures + * are not of interest. + */ +public class CountAccumulator implements Writeable { + + private Map counts; + + public CountAccumulator() { + this.counts = new HashMap(); + } + + private CountAccumulator(Map counts) { + this.counts = counts; + } + + public CountAccumulator(StreamInput in) throws IOException { + this.counts = in.readMap(StreamInput::readString, StreamInput::readLong); + } + + public void merge(CountAccumulator other) { + counts = Stream.of(counts, other.counts).flatMap(m -> m.entrySet().stream()) + .collect(Collectors.toMap(Entry::getKey, Entry::getValue, (x, y) -> x + y)); + } + + public void add(String key, Long count) { + counts.put(key, counts.getOrDefault(key, 0l) + count); + } + + public Map asMap() { + return counts; + } + + public static CountAccumulator fromTermsAggregation(StringTerms termsAggregation) { + return new CountAccumulator(termsAggregation.getBuckets().stream() + .collect(Collectors.toMap(bucket -> bucket.getKeyAsString(), bucket -> bucket.getDocCount()))); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(counts, StreamOutput::writeString, StreamOutput::writeLong); + } + + @Override + public int hashCode() { + return Objects.hash(counts); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + CountAccumulator other = (CountAccumulator) obj; + return Objects.equals(counts, other.counts); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java new file mode 100644 index 0000000000000..89a8bf30c75f0 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java @@ -0,0 +1,153 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * A class to hold statistics about forecasts. + */ +public class ForecastStats implements ToXContentObject, Writeable { + + public static class Fields { + public static final String TOTAL = "total"; + public static final String JOBS = "jobs"; + public static final String MEMORY = "memory_bytes"; + public static final String RUNTIME = "processing_time_ms"; + public static final String RECORDS = "records"; + public static final String STATUSES = "status"; + } + + private long total; + private long jobsWithAtleastOneForecast; + private StatsAccumulator memoryStats; + private StatsAccumulator recordStats; + private StatsAccumulator runtimeStats; + private CountAccumulator statusCounts; + + public ForecastStats() { + this.total = 0; + this.jobsWithAtleastOneForecast = 0; + this.memoryStats = new StatsAccumulator(); + this.recordStats = new StatsAccumulator(); + this.runtimeStats = new StatsAccumulator(); + this.statusCounts = new CountAccumulator(); + } + + /* + * Construct ForecastStats for 1 job. Additional statistics can be added by merging other ForecastStats into it. + */ + public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats, + CountAccumulator statusCounts) { + this.total = total; + this.jobsWithAtleastOneForecast = total > 0 ? 1 : 0; + this.memoryStats = Objects.requireNonNull(memoryStats); + this.recordStats = Objects.requireNonNull(recordStats); + this.runtimeStats = Objects.requireNonNull(runtimeStats); + this.statusCounts = Objects.requireNonNull(statusCounts); + } + + public ForecastStats(StreamInput in) throws IOException { + this.total = in.readLong(); + this.jobsWithAtleastOneForecast = in.readLong(); + this.memoryStats = new StatsAccumulator(in); + this.recordStats = new StatsAccumulator(in); + this.runtimeStats = new StatsAccumulator(in); + this.statusCounts = new CountAccumulator(in); + } + + public void merge(ForecastStats other) { + if (other == null) { + return; + } + total += other.total; + if (other.total > 0) { + ++jobsWithAtleastOneForecast; + } + memoryStats.merge(other.memoryStats); + recordStats.merge(other.recordStats); + runtimeStats.merge(other.runtimeStats); + statusCounts.merge(other.statusCounts); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + doXContentBody(builder, params); + return builder.endObject(); + } + + public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.TOTAL, total); + if (total > 0) { + builder.field(Fields.MEMORY, memoryStats.asMap()); + builder.field(Fields.RECORDS, recordStats.asMap()); + builder.field(Fields.RUNTIME, runtimeStats.asMap()); + builder.field(Fields.STATUSES, statusCounts.asMap()); + } + if (jobsWithAtleastOneForecast > 1) { + builder.field(Fields.JOBS, jobsWithAtleastOneForecast); + } + + return builder; + } + + public Map asMap() { + Map map = new HashMap<>(); + map.put(Fields.TOTAL, total); + if (total > 0) { + map.put(Fields.MEMORY, memoryStats.asMap()); + map.put(Fields.RECORDS, recordStats.asMap()); + map.put(Fields.RUNTIME, runtimeStats.asMap()); + map.put(Fields.STATUSES, statusCounts.asMap()); + } + if (jobsWithAtleastOneForecast > 1) { + map.put(Fields.JOBS, jobsWithAtleastOneForecast); + } + return map; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(total); + out.writeLong(jobsWithAtleastOneForecast); + memoryStats.writeTo(out); + recordStats.writeTo(out); + runtimeStats.writeTo(out); + statusCounts.writeTo(out); + } + + @Override + public int hashCode() { + return Objects.hash(total, jobsWithAtleastOneForecast, memoryStats, recordStats, runtimeStats, statusCounts); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + ForecastStats other = (ForecastStats) obj; + return Objects.equals(total, other.total) && Objects.equals(jobsWithAtleastOneForecast, other.jobsWithAtleastOneForecast) + && Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats) + && Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts); + } +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulator.java new file mode 100644 index 0000000000000..fe987db48ce17 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulator.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Helper class to collect min, max, avg and total statistics for a quantity + */ +public class StatsAccumulator implements Writeable { + + public static class Fields { + public static final String MIN = "min"; + public static final String MAX = "max"; + public static final String AVG = "avg"; + public static final String TOTAL = "total"; + } + + private long count; + private double total; + private Double min; + private Double max; + + public StatsAccumulator() { + } + + public StatsAccumulator(StreamInput in) throws IOException { + count = in.readLong(); + total = in.readDouble(); + min = in.readOptionalDouble(); + max = in.readOptionalDouble(); + } + + private StatsAccumulator(long count, double total, double min, double max) { + this.count = count; + this.total = total; + this.min = min; + this.max = max; + } + + public void add(double value) { + count++; + total += value; + min = min == null ? value : (value < min ? value : min); + max = max == null ? value : (value > max ? value : max); + } + + public double getMin() { + return min == null ? 0.0 : min; + } + + public double getMax() { + return max == null ? 0.0 : max; + } + + public double getAvg() { + return count == 0.0 ? 0.0 : total/count; + } + + public double getTotal() { + return total; + } + + public void merge(StatsAccumulator other) { + count += other.count; + total += other.total; + + // note: not using Math.min/max as some internal prefetch optimization causes an NPE + min = min == null ? other.min : (other.min == null ? min : other.min < min ? other.min : min); + max = max == null ? other.max : (other.max == null ? max : other.max > max ? other.max : max); + } + + public Map asMap() { + Map map = new HashMap<>(); + map.put(Fields.MIN, getMin()); + map.put(Fields.MAX, getMax()); + map.put(Fields.AVG, getAvg()); + map.put(Fields.TOTAL, getTotal()); + return map; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeLong(count); + out.writeDouble(total); + out.writeOptionalDouble(min); + out.writeOptionalDouble(max); + } + + public static StatsAccumulator fromStatsAggregation(Stats statsAggregation) { + return new StatsAccumulator(statsAggregation.getCount(), statsAggregation.getSum(), statsAggregation.getMin(), + statsAggregation.getMax()); + } + + @Override + public int hashCode() { + return Objects.hash(count, total, min, max); + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + StatsAccumulator other = (StatsAccumulator) obj; + return Objects.equals(count, other.count) && Objects.equals(total, other.total) && Objects.equals(min, other.min) + && Objects.equals(max, other.max); + } +} + diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java index 14c7dd02a6b7c..86a5b990728f8 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetJobStatsActionResponseTests.java @@ -17,6 +17,8 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCountsTests; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests; import java.net.InetAddress; import java.util.ArrayList; @@ -42,6 +44,12 @@ protected Response createTestInstance() { if (randomBoolean()) { sizeStats = new ModelSizeStats.Builder("foo").build(); } + + ForecastStats forecastStats = null; + if (randomBoolean()) { + forecastStats = new ForecastStatsTests().createTestInstance(); + } + JobState jobState = randomFrom(EnumSet.allOf(JobState.class)); DiscoveryNode node = null; @@ -56,7 +64,8 @@ protected Response createTestInstance() { if (randomBoolean()) { openTime = parseTimeValue(randomPositiveTimeValue(), "open_time-Test"); } - Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, null, jobState, node, explanation, openTime); + Response.JobStats jobStats = new Response.JobStats(jobId, dataCounts, sizeStats, forecastStats, jobState, node, explanation, + openTime); jobStatsList.add(jobStats); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStatsTest.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStatsTest.java deleted file mode 100644 index 7f92d4ecd1fc7..0000000000000 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/util/ForecastStatsTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.core.ml.action.util; - -import org.elasticsearch.common.io.stream.Writeable.Reader; -import org.elasticsearch.test.AbstractWireSerializingTestCase; -import org.elasticsearch.xpack.core.ml.action.util.ForecastStats; - - - -public class ForecastStatsTest extends AbstractWireSerializingTestCase { - - @Override - protected ForecastStats createTestInstance() { - - return null; - } - - @Override - protected Reader instanceReader() { - return null; - } -} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java new file mode 100644 index 0000000000000..1b5729c419b00 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java @@ -0,0 +1,100 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms.Bucket; +import org.elasticsearch.test.AbstractWireSerializingTestCase; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class CountAccumulatorTests extends AbstractWireSerializingTestCase { + + public void testEmpty() { + CountAccumulator accumulator = new CountAccumulator(); + assertEquals(Collections.emptyMap(), accumulator.asMap()); + } + + public void testAdd() { + CountAccumulator accumulator = new CountAccumulator(); + accumulator.add("a", 22l); + accumulator.add("a", 10l); + accumulator.add("a", 15l); + accumulator.add("a", -12l); + accumulator.add("a", 0l); + + accumulator.add("b", 13l); + accumulator.add("b", 1l); + accumulator.add("b", 40000l); + accumulator.add("b", -2l); + accumulator.add("b", 333l); + + assertEquals(35l, accumulator.asMap().get("a").longValue()); + assertEquals(40345l, accumulator.asMap().get("b").longValue()); + assertEquals(2, accumulator.asMap().size()); + } + + public void testMerge() { + CountAccumulator accumulator = new CountAccumulator(); + accumulator.add("a", 13l); + accumulator.add("b", 42l); + + CountAccumulator accumulator2 = new CountAccumulator(); + accumulator2.add("a", 12l); + accumulator2.add("c", -1l); + + accumulator.merge(accumulator2); + + assertEquals(25l, accumulator.asMap().get("a").longValue()); + assertEquals(42l, accumulator.asMap().get("b").longValue()); + assertEquals(-1l, accumulator.asMap().get("c").longValue()); + assertEquals(3, accumulator.asMap().size()); + } + + public void testFromTermsAggregation() { + StringTerms termsAggregation = mock(StringTerms.class); + + Bucket bucket1 = mock(Bucket.class); + when(bucket1.getKeyAsString()).thenReturn("a"); + when(bucket1.getDocCount()).thenReturn(10l); + + Bucket bucket2 = mock(Bucket.class); + when(bucket2.getKeyAsString()).thenReturn("b"); + when(bucket2.getDocCount()).thenReturn(33l); + + List buckets = Arrays.asList(bucket1, bucket2); + when(termsAggregation.getBuckets()).thenReturn(buckets); + + CountAccumulator accumulator = CountAccumulator.fromTermsAggregation(termsAggregation); + + assertEquals(10l, accumulator.asMap().get("a").longValue()); + assertEquals(33l, accumulator.asMap().get("b").longValue()); + assertEquals(2, accumulator.asMap().size()); + } + + @Override + public CountAccumulator createTestInstance() { + CountAccumulator accumulator = new CountAccumulator(); + for (int i = 0; i < randomInt(10); ++i) { + accumulator.add(randomAlphaOfLengthBetween(1, 20), randomLongBetween(1l, 100l)); + } + + return accumulator; + } + + @Override + protected Reader instanceReader() { + return CountAccumulator::new; + } + +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java new file mode 100644 index 0000000000000..7b0b8f3c1ff76 --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java @@ -0,0 +1,162 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats.Fields; + +import java.io.IOException; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class ForecastStatsTests extends AbstractWireSerializingTestCase { + + public void testEmpty() throws IOException { + ForecastStats forecastStats = new ForecastStats(); + + XContentBuilder builder = JsonXContent.contentBuilder(); + forecastStats.toXContent(builder, ToXContent.EMPTY_PARAMS); + + XContentParser parser = createParser(builder); + Map properties = parser.map(); + assertTrue(properties.containsKey(Fields.TOTAL)); + assertFalse(properties.containsKey(Fields.MEMORY)); + assertFalse(properties.containsKey(Fields.RECORDS)); + assertFalse(properties.containsKey(Fields.RUNTIME)); + assertFalse(properties.containsKey(Fields.JOBS)); + assertFalse(properties.containsKey(Fields.STATUSES)); + } + + public void testMerge() { + StatsAccumulator memoryStats = new StatsAccumulator(); + memoryStats.add(1000); + memoryStats.add(45000); + memoryStats.add(2300); + + StatsAccumulator recordStats = new StatsAccumulator(); + recordStats.add(10); + recordStats.add(0); + recordStats.add(20); + + StatsAccumulator runtimeStats = new StatsAccumulator(); + runtimeStats.add(0); + runtimeStats.add(0); + runtimeStats.add(10); + + CountAccumulator statusStats = new CountAccumulator(); + statusStats.add("finished", 2l); + statusStats.add("failed", 5l); + + ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats); + + StatsAccumulator memoryStats2 = new StatsAccumulator(); + memoryStats2.add(10); + memoryStats2.add(30); + + StatsAccumulator recordStats2 = new StatsAccumulator(); + recordStats2.add(10); + recordStats2.add(0); + + StatsAccumulator runtimeStats2 = new StatsAccumulator(); + runtimeStats2.add(96); + runtimeStats2.add(0); + + CountAccumulator statusStats2 = new CountAccumulator(); + statusStats2.add("finished", 2l); + statusStats2.add("scheduled", 1l); + + ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2); + + forecastStats.merge(forecastStats2); + + Map mergedStats = forecastStats.asMap(); + + assertEquals(2l, mergedStats.get(Fields.JOBS)); + assertEquals(5l, mergedStats.get(Fields.TOTAL)); + + @SuppressWarnings("unchecked") + Map mergedMemoryStats = (Map) mergedStats.get(Fields.MEMORY); + + assertTrue(mergedMemoryStats != null); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.AVG), equalTo(9668.0)); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MAX), equalTo(45000.0)); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MIN), equalTo(10.0)); + + @SuppressWarnings("unchecked") + Map mergedRecordStats = (Map) mergedStats.get(Fields.RECORDS); + + assertTrue(mergedRecordStats != null); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.AVG), equalTo(8.0)); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MAX), equalTo(20.0)); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0)); + + @SuppressWarnings("unchecked") + Map mergedRuntimeStats = (Map) mergedStats.get(Fields.RUNTIME); + + assertTrue(mergedRuntimeStats != null); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.AVG), equalTo(21.2)); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MAX), equalTo(96.0)); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0)); + + @SuppressWarnings("unchecked") + Map mergedCountStats = (Map) mergedStats.get(Fields.STATUSES); + + assertTrue(mergedCountStats != null); + assertEquals(3, mergedCountStats.size()); + assertEquals(4, mergedCountStats.get("finished").longValue()); + assertEquals(5, mergedCountStats.get("failed").longValue()); + assertEquals(1, mergedCountStats.get("scheduled").longValue()); + } + + public void testUniqueCountOfJobs() { + ForecastStats forecastStats = createForecastStats(5, 10); + ForecastStats forecastStats2 = createForecastStats(2, 8); + ForecastStats forecastStats3 = createForecastStats(0, 0); + ForecastStats forecastStats4 = createForecastStats(0, 0); + ForecastStats forecastStats5 = createForecastStats(1, 12); + + forecastStats.merge(forecastStats2); + forecastStats.merge(forecastStats3); + forecastStats.merge(forecastStats4); + forecastStats.merge(forecastStats5); + + assertEquals(3l, forecastStats.asMap().get(Fields.JOBS)); + } + + @Override + public ForecastStats createTestInstance() { + return createForecastStats(1, 22); + } + + @Override + protected Reader instanceReader() { + return ForecastStats::new; + } + + private ForecastStats createForecastStats(long minTotal, long maxTotal) { + ForecastStats forecastStats = new ForecastStats(randomLongBetween(minTotal, maxTotal), createStatsAccumulator(), + createStatsAccumulator(), createStatsAccumulator(), createCountAccumulator()); + + return forecastStats; + } + + private StatsAccumulator createStatsAccumulator() { + return new StatsAccumulatorTests().createTestInstance(); + } + + private CountAccumulator createCountAccumulator() { + return new CountAccumulatorTests().createTestInstance(); + + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java new file mode 100644 index 0000000000000..a81176d3f374a --- /dev/null +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java @@ -0,0 +1,160 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.core.ml.stats; + +import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.search.aggregations.metrics.stats.Stats; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class StatsAccumulatorTests extends AbstractWireSerializingTestCase { + + public void testGivenNoValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + assertThat(accumulator.getMin(), equalTo(0.0)); + assertThat(accumulator.getMax(), equalTo(0.0)); + assertThat(accumulator.getTotal(), equalTo(0.0)); + assertThat(accumulator.getAvg(), equalTo(0.0)); + } + + public void testGivenPositiveValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + + for (int i = 1; i <= 10; i++) { + accumulator.add(i); + } + + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(10.0)); + assertThat(accumulator.getTotal(), equalTo(55.0)); + assertThat(accumulator.getAvg(), equalTo(5.5)); + } + + public void testGivenNegativeValues() { + StatsAccumulator accumulator = new StatsAccumulator(); + + for (int i = 1; i <= 10; i++) { + accumulator.add(-1 * i); + } + + assertThat(accumulator.getMin(), equalTo(-10.0)); + assertThat(accumulator.getMax(), equalTo(-1.0)); + assertThat(accumulator.getTotal(), equalTo(-55.0)); + assertThat(accumulator.getAvg(), equalTo(-5.5)); + } + + public void testAsMap() { + StatsAccumulator accumulator = new StatsAccumulator(); + accumulator.add(5.0); + accumulator.add(10.0); + + Map expectedMap = new HashMap<>(); + expectedMap.put("min", 5.0); + expectedMap.put("max", 10.0); + expectedMap.put("avg", 7.5); + expectedMap.put("total", 15.0); + assertThat(accumulator.asMap(), equalTo(expectedMap)); + } + + public void testMerge() { + StatsAccumulator accumulator = new StatsAccumulator(); + accumulator.add(5.0); + accumulator.add(10.0); + + assertThat(accumulator.getMin(), equalTo(5.0)); + assertThat(accumulator.getMax(), equalTo(10.0)); + assertThat(accumulator.getTotal(), equalTo(15.0)); + assertThat(accumulator.getAvg(), equalTo(7.5)); + + StatsAccumulator accumulator2 = new StatsAccumulator(); + accumulator2.add(1.0); + accumulator2.add(3.0); + accumulator2.add(7.0); + + assertThat(accumulator2.getMin(), equalTo(1.0)); + assertThat(accumulator2.getMax(), equalTo(7.0)); + assertThat(accumulator2.getTotal(), equalTo(11.0)); + assertThat(accumulator2.getAvg(), equalTo(11.0 / 3.0)); + + accumulator.merge(accumulator2); + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(10.0)); + assertThat(accumulator.getTotal(), equalTo(26.0)); + assertThat(accumulator.getAvg(), equalTo(5.2)); + + // same as accumulator + StatsAccumulator accumulator3 = new StatsAccumulator(); + accumulator3.add(5.0); + accumulator3.add(10.0); + + // merging the other way should yield the same results + accumulator2.merge(accumulator3); + assertThat(accumulator2.getMin(), equalTo(1.0)); + assertThat(accumulator2.getMax(), equalTo(10.0)); + assertThat(accumulator2.getTotal(), equalTo(26.0)); + assertThat(accumulator2.getAvg(), equalTo(5.2)); + } + + public void testMergeMixedEmpty() { + StatsAccumulator accumulator = new StatsAccumulator(); + + StatsAccumulator accumulator2 = new StatsAccumulator(); + accumulator2.add(1.0); + accumulator2.add(3.0); + accumulator.merge(accumulator2); + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(3.0)); + assertThat(accumulator.getTotal(), equalTo(4.0)); + + StatsAccumulator accumulator3 = new StatsAccumulator(); + accumulator.merge(accumulator3); + assertThat(accumulator.getMin(), equalTo(1.0)); + assertThat(accumulator.getMax(), equalTo(3.0)); + assertThat(accumulator.getTotal(), equalTo(4.0)); + + StatsAccumulator accumulator4 = new StatsAccumulator(); + accumulator3.merge(accumulator4); + + assertThat(accumulator3.getMin(), equalTo(0.0)); + assertThat(accumulator3.getMax(), equalTo(0.0)); + assertThat(accumulator3.getTotal(), equalTo(0.0)); + } + + public void testFromStatsAggregation() { + Stats stats = mock(Stats.class); + when(stats.getMax()).thenReturn(25.0); + when(stats.getMin()).thenReturn(2.5); + when(stats.getCount()).thenReturn(4l); + when(stats.getSum()).thenReturn(48.0); + when(stats.getAvg()).thenReturn(12.0); + + StatsAccumulator accumulator = StatsAccumulator.fromStatsAggregation(stats); + assertThat(accumulator.getMin(), equalTo(2.5)); + assertThat(accumulator.getMax(), equalTo(25.0)); + assertThat(accumulator.getTotal(), equalTo(48.0)); + assertThat(accumulator.getAvg(), equalTo(12.0)); + } + + @Override + public StatsAccumulator createTestInstance() { + StatsAccumulator accumulator = new StatsAccumulator(); + for (int i = 0; i < randomInt(10); ++i) { + accumulator.add(randomDoubleBetween(0.0, 1000.0, true)); + } + + return accumulator; + } + + @Override + protected Reader instanceReader() { + return StatsAccumulator::new; + } +} \ No newline at end of file diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index fa4845028fb59..2b862ffe84a75 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -32,8 +32,9 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.ml.job.process.NativeController; import org.elasticsearch.xpack.ml.job.process.NativeControllerHolder; -import org.elasticsearch.xpack.ml.stats.StatsAccumulator; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator; import java.io.IOException; import java.util.Arrays; @@ -192,10 +193,12 @@ public void execute(ActionListener listener) { private void addJobsUsage(GetJobsStatsAction.Response response) { StatsAccumulator allJobsDetectorsStats = new StatsAccumulator(); StatsAccumulator allJobsModelSizeStats = new StatsAccumulator(); + ForecastStats allJobsForecastStats = new ForecastStats(); Map jobCountByState = new HashMap<>(); Map detectorStatsByState = new HashMap<>(); Map modelSizeStatsByState = new HashMap<>(); + Map forecastStatsByState = new HashMap<>(); Map jobs = mlMetadata.getJobs(); List jobsStats = response.getResponse().results(); @@ -206,6 +209,7 @@ private void addJobsUsage(GetJobsStatsAction.Response response) { double modelSize = modelSizeStats == null ? 0.0 : jobStats.getModelSizeStats().getModelBytes(); + allJobsForecastStats.merge(jobStats.getForecastStats()); allJobsDetectorsStats.add(detectorsCount); allJobsModelSizeStats.add(modelSize); @@ -215,24 +219,29 @@ private void addJobsUsage(GetJobsStatsAction.Response response) { js -> new StatsAccumulator()).add(detectorsCount); modelSizeStatsByState.computeIfAbsent(jobState, js -> new StatsAccumulator()).add(modelSize); + forecastStatsByState.computeIfAbsent(jobState, + js -> new ForecastStats()).merge(jobStats.getForecastStats()); } jobsUsage.put(MachineLearningFeatureSetUsage.ALL, createJobUsageEntry(jobs.size(), allJobsDetectorsStats, - allJobsModelSizeStats)); + allJobsModelSizeStats, allJobsForecastStats)); for (JobState jobState : jobCountByState.keySet()) { jobsUsage.put(jobState.name().toLowerCase(Locale.ROOT), createJobUsageEntry( jobCountByState.get(jobState).get(), detectorStatsByState.get(jobState), - modelSizeStatsByState.get(jobState))); + modelSizeStatsByState.get(jobState), + forecastStatsByState.get(jobState))); } } private Map createJobUsageEntry(long count, StatsAccumulator detectorStats, - StatsAccumulator modelSizeStats) { + StatsAccumulator modelSizeStats, + ForecastStats forecastStats) { Map usage = new HashMap<>(); usage.put(MachineLearningFeatureSetUsage.COUNT, count); usage.put(MachineLearningFeatureSetUsage.DETECTORS, detectorStats.asMap()); usage.put(MachineLearningFeatureSetUsage.MODEL_SIZE, modelSizeStats.asMap()); + usage.put(MachineLearningFeatureSetUsage.FORECASTS, forecastStats.asMap()); return usage; } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java index fda8d6bfc6e95..31f918dfc2571 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsAction.java @@ -24,12 +24,12 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction; -import org.elasticsearch.xpack.core.ml.action.util.ForecastStats; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.xpack.ml.job.persistence.JobProvider; import org.elasticsearch.xpack.ml.job.process.autodetect.AutodetectProcessManager; diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index afca4606170eb..ceba95720a2d7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -75,7 +75,6 @@ import org.elasticsearch.xpack.core.ml.action.GetCategoriesAction; import org.elasticsearch.xpack.core.ml.action.GetInfluencersAction; import org.elasticsearch.xpack.core.ml.action.GetRecordsAction; -import org.elasticsearch.xpack.core.ml.action.util.ForecastStats; import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.calendars.Calendar; import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent; @@ -96,14 +95,15 @@ import org.elasticsearch.xpack.core.ml.job.results.Influencer; import org.elasticsearch.xpack.core.ml.job.results.ModelPlot; import org.elasticsearch.xpack.core.ml.job.results.Result; +import org.elasticsearch.xpack.core.ml.stats.CountAccumulator; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.StatsAccumulator; import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper; import org.elasticsearch.xpack.core.ml.utils.MlIndicesUtils; import org.elasticsearch.xpack.core.security.support.Exceptions; import org.elasticsearch.xpack.ml.job.categorization.GrokPatternCreator; import org.elasticsearch.xpack.ml.job.persistence.InfluencersQueryBuilder.InfluencersQuery; import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams; -import org.elasticsearch.xpack.ml.stats.CountAccumulator; -import org.elasticsearch.xpack.ml.stats.StatsAccumulator; import java.io.IOException; import java.io.InputStream; @@ -1128,10 +1128,10 @@ public void getForecastStats(String jobId, Consumer handler, Cons searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(searchRequest.indicesOptions())); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(finalQuery); - sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.memory).field(ForecastRequestStats.MEMORY_USAGE.getPreferredName())); - sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.records).field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName())); - sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.runtime).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName())); - sourceBuilder.aggregation(AggregationBuilders.terms(ForecastStats.Fields.byStatus).field(ForecastRequestStats.STATUS.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.MEMORY).field(ForecastRequestStats.MEMORY_USAGE.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.RECORDS).field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.RUNTIME).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.terms(ForecastStats.Fields.STATUSES).field(ForecastRequestStats.STATUS.getPreferredName())); sourceBuilder.size(0); searchRequest.source(sourceBuilder); @@ -1143,16 +1143,16 @@ public void getForecastStats(String jobId, Consumer handler, Cons Map aggregations = searchResponse.getAggregations().asMap(); StatsAccumulator memoryStats = StatsAccumulator - .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.memory)); + .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.MEMORY)); StatsAccumulator recordStats = StatsAccumulator - .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.records)); + .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.RECORDS)); StatsAccumulator runtimeStats = StatsAccumulator - .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.runtime)); + .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.RUNTIME)); CountAccumulator statusCount = CountAccumulator - .fromTermsAggregation((StringTerms) aggregations.get(ForecastStats.Fields.byStatus)); + .fromTermsAggregation((StringTerms) aggregations.get(ForecastStats.Fields.STATUSES)); - ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats.asMap(), recordStats.asMap(), - runtimeStats.asMap(), statusCount.asMap()); + ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, + runtimeStats, statusCount); handler.accept(forecastStats); }, errorHandler), client::search); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/CountAccumulator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/CountAccumulator.java deleted file mode 100644 index 6e10b53d6f3ec..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/CountAccumulator.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -package org.elasticsearch.xpack.ml.stats; - -import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; - -import java.util.HashMap; -import java.util.Map; -import java.util.stream.Collectors; - -public class CountAccumulator { - - private final Map counts; - - public CountAccumulator() { - this.counts = new HashMap(); - } - - private CountAccumulator(Map counts) { - this.counts = counts; - } - - public Map asMap() { - return counts; - } - - public static CountAccumulator fromTermsAggregation(StringTerms termsAggregation) { - return new CountAccumulator(termsAggregation.getBuckets().stream() - .collect(Collectors.toMap(bucket -> bucket.getKeyAsString(), bucket -> bucket.getDocCount()))); - } - -} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/StatsAccumulator.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/StatsAccumulator.java deleted file mode 100644 index ac13f95790e36..0000000000000 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/stats/StatsAccumulator.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.stats; - -import org.elasticsearch.search.aggregations.metrics.stats.Stats; - -import java.util.HashMap; -import java.util.Map; - -/** - * Helper class to collect min, max, avg and total statistics for a quantity - */ -public class StatsAccumulator { - - private static final String MIN = "min"; - private static final String MAX = "max"; - private static final String AVG = "avg"; - private static final String TOTAL = "total"; - - private long count; - private double total; - private Double min; - private Double max; - - public StatsAccumulator() { - } - - public void add(double value) { - count++; - total += value; - min = min == null ? value : (value < min ? value : min); - max = max == null ? value : (value > max ? value : max); - } - - public double getMin() { - return min == null ? 0.0 : min; - } - - public double getMax() { - return max == null ? 0.0 : max; - } - - public double getAvg() { - return count == 0.0 ? 0.0 : total/count; - } - - public double getTotal() { - return total; - } - - public Map asMap() { - Map map = new HashMap<>(); - map.put(MIN, getMin()); - map.put(MAX, getMax()); - map.put(AVG, getAvg()); - map.put(TOTAL, getTotal()); - return map; - } - - private StatsAccumulator(long count, double total, double min, double max) { - this.count = count; - this.total = total; - this.min = min; - this.max = max; - } - - public static StatsAccumulator fromStatsAggregation(Stats statsAggregation) { - return new StatsAccumulator(statsAggregation.getCount(), statsAggregation.getSum(), statsAggregation.getMin(), - statsAggregation.getMax()); - } -} - diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/stats/StatsAccumulatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/stats/StatsAccumulatorTests.java deleted file mode 100644 index 70733e0cc573e..0000000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/stats/StatsAccumulatorTests.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ -package org.elasticsearch.xpack.ml.stats; - -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.ml.stats.StatsAccumulator; - -import java.util.HashMap; -import java.util.Map; - -import static org.hamcrest.Matchers.equalTo; - -public class StatsAccumulatorTests extends ESTestCase { - - public void testGivenNoValues() { - StatsAccumulator accumulator = new StatsAccumulator(); - assertThat(accumulator.getMin(), equalTo(0.0)); - assertThat(accumulator.getMax(), equalTo(0.0)); - assertThat(accumulator.getTotal(), equalTo(0.0)); - assertThat(accumulator.getAvg(), equalTo(0.0)); - } - - public void testGivenPositiveValues() { - StatsAccumulator accumulator = new StatsAccumulator(); - - for (int i = 1; i <= 10; i++) { - accumulator.add(i); - } - - assertThat(accumulator.getMin(), equalTo(1.0)); - assertThat(accumulator.getMax(), equalTo(10.0)); - assertThat(accumulator.getTotal(), equalTo(55.0)); - assertThat(accumulator.getAvg(), equalTo(5.5)); - } - - public void testGivenNegativeValues() { - StatsAccumulator accumulator = new StatsAccumulator(); - - for (int i = 1; i <= 10; i++) { - accumulator.add(-1 * i); - } - - assertThat(accumulator.getMin(), equalTo(-10.0)); - assertThat(accumulator.getMax(), equalTo(-1.0)); - assertThat(accumulator.getTotal(), equalTo(-55.0)); - assertThat(accumulator.getAvg(), equalTo(-5.5)); - } - - public void testAsMap() { - StatsAccumulator accumulator = new StatsAccumulator(); - accumulator.add(5.0); - accumulator.add(10.0); - - Map expectedMap = new HashMap<>(); - expectedMap.put("min", 5.0); - expectedMap.put("max", 10.0); - expectedMap.put("avg", 7.5); - expectedMap.put("total", 15.0); - assertThat(accumulator.asMap(), equalTo(expectedMap)); - } -} \ No newline at end of file From 11ff0179bb58bc446d361e2c69daba932a00d3ed Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Thu, 28 Jun 2018 13:06:45 +0200 Subject: [PATCH 03/11] fix stylecheck issues --- .../core/ml/action/GetJobsStatsAction.java | 5 +- .../xpack/core/ml/stats/CountAccumulator.java | 2 +- .../core/ml/stats/CountAccumulatorTests.java | 52 +++++++++---------- .../core/ml/stats/ForecastStatsTests.java | 14 ++--- .../core/ml/stats/StatsAccumulatorTests.java | 2 +- .../xpack/ml/job/persistence/JobProvider.java | 12 +++-- .../TransportGetJobsStatsActionTests.java | 17 +++--- .../ml/JobStatsMonitoringDocTests.java | 5 +- 8 files changed, 58 insertions(+), 51 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java index 75160a2fd5ce0..cc556a2fe4dde 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java @@ -165,8 +165,9 @@ public static class JobStats implements ToXContentObject, Writeable { @Nullable private String assignmentExplanation; - public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, @Nullable ForecastStats forecastStats, JobState state, - @Nullable DiscoveryNode node, @Nullable String assignmentExplanation, @Nullable TimeValue opentime) { + public JobStats(String jobId, DataCounts dataCounts, @Nullable ModelSizeStats modelSizeStats, + @Nullable ForecastStats forecastStats, JobState state, @Nullable DiscoveryNode node, + @Nullable String assignmentExplanation, @Nullable TimeValue opentime) { this.jobId = Objects.requireNonNull(jobId); this.dataCounts = Objects.requireNonNull(dataCounts); this.modelSizeStats = modelSizeStats; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java index 5da402f50fd4c..638aa8a2fa6be 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulator.java @@ -45,7 +45,7 @@ public void merge(CountAccumulator other) { } public void add(String key, Long count) { - counts.put(key, counts.getOrDefault(key, 0l) + count); + counts.put(key, counts.getOrDefault(key, 0L) + count); } public Map asMap() { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java index 1b5729c419b00..4e18a70a3a0a2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/CountAccumulatorTests.java @@ -27,37 +27,37 @@ public void testEmpty() { public void testAdd() { CountAccumulator accumulator = new CountAccumulator(); - accumulator.add("a", 22l); - accumulator.add("a", 10l); - accumulator.add("a", 15l); - accumulator.add("a", -12l); - accumulator.add("a", 0l); - - accumulator.add("b", 13l); - accumulator.add("b", 1l); - accumulator.add("b", 40000l); - accumulator.add("b", -2l); - accumulator.add("b", 333l); - - assertEquals(35l, accumulator.asMap().get("a").longValue()); - assertEquals(40345l, accumulator.asMap().get("b").longValue()); + accumulator.add("a", 22L); + accumulator.add("a", 10L); + accumulator.add("a", 15L); + accumulator.add("a", -12L); + accumulator.add("a", 0L); + + accumulator.add("b", 13L); + accumulator.add("b", 1L); + accumulator.add("b", 40000L); + accumulator.add("b", -2L); + accumulator.add("b", 333L); + + assertEquals(35L, accumulator.asMap().get("a").longValue()); + assertEquals(40345L, accumulator.asMap().get("b").longValue()); assertEquals(2, accumulator.asMap().size()); } public void testMerge() { CountAccumulator accumulator = new CountAccumulator(); - accumulator.add("a", 13l); - accumulator.add("b", 42l); + accumulator.add("a", 13L); + accumulator.add("b", 42L); CountAccumulator accumulator2 = new CountAccumulator(); - accumulator2.add("a", 12l); - accumulator2.add("c", -1l); + accumulator2.add("a", 12L); + accumulator2.add("c", -1L); accumulator.merge(accumulator2); - assertEquals(25l, accumulator.asMap().get("a").longValue()); - assertEquals(42l, accumulator.asMap().get("b").longValue()); - assertEquals(-1l, accumulator.asMap().get("c").longValue()); + assertEquals(25L, accumulator.asMap().get("a").longValue()); + assertEquals(42L, accumulator.asMap().get("b").longValue()); + assertEquals(-1L, accumulator.asMap().get("c").longValue()); assertEquals(3, accumulator.asMap().size()); } @@ -66,19 +66,19 @@ public void testFromTermsAggregation() { Bucket bucket1 = mock(Bucket.class); when(bucket1.getKeyAsString()).thenReturn("a"); - when(bucket1.getDocCount()).thenReturn(10l); + when(bucket1.getDocCount()).thenReturn(10L); Bucket bucket2 = mock(Bucket.class); when(bucket2.getKeyAsString()).thenReturn("b"); - when(bucket2.getDocCount()).thenReturn(33l); + when(bucket2.getDocCount()).thenReturn(33L); List buckets = Arrays.asList(bucket1, bucket2); when(termsAggregation.getBuckets()).thenReturn(buckets); CountAccumulator accumulator = CountAccumulator.fromTermsAggregation(termsAggregation); - assertEquals(10l, accumulator.asMap().get("a").longValue()); - assertEquals(33l, accumulator.asMap().get("b").longValue()); + assertEquals(10L, accumulator.asMap().get("a").longValue()); + assertEquals(33L, accumulator.asMap().get("b").longValue()); assertEquals(2, accumulator.asMap().size()); } @@ -86,7 +86,7 @@ public void testFromTermsAggregation() { public CountAccumulator createTestInstance() { CountAccumulator accumulator = new CountAccumulator(); for (int i = 0; i < randomInt(10); ++i) { - accumulator.add(randomAlphaOfLengthBetween(1, 20), randomLongBetween(1l, 100l)); + accumulator.add(randomAlphaOfLengthBetween(1, 20), randomLongBetween(1L, 100L)); } return accumulator; diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java index 7b0b8f3c1ff76..45c91f3c55cf2 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java @@ -55,8 +55,8 @@ public void testMerge() { runtimeStats.add(10); CountAccumulator statusStats = new CountAccumulator(); - statusStats.add("finished", 2l); - statusStats.add("failed", 5l); + statusStats.add("finished", 2L); + statusStats.add("failed", 5L); ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats); @@ -73,8 +73,8 @@ public void testMerge() { runtimeStats2.add(0); CountAccumulator statusStats2 = new CountAccumulator(); - statusStats2.add("finished", 2l); - statusStats2.add("scheduled", 1l); + statusStats2.add("finished", 2L); + statusStats2.add("scheduled", 1L); ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2); @@ -82,8 +82,8 @@ public void testMerge() { Map mergedStats = forecastStats.asMap(); - assertEquals(2l, mergedStats.get(Fields.JOBS)); - assertEquals(5l, mergedStats.get(Fields.TOTAL)); + assertEquals(2L, mergedStats.get(Fields.JOBS)); + assertEquals(5L, mergedStats.get(Fields.TOTAL)); @SuppressWarnings("unchecked") Map mergedMemoryStats = (Map) mergedStats.get(Fields.MEMORY); @@ -131,7 +131,7 @@ public void testUniqueCountOfJobs() { forecastStats.merge(forecastStats4); forecastStats.merge(forecastStats5); - assertEquals(3l, forecastStats.asMap().get(Fields.JOBS)); + assertEquals(3L, forecastStats.asMap().get(Fields.JOBS)); } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java index a81176d3f374a..bd2df0823ae17 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/StatsAccumulatorTests.java @@ -132,7 +132,7 @@ public void testFromStatsAggregation() { Stats stats = mock(Stats.class); when(stats.getMax()).thenReturn(25.0); when(stats.getMin()).thenReturn(2.5); - when(stats.getCount()).thenReturn(4l); + when(stats.getCount()).thenReturn(4L); when(stats.getSum()).thenReturn(48.0); when(stats.getAvg()).thenReturn(12.0); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index ceba95720a2d7..a757556a0d2b7 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -1128,10 +1128,14 @@ public void getForecastStats(String jobId, Consumer handler, Cons searchRequest.indicesOptions(MlIndicesUtils.addIgnoreUnavailable(searchRequest.indicesOptions())); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(finalQuery); - sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.MEMORY).field(ForecastRequestStats.MEMORY_USAGE.getPreferredName())); - sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.RECORDS).field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName())); - sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.RUNTIME).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName())); - sourceBuilder.aggregation(AggregationBuilders.terms(ForecastStats.Fields.STATUSES).field(ForecastRequestStats.STATUS.getPreferredName())); + sourceBuilder.aggregation( + AggregationBuilders.stats(ForecastStats.Fields.MEMORY).field(ForecastRequestStats.MEMORY_USAGE.getPreferredName())); + sourceBuilder.aggregation(AggregationBuilders.stats(ForecastStats.Fields.RECORDS) + .field(ForecastRequestStats.PROCESSED_RECORD_COUNT.getPreferredName())); + sourceBuilder.aggregation( + AggregationBuilders.stats(ForecastStats.Fields.RUNTIME).field(ForecastRequestStats.PROCESSING_TIME_MS.getPreferredName())); + sourceBuilder.aggregation( + AggregationBuilders.terms(ForecastStats.Fields.STATUSES).field(ForecastRequestStats.STATUS.getPreferredName())); sourceBuilder.size(0); searchRequest.source(sourceBuilder); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java index 40bc82c6048c7..2e00ad71251db 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/action/TransportGetJobsStatsActionTests.java @@ -37,7 +37,7 @@ public void testDetermineJobIds() { result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Collections.singletonList("id1"), Collections.singletonList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null))); + new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null))); assertEquals(0, result.size()); result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, @@ -49,7 +49,7 @@ public void testDetermineJobIds() { result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), - Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, + Collections.singletonList(new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.CLOSED, null, null, null)) ); assertEquals(2, result.size()); @@ -58,17 +58,16 @@ public void testDetermineJobIds() { result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), Arrays.asList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.OPENED, null, null, null) + new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), + new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null) )); assertEquals(1, result.size()); assertEquals("id2", result.get(0)); - result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), - Arrays.asList( - new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, JobState.OPENED, null, null, null), - new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, JobState.OPENED, null, null, null))); + result = determineNonDeletedJobIdsWithoutLiveStats(mlMetadata, Arrays.asList("id1", "id2", "id3"), Arrays.asList( + new GetJobsStatsAction.Response.JobStats("id1", new DataCounts("id1"), null, null, JobState.OPENED, null, null, null), + new GetJobsStatsAction.Response.JobStats("id2", new DataCounts("id2"), null, null, JobState.OPENED, null, null, null), + new GetJobsStatsAction.Response.JobStats("id3", new DataCounts("id3"), null, null, JobState.OPENED, null, null, null))); assertEquals(0, result.size()); // No jobs running, but job 4 is being deleted diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java index 88f34c4577c1c..266d321db0699 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.monitoring.MonitoredSystem; import org.elasticsearch.xpack.core.monitoring.exporter.MonitoringDoc; import org.elasticsearch.xpack.monitoring.exporter.BaseMonitoringDocTestCase; @@ -100,7 +101,9 @@ public void testToXContent() throws IOException { .build(); final DataCounts dataCounts = new DataCounts("_job_id", 0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, date3, date4, date5, date6, date7); - final JobStats jobStats = new JobStats("_job", dataCounts, modelStats, JobState.OPENED, discoveryNode, "_explanation", time); + final ForecastStats forecastStats = new ForecastStats(); + final JobStats jobStats = new JobStats("_job", dataCounts, modelStats, forecastStats, JobState.OPENED, discoveryNode, + "_explanation", time); final MonitoringDoc.Node node = new MonitoringDoc.Node("_uuid", "_host", "_addr", "_ip", "_name", 1504169190855L); final JobStatsMonitoringDoc document = new JobStatsMonitoringDoc("_cluster", 1502266739402L, 1506593717631L, node, jobStats); From 1b93000a47c19cbe1c6c5f7ec82ad96b71159794 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 29 Jun 2018 09:50:12 +0200 Subject: [PATCH 04/11] add forecastStats Writable interface implementation with BWC layer --- .../xpack/core/ml/action/GetJobsStatsAction.java | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java index cc556a2fe4dde..807c09363759b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/GetJobsStatsAction.java @@ -186,6 +186,9 @@ public JobStats(StreamInput in) throws IOException { node = in.readOptionalWriteable(DiscoveryNode::new); assignmentExplanation = in.readOptionalString(); openTime = in.readOptionalTimeValue(); + if (in.getVersion().onOrAfter(Version.V_6_4_0)) { + forecastStats = in.readOptionalWriteable(ForecastStats::new); + } } public String getJobId() { @@ -273,11 +276,14 @@ public void writeTo(StreamOutput out) throws IOException { out.writeOptionalWriteable(node); out.writeOptionalString(assignmentExplanation); out.writeOptionalTimeValue(openTime); + if (out.getVersion().onOrAfter(Version.V_6_4_0)) { + out.writeOptionalWriteable(forecastStats); + } } @Override public int hashCode() { - return Objects.hash(jobId, dataCounts, modelSizeStats, state, node, assignmentExplanation, openTime); + return Objects.hash(jobId, dataCounts, modelSizeStats, forecastStats, state, node, assignmentExplanation, openTime); } @Override @@ -292,6 +298,7 @@ public boolean equals(Object obj) { return Objects.equals(jobId, other.jobId) && Objects.equals(this.dataCounts, other.dataCounts) && Objects.equals(this.modelSizeStats, other.modelSizeStats) + && Objects.equals(this.forecastStats, other.forecastStats) && Objects.equals(this.state, other.state) && Objects.equals(this.node, other.node) && Objects.equals(this.assignmentExplanation, other.assignmentExplanation) From b10eb4c6876bc469ea2c17085bf37ac51cc3ffbf Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 29 Jun 2018 10:37:24 +0200 Subject: [PATCH 05/11] change field to forecasted_jobs, always write it and fix chained merges --- .../xpack/core/ml/stats/ForecastStats.java | 31 +++--- .../core/ml/stats/ForecastStatsTests.java | 98 ++++++++++++++++++- 2 files changed, 109 insertions(+), 20 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java index 89a8bf30c75f0..4f4176414fa6d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java @@ -24,7 +24,7 @@ public class ForecastStats implements ToXContentObject, Writeable { public static class Fields { public static final String TOTAL = "total"; - public static final String JOBS = "jobs"; + public static final String FORECASTED_JOBS = "forecasted_jobs"; public static final String MEMORY = "memory_bytes"; public static final String RUNTIME = "processing_time_ms"; public static final String RECORDS = "records"; @@ -32,7 +32,7 @@ public static class Fields { } private long total; - private long jobsWithAtleastOneForecast; + private long forecastedJobs; private StatsAccumulator memoryStats; private StatsAccumulator recordStats; private StatsAccumulator runtimeStats; @@ -40,7 +40,7 @@ public static class Fields { public ForecastStats() { this.total = 0; - this.jobsWithAtleastOneForecast = 0; + this.forecastedJobs = 0; this.memoryStats = new StatsAccumulator(); this.recordStats = new StatsAccumulator(); this.runtimeStats = new StatsAccumulator(); @@ -53,7 +53,7 @@ public ForecastStats() { public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator recordStats, StatsAccumulator runtimeStats, CountAccumulator statusCounts) { this.total = total; - this.jobsWithAtleastOneForecast = total > 0 ? 1 : 0; + this.forecastedJobs = total > 0 ? 1 : 0; this.memoryStats = Objects.requireNonNull(memoryStats); this.recordStats = Objects.requireNonNull(recordStats); this.runtimeStats = Objects.requireNonNull(runtimeStats); @@ -62,7 +62,7 @@ public ForecastStats(long total, StatsAccumulator memoryStats, StatsAccumulator public ForecastStats(StreamInput in) throws IOException { this.total = in.readLong(); - this.jobsWithAtleastOneForecast = in.readLong(); + this.forecastedJobs = in.readLong(); this.memoryStats = new StatsAccumulator(in); this.recordStats = new StatsAccumulator(in); this.runtimeStats = new StatsAccumulator(in); @@ -74,9 +74,7 @@ public void merge(ForecastStats other) { return; } total += other.total; - if (other.total > 0) { - ++jobsWithAtleastOneForecast; - } + forecastedJobs += other.forecastedJobs; memoryStats.merge(other.memoryStats); recordStats.merge(other.recordStats); runtimeStats.merge(other.runtimeStats); @@ -92,15 +90,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.TOTAL, total); + builder.field(Fields.FORECASTED_JOBS, forecastedJobs); + if (total > 0) { builder.field(Fields.MEMORY, memoryStats.asMap()); builder.field(Fields.RECORDS, recordStats.asMap()); builder.field(Fields.RUNTIME, runtimeStats.asMap()); builder.field(Fields.STATUSES, statusCounts.asMap()); } - if (jobsWithAtleastOneForecast > 1) { - builder.field(Fields.JOBS, jobsWithAtleastOneForecast); - } return builder; } @@ -108,22 +105,22 @@ public XContentBuilder doXContentBody(XContentBuilder builder, Params params) th public Map asMap() { Map map = new HashMap<>(); map.put(Fields.TOTAL, total); + map.put(Fields.FORECASTED_JOBS, forecastedJobs); + if (total > 0) { map.put(Fields.MEMORY, memoryStats.asMap()); map.put(Fields.RECORDS, recordStats.asMap()); map.put(Fields.RUNTIME, runtimeStats.asMap()); map.put(Fields.STATUSES, statusCounts.asMap()); } - if (jobsWithAtleastOneForecast > 1) { - map.put(Fields.JOBS, jobsWithAtleastOneForecast); - } + return map; } @Override public void writeTo(StreamOutput out) throws IOException { out.writeLong(total); - out.writeLong(jobsWithAtleastOneForecast); + out.writeLong(forecastedJobs); memoryStats.writeTo(out); recordStats.writeTo(out); runtimeStats.writeTo(out); @@ -132,7 +129,7 @@ public void writeTo(StreamOutput out) throws IOException { @Override public int hashCode() { - return Objects.hash(total, jobsWithAtleastOneForecast, memoryStats, recordStats, runtimeStats, statusCounts); + return Objects.hash(total, forecastedJobs, memoryStats, recordStats, runtimeStats, statusCounts); } @Override @@ -146,7 +143,7 @@ public boolean equals(Object obj) { } ForecastStats other = (ForecastStats) obj; - return Objects.equals(total, other.total) && Objects.equals(jobsWithAtleastOneForecast, other.jobsWithAtleastOneForecast) + return Objects.equals(total, other.total) && Objects.equals(forecastedJobs, other.forecastedJobs) && Objects.equals(memoryStats, other.memoryStats) && Objects.equals(recordStats, other.recordStats) && Objects.equals(runtimeStats, other.runtimeStats) && Objects.equals(statusCounts, other.statusCounts); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java index 45c91f3c55cf2..4003e7d7c439b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java @@ -31,10 +31,10 @@ public void testEmpty() throws IOException { XContentParser parser = createParser(builder); Map properties = parser.map(); assertTrue(properties.containsKey(Fields.TOTAL)); + assertTrue(properties.containsKey(Fields.FORECASTED_JOBS)); assertFalse(properties.containsKey(Fields.MEMORY)); assertFalse(properties.containsKey(Fields.RECORDS)); assertFalse(properties.containsKey(Fields.RUNTIME)); - assertFalse(properties.containsKey(Fields.JOBS)); assertFalse(properties.containsKey(Fields.STATUSES)); } @@ -82,7 +82,7 @@ public void testMerge() { Map mergedStats = forecastStats.asMap(); - assertEquals(2L, mergedStats.get(Fields.JOBS)); + assertEquals(2L, mergedStats.get(Fields.FORECASTED_JOBS)); assertEquals(5L, mergedStats.get(Fields.TOTAL)); @SuppressWarnings("unchecked") @@ -119,6 +119,98 @@ public void testMerge() { assertEquals(1, mergedCountStats.get("scheduled").longValue()); } + public void testChainedMerge() { + StatsAccumulator memoryStats = new StatsAccumulator(); + memoryStats.add(1000); + memoryStats.add(45000); + memoryStats.add(2300); + StatsAccumulator recordStats = new StatsAccumulator(); + recordStats.add(10); + recordStats.add(0); + recordStats.add(20); + StatsAccumulator runtimeStats = new StatsAccumulator(); + runtimeStats.add(0); + runtimeStats.add(0); + runtimeStats.add(10); + CountAccumulator statusStats = new CountAccumulator(); + statusStats.add("finished", 2L); + statusStats.add("failed", 5L); + ForecastStats forecastStats = new ForecastStats(3, memoryStats, recordStats, runtimeStats, statusStats); + + StatsAccumulator memoryStats2 = new StatsAccumulator(); + memoryStats2.add(10); + memoryStats2.add(30); + StatsAccumulator recordStats2 = new StatsAccumulator(); + recordStats2.add(10); + recordStats2.add(0); + StatsAccumulator runtimeStats2 = new StatsAccumulator(); + runtimeStats2.add(96); + runtimeStats2.add(0); + CountAccumulator statusStats2 = new CountAccumulator(); + statusStats2.add("finished", 2L); + statusStats2.add("scheduled", 1L); + ForecastStats forecastStats2 = new ForecastStats(2, memoryStats2, recordStats2, runtimeStats2, statusStats2); + + StatsAccumulator memoryStats3 = new StatsAccumulator(); + memoryStats3.add(500); + StatsAccumulator recordStats3 = new StatsAccumulator(); + recordStats3.add(50); + StatsAccumulator runtimeStats3 = new StatsAccumulator(); + runtimeStats3.add(32); + CountAccumulator statusStats3 = new CountAccumulator(); + statusStats3.add("finished", 1L); + ForecastStats forecastStats3 = new ForecastStats(1, memoryStats3, recordStats3, runtimeStats3, statusStats3); + + ForecastStats forecastStats4 = new ForecastStats(); + + // merge 4 into 3 + forecastStats3.merge(forecastStats4); + + // merge 3 into 2 + forecastStats2.merge(forecastStats3); + + // merger 2 into 1 + forecastStats.merge(forecastStats2); + + Map mergedStats = forecastStats.asMap(); + + assertEquals(3L, mergedStats.get(Fields.FORECASTED_JOBS)); + assertEquals(6L, mergedStats.get(Fields.TOTAL)); + + @SuppressWarnings("unchecked") + Map mergedMemoryStats = (Map) mergedStats.get(Fields.MEMORY); + + assertTrue(mergedMemoryStats != null); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.AVG), equalTo(8140.0)); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MAX), equalTo(45000.0)); + assertThat(mergedMemoryStats.get(StatsAccumulator.Fields.MIN), equalTo(10.0)); + + @SuppressWarnings("unchecked") + Map mergedRecordStats = (Map) mergedStats.get(Fields.RECORDS); + + assertTrue(mergedRecordStats != null); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.AVG), equalTo(15.0)); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MAX), equalTo(50.0)); + assertThat(mergedRecordStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0)); + + @SuppressWarnings("unchecked") + Map mergedRuntimeStats = (Map) mergedStats.get(Fields.RUNTIME); + + assertTrue(mergedRuntimeStats != null); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.AVG), equalTo(23.0)); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MAX), equalTo(96.0)); + assertThat(mergedRuntimeStats.get(StatsAccumulator.Fields.MIN), equalTo(0.0)); + + @SuppressWarnings("unchecked") + Map mergedCountStats = (Map) mergedStats.get(Fields.STATUSES); + + assertTrue(mergedCountStats != null); + assertEquals(3, mergedCountStats.size()); + assertEquals(5, mergedCountStats.get("finished").longValue()); + assertEquals(5, mergedCountStats.get("failed").longValue()); + assertEquals(1, mergedCountStats.get("scheduled").longValue()); + } + public void testUniqueCountOfJobs() { ForecastStats forecastStats = createForecastStats(5, 10); ForecastStats forecastStats2 = createForecastStats(2, 8); @@ -131,7 +223,7 @@ public void testUniqueCountOfJobs() { forecastStats.merge(forecastStats4); forecastStats.merge(forecastStats5); - assertEquals(3L, forecastStats.asMap().get(Fields.JOBS)); + assertEquals(3L, forecastStats.asMap().get(Fields.FORECASTED_JOBS)); } @Override From a0f6ed521138755983e619061220980492873e19 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Fri, 29 Jun 2018 11:29:39 +0200 Subject: [PATCH 06/11] add a test for forecast by status for the usage endpoint --- .../xpack/core/ml/stats/ForecastStats.java | 6 ++-- .../core/ml/stats/ForecastStatsTests.java | 2 +- .../xpack/ml/MachineLearningFeatureSet.java | 3 +- .../ml/MachineLearningFeatureSetTests.java | 28 ++++++++++++++++--- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java index 4f4176414fa6d..d490e4b98a44a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/stats/ForecastStats.java @@ -69,9 +69,9 @@ public ForecastStats(StreamInput in) throws IOException { this.statusCounts = new CountAccumulator(in); } - public void merge(ForecastStats other) { + public ForecastStats merge(ForecastStats other) { if (other == null) { - return; + return this; } total += other.total; forecastedJobs += other.forecastedJobs; @@ -79,6 +79,8 @@ public void merge(ForecastStats other) { recordStats.merge(other.recordStats); runtimeStats.merge(other.runtimeStats); statusCounts.merge(other.statusCounts); + + return this; } @Override diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java index 4003e7d7c439b..f7f5d16c5e578 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/stats/ForecastStatsTests.java @@ -236,7 +236,7 @@ protected Reader instanceReader() { return ForecastStats::new; } - private ForecastStats createForecastStats(long minTotal, long maxTotal) { + public ForecastStats createForecastStats(long minTotal, long maxTotal) { ForecastStats forecastStats = new ForecastStats(randomLongBetween(minTotal, maxTotal), createStatsAccumulator(), createStatsAccumulator(), createStatsAccumulator(), createCountAccumulator()); diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java index 2b862ffe84a75..14ac43fae101c 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSet.java @@ -219,8 +219,7 @@ private void addJobsUsage(GetJobsStatsAction.Response response) { js -> new StatsAccumulator()).add(detectorsCount); modelSizeStatsByState.computeIfAbsent(jobState, js -> new StatsAccumulator()).add(modelSize); - forecastStatsByState.computeIfAbsent(jobState, - js -> new ForecastStats()).merge(jobStats.getForecastStats()); + forecastStatsByState.merge(jobState, jobStats.getForecastStats(), (f1, f2) -> f1.merge(f2)); } jobsUsage.put(MachineLearningFeatureSetUsage.ALL, createJobUsageEntry(jobs.size(), allJobsDetectorsStats, diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index eba2054054c0d..a675a7c0b3599 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -39,6 +39,9 @@ import org.elasticsearch.xpack.core.ml.job.config.Job; import org.elasticsearch.xpack.core.ml.job.config.JobState; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStats; +import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests; +import org.elasticsearch.xpack.core.ml.stats.StatsAccumulatorTests; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.junit.Before; @@ -138,11 +141,11 @@ public void testUsage() throws Exception { settings.put("xpack.ml.enabled", true); Job opened1 = buildJob("opened1", Arrays.asList(buildMinDetector("foo"))); - GetJobsStatsAction.Response.JobStats opened1JobStats = buildJobStats("opened1", JobState.OPENED, 100L); + GetJobsStatsAction.Response.JobStats opened1JobStats = buildJobStats("opened1", JobState.OPENED, 100L, 3L); Job opened2 = buildJob("opened2", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar"))); - GetJobsStatsAction.Response.JobStats opened2JobStats = buildJobStats("opened2", JobState.OPENED, 200L); + GetJobsStatsAction.Response.JobStats opened2JobStats = buildJobStats("opened2", JobState.OPENED, 200L, 8L); Job closed1 = buildJob("closed1", Arrays.asList(buildMinDetector("foo"), buildMinDetector("bar"), buildMinDetector("foobar"))); - GetJobsStatsAction.Response.JobStats closed1JobStats = buildJobStats("closed1", JobState.CLOSED, 300L); + GetJobsStatsAction.Response.JobStats closed1JobStats = buildJobStats("closed1", JobState.CLOSED, 300L, 0); givenJobs(Arrays.asList(opened1, opened2, closed1), Arrays.asList(opened1JobStats, opened2JobStats, closed1JobStats)); @@ -210,6 +213,15 @@ public void testUsage() throws Exception { assertThat(source.getValue("datafeeds._all.count"), equalTo(3)); assertThat(source.getValue("datafeeds.started.count"), equalTo(2)); assertThat(source.getValue("datafeeds.stopped.count"), equalTo(1)); + + assertThat(source.getValue("jobs._all.forecasts.total"), equalTo(11)); + assertThat(source.getValue("jobs._all.forecasts.forecasted_jobs"), equalTo(2)); + + assertThat(source.getValue("jobs.closed.forecasts.total"), equalTo(0)); + assertThat(source.getValue("jobs.closed.forecasts.forecasted_jobs"), equalTo(0)); + + assertThat(source.getValue("jobs.opened.forecasts.total"), equalTo(11)); + assertThat(source.getValue("jobs.opened.forecasts.forecasted_jobs"), equalTo(2)); } } @@ -301,12 +313,16 @@ private static Job buildJob(String jobId, List detectors) { .build(new Date(randomNonNegativeLong())); } - private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes) { + private static GetJobsStatsAction.Response.JobStats buildJobStats(String jobId, JobState state, long modelBytes, + long numberOfForecasts) { ModelSizeStats.Builder modelSizeStats = new ModelSizeStats.Builder(jobId); modelSizeStats.setModelBytes(modelBytes); GetJobsStatsAction.Response.JobStats jobStats = mock(GetJobsStatsAction.Response.JobStats.class); + ForecastStats forecastStats = buildForecastStats(numberOfForecasts); + when(jobStats.getJobId()).thenReturn(jobId); when(jobStats.getModelSizeStats()).thenReturn(modelSizeStats.build()); + when(jobStats.getForecastStats()).thenReturn(forecastStats); when(jobStats.getState()).thenReturn(state); return jobStats; } @@ -316,4 +332,8 @@ private static GetDatafeedsStatsAction.Response.DatafeedStats buildDatafeedStats when(stats.getDatafeedState()).thenReturn(state); return stats; } + + private static ForecastStats buildForecastStats(long numberOfForecasts) { + return new ForecastStatsTests().createForecastStats(numberOfForecasts, numberOfForecasts); + } } From f9804dca36d28db9dfe7bc3437675258e1724fda Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 2 Jul 2018 10:40:53 +0200 Subject: [PATCH 07/11] add documentation about forecast statistics shown as part of jobcounts. --- x-pack/docs/en/rest-api/ml/jobcounts.asciidoc | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc b/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc index b2e24a298cbd0..df4362bb84ce6 100644 --- a/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc +++ b/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc @@ -20,6 +20,10 @@ progress of a job. (object) An object that provides information about the size and contents of the model. See <> +`forecasts_stats`:: + (object) An object that provides statistical information about forecasts + of this job. See <> + `node`:: (object) For open jobs only, contains information about the node where the job runs. See <>. @@ -177,6 +181,33 @@ NOTE: The `over` field values are counted separately for each detector and parti `timestamp`:: (date) The timestamp of the `model_size_stats` according to the timestamp of the data. +[float] +[[ml-forecastsstats]] +==== Forecasts Stats Objects + +The `forecasts_stats` object shows statistics about forecasts. It has the following properties: + +`total`:: + (long) The number of forecasts currently available for this model. + +`forecasted_jobs`:: + (long) Number of jobs that have at least one forecast. + +`memory_bytes`:: + (object) Statistics about the memory usage: min, max, avg and total. + +`records`:: + (object) Statistics about the number of forecast records: min, max, avg and total. + +`processing_time_ms`:: + (object) Statistics about the forecast runtime in milliseconds: min, max, avg and total. + +`status`:: + (object) Counts per forecast status, for example: {"finished" : 2}. + +NOTE: `memory_bytes`, `records`, `processing_time_ms`, `status` require at least 1 forecast, otherwise +these fields are ommitted. + [float] [[ml-stats-node]] ==== Node Objects From d6dd3656eeeeadfaf6af7c2987ef1a5184ebf2ae Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Mon, 2 Jul 2018 22:34:35 +0200 Subject: [PATCH 08/11] fix NPE when search fails --- .../xpack/ml/job/persistence/JobProvider.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java index a757556a0d2b7..7513cb5a5bbc0 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/persistence/JobProvider.java @@ -63,6 +63,7 @@ import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.aggregations.Aggregation; import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.metrics.stats.Stats; import org.elasticsearch.search.aggregations.metrics.stats.extended.ExtendedStats; @@ -1142,22 +1143,23 @@ public void getForecastStats(String jobId, Consumer handler, Cons executeAsyncWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN, searchRequest, ActionListener.wrap(searchResponse -> { - long totalHits = searchResponse.getHits().getTotalHits(); - Map aggregations = searchResponse.getAggregations().asMap(); - + Aggregations aggregations = searchResponse.getAggregations(); + if (totalHits == 0 || aggregations == null) { + handler.accept(new ForecastStats()); + return; + } + Map aggregationsAsMap = aggregations.asMap(); StatsAccumulator memoryStats = StatsAccumulator - .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.MEMORY)); + .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.MEMORY)); StatsAccumulator recordStats = StatsAccumulator - .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.RECORDS)); + .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RECORDS)); StatsAccumulator runtimeStats = StatsAccumulator - .fromStatsAggregation((Stats) aggregations.get(ForecastStats.Fields.RUNTIME)); + .fromStatsAggregation((Stats) aggregationsAsMap.get(ForecastStats.Fields.RUNTIME)); CountAccumulator statusCount = CountAccumulator - .fromTermsAggregation((StringTerms) aggregations.get(ForecastStats.Fields.STATUSES)); - - ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, - runtimeStats, statusCount); + .fromTermsAggregation((StringTerms) aggregationsAsMap.get(ForecastStats.Fields.STATUSES)); + ForecastStats forecastStats = new ForecastStats(totalHits, memoryStats, recordStats, runtimeStats, statusCount); handler.accept(forecastStats); }, errorHandler), client::search); From d1fa2fcb5b9d3ddf5832116fc497d4c36cb8ad48 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 3 Jul 2018 07:46:41 +0200 Subject: [PATCH 09/11] fix checkstyle failure --- .../elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java index a675a7c0b3599..5893a863fe38f 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/MachineLearningFeatureSetTests.java @@ -41,7 +41,6 @@ import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStats; import org.elasticsearch.xpack.core.ml.stats.ForecastStatsTests; -import org.elasticsearch.xpack.core.ml.stats.StatsAccumulatorTests; import org.elasticsearch.xpack.core.watcher.support.xcontent.XContentSource; import org.junit.Before; From d1ed68d5e9e14df5d802bcc339c665cdc1df5478 Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Tue, 3 Jul 2018 09:26:40 +0200 Subject: [PATCH 10/11] fix JobStatsMonitoringDocTests --- .../monitoring/collector/ml/JobStatsMonitoringDocTests.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java index 266d321db0699..9d37073a426cc 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/ml/JobStatsMonitoringDocTests.java @@ -155,6 +155,9 @@ public void testToXContent() throws IOException { + "\"log_time\":1483315322002," + "\"timestamp\":1483228861001" + "}," + + "\"forecasts_stats\":{" + + "\"total\":0,\"forecasted_jobs\":0" + + "}," + "\"state\":\"opened\"," + "\"node\":{" + "\"id\":\"_node_id\"," From 5f91cbcb7572a52b2aad929cbdb9fd3794c237ef Mon Sep 17 00:00:00 2001 From: Hendrik Muhs Date: Wed, 4 Jul 2018 08:12:42 +0200 Subject: [PATCH 11/11] improve documentation --- x-pack/docs/en/rest-api/ml/jobcounts.asciidoc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc b/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc index df4362bb84ce6..d343cc23ae0ad 100644 --- a/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc +++ b/x-pack/docs/en/rest-api/ml/jobcounts.asciidoc @@ -191,21 +191,21 @@ The `forecasts_stats` object shows statistics about forecasts. It has the follow (long) The number of forecasts currently available for this model. `forecasted_jobs`:: - (long) Number of jobs that have at least one forecast. + (long) The number of jobs that have at least one forecast. `memory_bytes`:: - (object) Statistics about the memory usage: min, max, avg and total. + (object) Statistics about the memory usage: minimum, maximum, average and total. `records`:: - (object) Statistics about the number of forecast records: min, max, avg and total. + (object) Statistics about the number of forecast records: minimum, maximum, average and total. `processing_time_ms`:: - (object) Statistics about the forecast runtime in milliseconds: min, max, avg and total. + (object) Statistics about the forecast runtime in milliseconds: minimum, maximum, average and total. `status`:: (object) Counts per forecast status, for example: {"finished" : 2}. -NOTE: `memory_bytes`, `records`, `processing_time_ms`, `status` require at least 1 forecast, otherwise +NOTE: `memory_bytes`, `records`, `processing_time_ms` and `status` require at least 1 forecast, otherwise these fields are ommitted. [float]