[ML] Account for gaps in data counts after job is reopened #30294

Merged
4 changes: 4 additions & 0 deletions docs/CHANGELOG.asciidoc
@@ -104,6 +104,10 @@ Do not ignore request analysis/similarity settings on index resize operations wh

Fix NPE when CumulativeSum agg encounters null value/empty bucket ({pull}29641[#29641])

+Machine Learning::
+
+* Account for gaps in data counts after job is reopened ({pull}30294[#30294])

//[float]
//=== Regressions

DataCountsReporter.java
@@ -82,7 +82,7 @@ public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobData

totalRecordStats = counts;
incrementalRecordStats = new DataCounts(job.getId());
-diagnostics = new DataStreamDiagnostics(job);
+diagnostics = new DataStreamDiagnostics(job, counts);

acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
BucketDiagnostics.java
@@ -6,8 +6,11 @@
package org.elasticsearch.xpack.ml.job.process.diagnostics;

import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.utils.Intervals;

+import java.util.Date;

/**
A moving window of buckets that allows keeping
track of statistics such as the bucket count,
@@ -33,12 +36,17 @@ class BucketDiagnostics {
private long latestFlushedBucketStartMs = -1;
private final BucketFlushListener bucketFlushListener;

-BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) {
+BucketDiagnostics(Job job, DataCounts dataCounts, BucketFlushListener bucketFlushListener) {
bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis();
latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis();
maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS);
buckets = new long[maxSize];
this.bucketFlushListener = bucketFlushListener;

+Date latestRecordTimestamp = dataCounts.getLatestRecordTimeStamp();
+if (latestRecordTimestamp != null) {
+addRecord(latestRecordTimestamp.getTime());
+}
}

void addRecord(long recordTimestampMs) {
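The heart of the change is the constructor above: BucketDiagnostics now receives the persisted DataCounts and seeds its moving window with the latest record timestamp recorded before the job was closed. When the first record after the reopen arrives, the window rolls forward across every bucket in between, so the gap is flushed and counted (as empty buckets, if no data arrived for them) instead of being silently skipped. A minimal sketch of the idea, with a hypothetical class name and the window mechanics reduced to first/last bucket bookkeeping:

class GapAwareBucketCounter {

    private final long bucketSpanMs;
    private long earliestBucketStartMs = -1;
    private long latestBucketStartMs = -1;

    // latestRecordBeforeReopenMs stands in for DataCounts.getLatestRecordTimeStamp();
    // it is null for a job that has never seen data.
    GapAwareBucketCounter(long bucketSpanMs, Long latestRecordBeforeReopenMs) {
        this.bucketSpanMs = bucketSpanMs;
        if (latestRecordBeforeReopenMs != null) {
            // Seed the window, mirroring the constructor change above.
            addRecord(latestRecordBeforeReopenMs);
        }
    }

    void addRecord(long recordTimestampMs) {
        // Align the record to the start of its bucket.
        long bucketStartMs = (recordTimestampMs / bucketSpanMs) * bucketSpanMs;
        if (earliestBucketStartMs < 0) {
            earliestBucketStartMs = bucketStartMs;
        }
        latestBucketStartMs = Math.max(latestBucketStartMs, bucketStartMs);
    }

    // Complete buckets spanned so far; buckets inside a gap are included
    // rather than lost.
    long bucketCount() {
        return earliestBucketStartMs < 0 ? 0 : (latestBucketStartMs - earliestBucketStartMs) / bucketSpanMs;
    }
}

Seeded with the pre-close timestamp and then fed a record ten bucket spans later, the sketch reports ten buckets; without the seeding (the old behaviour after a reopen), the same sequence would report zero, which is exactly the undercount this PR fixes.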
DataStreamDiagnostics.java
@@ -8,6 +8,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;

import java.util.Date;

@@ -32,8 +33,8 @@ public class DataStreamDiagnostics {
private long sparseBucketCount = 0;
private long latestSparseBucketTime = -1;

-public DataStreamDiagnostics(Job job) {
-bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener());
+public DataStreamDiagnostics(Job job, DataCounts dataCounts) {
+bucketDiagnostics = new BucketDiagnostics(job, dataCounts, createBucketFlushListener());
}

private BucketDiagnostics.BucketFlushListener createBucketFlushListener() {
DataStreamDiagnosticsTests.java
@@ -11,6 +11,7 @@
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.Before;

import java.util.Arrays;
@@ -20,6 +21,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {

private static final long BUCKET_SPAN = 60000;
private Job job;
+private DataCounts dataCounts;

@Before
public void setUpMocks() {
@@ -32,10 +34,11 @@ public void setUpMocks() {
builder.setAnalysisConfig(acBuilder);
builder.setDataDescription(new DataDescription.Builder());
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null);
+dataCounts = new DataCounts(job.getId());
}

public void testIncompleteBuckets() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(1000);
d.checkRecord(2000);
@@ -81,7 +84,7 @@ public void testIncompleteBuckets() {
}

public void testSimple() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(70000);
d.checkRecord(130000);
@@ -103,7 +106,7 @@ public void testSimple() {
}

public void testSimpleReverse() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(610000);
d.checkRecord(550000);
@@ -126,7 +129,7 @@

public void testWithLatencyLessThanTenBuckets() {
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN));
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

long timestamp = 70000;
while (timestamp < 70000 + 20 * BUCKET_SPAN) {
@@ -141,7 +144,7 @@ public void testWithLatencyLessThanTenBuckets() {

public void testWithLatencyGreaterThanTenBuckets() {
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000));
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

long timestamp = 70000;
while (timestamp < 70000 + 20 * BUCKET_SPAN) {
@@ -155,7 +158,7 @@
}

public void testEmptyBuckets() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(10000);
d.checkRecord(70000);
@@ -177,7 +180,7 @@ public void testEmptyBuckets() {
}

public void testEmptyBucketsStartLater() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(1110000);
d.checkRecord(1170000);
@@ -199,7 +202,7 @@ public void testEmptyBucketsStartLater() {
}

public void testSparseBuckets() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -227,7 +230,7 @@ public void testSparseBuckets() {
* signal
*/
public void testSparseBucketsLast() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -255,7 +258,7 @@ public void testSparseBucketsLast() {
* signal on the 2nd to last
*/
public void testSparseBucketsLastTwo() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -280,7 +283,7 @@ public void testSparseBucketsLastTwo() {
}

public void testMixedEmptyAndSparseBuckets() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -308,7 +311,7 @@ public void testMixedEmptyAndSparseBuckets() {
* whether counts are right.
*/
public void testEmptyBucketsLongerOutage() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(10000);
d.checkRecord(70000);
@@ -336,7 +339,7 @@ public void testEmptyBucketsLongerOutage() {
The number of sparse buckets should not be too high; it could be normal.
*/
public void testSparseBucketsLongerPeriod() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -374,7 +377,7 @@ private static Job createJob(TimeValue bucketSpan, TimeValue latency) {
}

public void testFlushAfterZeroRecords() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.flush();
assertEquals(0, d.getBucketCount());
}
@@ -241,7 +241,7 @@ public void testMiniFarequoteReopen() throws Exception {
assertEquals(0, responseBody.get("invalid_date_count"));
assertEquals(0, responseBody.get("missing_field_count"));
assertEquals(0, responseBody.get("out_of_order_timestamp_count"));
assertEquals(0, responseBody.get("bucket_count"));
assertEquals(1000, responseBody.get("bucket_count"));

// unintuitive: should return the earliest record timestamp of this feed???
assertEquals(null, responseBody.get("earliest_record_timestamp"));
@@ -266,7 +266,7 @@
assertEquals(0, dataCountsDoc.get("invalid_date_count"));
assertEquals(0, dataCountsDoc.get("missing_field_count"));
assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count"));
assertEquals(0, dataCountsDoc.get("bucket_count"));
assertEquals(1000, dataCountsDoc.get("bucket_count"));
assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp"));
assertEquals(1407082000000L, dataCountsDoc.get("latest_record_timestamp"));

ReopenJobWithGapIT.java (new file)
@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;

/**
* Tests that after reopening a job and sending more
* data after a gap, data counts are reported correctly.
*/
public class ReopenJobWithGapIT extends MlNativeAutodetectIntegTestCase {

private static final String JOB_ID = "reopen-job-with-gap-test";
private static final long BUCKET_SPAN_SECONDS = 3600;

@After
public void cleanUpTest() {
cleanUp();
}

public void test() throws Exception {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("count", null).build()));
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS));
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch");
Job.Builder job = new Job.Builder(JOB_ID);
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription);

registerJob(job);
putJob(job);
openJob(job.getId());

long timestamp = 1483228800L; // 2017-01-01T00:00:00Z
List<String> data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
data.add(createJsonRecord(createRecord(timestamp)));
timestamp += BUCKET_SPAN_SECONDS;
}

postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), true);
closeJob(job.getId());

GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
request.setExcludeInterim(true);
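// 10 hourly records span 9 complete buckets; the bucket holding the latest record is still open.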
assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(9L));
assertThat(getJobStats(job.getId()).get(0).getDataCounts().getBucketCount(), equalTo(9L));

timestamp += 10 * BUCKET_SPAN_SECONDS;
data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
data.add(createJsonRecord(createRecord(timestamp)));
timestamp += BUCKET_SPAN_SECONDS;
}

openJob(job.getId());
postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), true);
closeJob(job.getId());

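// The records now span 29 complete buckets in total; the 10 buckets in the reopen gap are counted as empty.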
assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(29L));
DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts();
assertThat(dataCounts.getBucketCount(), equalTo(29L));
assertThat(dataCounts.getEmptyBucketCount(), equalTo(10L));
}

private static Map<String, Object> createRecord(long timestamp) {
Map<String, Object> record = new HashMap<>();
record.put("time", timestamp);
return record;
}
}