[ML] Account for gaps in data counts after job is reopened (#30294)
This commit fixes an issue with the data diagnostics where
empty buckets are not reported even though they should be. Once
a job is reopened, the diagnostics are not initialized from
the current data counts (in particular, the latest record timestamp).
The result is that if the newly sent data has a time gap compared
to the previously seen data, that gap is not accounted for in the
empty bucket count.

This commit fixes that by initializing the diagnostics with the current
data counts.

Closes #30080
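
The mechanism, as a minimal self-contained Java sketch (these are not the
actual Elasticsearch classes; names are invented for illustration): seeding
the diagnostics with the latest record timestamp persisted in the data counts
lets the first record sent after a reopen expose the gap as empty buckets.

public class GapAwareDiagnosticsSketch {

    private final long bucketSpanMs;
    private long latestRecordMs = -1;
    private long emptyBucketCount = 0;

    GapAwareDiagnosticsSketch(long bucketSpanMs, Long persistedLatestRecordMs) {
        this.bucketSpanMs = bucketSpanMs;
        if (persistedLatestRecordMs != null) {
            // The fix: start from the last timestamp seen before the job was
            // closed, instead of treating the first post-reopen record as the
            // first record ever seen.
            latestRecordMs = persistedLatestRecordMs;
        }
    }

    void addRecord(long recordMs) {
        long prevBucket = latestRecordMs / bucketSpanMs;
        long currBucket = recordMs / bucketSpanMs;
        if (latestRecordMs >= 0 && currBucket > prevBucket + 1) {
            // Buckets strictly between the previous and current record are empty.
            emptyBucketCount += currBucket - prevBucket - 1;
        }
        latestRecordMs = Math.max(latestRecordMs, recordMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // Reopened job: the last record before the close was at hour 9.
        GapAwareDiagnosticsSketch d = new GapAwareDiagnosticsSketch(hour, 9 * hour);
        d.addRecord(20 * hour); // first record after the gap
        System.out.println(d.emptyBucketCount); // 10 -> hours 10..19 were empty
    }
}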
dimitris-athanasiou committed May 3, 2018
1 parent b83ad84 commit cc254e3
Showing 7 changed files with 129 additions and 20 deletions.
4 changes: 4 additions & 0 deletions docs/CHANGELOG.asciidoc
@@ -399,6 +399,10 @@ Security::
* Fixed `saml-metadata` env file such that it sources the appropriate
environment file.

+Machine Learning::
+
+* Account for gaps in data counts after job is reopened ({pull}30294[#30294])
+
//[float]
//=== Regressions

@@ -82,7 +82,7 @@ public DataCountsReporter(Settings settings, Job job, DataCounts counts, JobData

totalRecordStats = counts;
incrementalRecordStats = new DataCounts(job.getId());
-diagnostics = new DataStreamDiagnostics(job);
+diagnostics = new DataStreamDiagnostics(job, counts);

acceptablePercentDateParseErrors = ACCEPTABLE_PERCENTAGE_DATE_PARSE_ERRORS_SETTING.get(settings);
acceptablePercentOutOfOrderErrors = ACCEPTABLE_PERCENTAGE_OUT_OF_ORDER_ERRORS_SETTING.get(settings);
@@ -6,8 +6,11 @@
package org.elasticsearch.xpack.ml.job.process.diagnostics;

import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.utils.Intervals;

+import java.util.Date;
+
/**
* A moving window of buckets that allow keeping
* track of some statistics like the bucket count,
@@ -33,12 +36,17 @@ class BucketDiagnostics {
private long latestFlushedBucketStartMs = -1;
private final BucketFlushListener bucketFlushListener;

-BucketDiagnostics(Job job, BucketFlushListener bucketFlushListener) {
+BucketDiagnostics(Job job, DataCounts dataCounts, BucketFlushListener bucketFlushListener) {
bucketSpanMs = job.getAnalysisConfig().getBucketSpan().millis();
latencyMs = job.getAnalysisConfig().getLatency() == null ? 0 : job.getAnalysisConfig().getLatency().millis();
maxSize = Math.max((int) (Intervals.alignToCeil(latencyMs, bucketSpanMs) / bucketSpanMs), MIN_BUCKETS);
buckets = new long[maxSize];
this.bucketFlushListener = bucketFlushListener;

+Date latestRecordTimestamp = dataCounts.getLatestRecordTimeStamp();
+if (latestRecordTimestamp != null) {
+    addRecord(latestRecordTimestamp.getTime());
+}
}

void addRecord(long recordTimestampMs) {
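A worked example of the window sizing in the BucketDiagnostics hunk above.
This is a sketch under two assumptions: MIN_BUCKETS is 10 (as the "ten
buckets" test names below suggest), and Intervals.alignToCeil rounds its
first argument up to the nearest multiple of the second.

public class WindowSizeExample {

    // Assumed semantics of Intervals.alignToCeil: round value up to a
    // multiple of interval.
    static long alignToCeil(long value, long interval) {
        return ((value + interval - 1) / interval) * interval;
    }

    public static void main(String[] args) {
        long bucketSpanMs = 60_000;  // 1-minute buckets, as in the tests below
        int minBuckets = 10;         // assumed value of MIN_BUCKETS

        // Latency of 3 buckets: the window keeps the minimum size of 10.
        long latency1 = 3 * bucketSpanMs;
        System.out.println(Math.max(
                (int) (alignToCeil(latency1, bucketSpanMs) / bucketSpanMs), minBuckets)); // 10

        // Latency of 13 buckets + 10s rounds up to 14 buckets, exceeding the minimum.
        long latency2 = 13 * bucketSpanMs + 10_000;
        System.out.println(Math.max(
                (int) (alignToCeil(latency2, bucketSpanMs) / bucketSpanMs), minBuckets)); // 14
    }
}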
@@ -8,6 +8,7 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;

import java.util.Date;

@@ -32,8 +33,8 @@ public class DataStreamDiagnostics {
private long sparseBucketCount = 0;
private long latestSparseBucketTime = -1;

-public DataStreamDiagnostics(Job job) {
-bucketDiagnostics = new BucketDiagnostics(job, createBucketFlushListener());
+public DataStreamDiagnostics(Job job, DataCounts dataCounts) {
+bucketDiagnostics = new BucketDiagnostics(job, dataCounts, createBucketFlushListener());
}

private BucketDiagnostics.BucketFlushListener createBucketFlushListener() {
@@ -11,6 +11,7 @@
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
+import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.Before;

import java.util.Arrays;
@@ -20,6 +21,7 @@ public class DataStreamDiagnosticsTests extends ESTestCase {

private static final long BUCKET_SPAN = 60000;
private Job job;
+private DataCounts dataCounts;

@Before
public void setUpMocks() {
@@ -32,10 +34,11 @@
builder.setAnalysisConfig(acBuilder);
builder.setDataDescription(new DataDescription.Builder());
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), null);
+dataCounts = new DataCounts(job.getId());
}

public void testIncompleteBuckets() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(1000);
d.checkRecord(2000);
@@ -81,7 +84,7 @@ public void testIncompleteBuckets() {
}

public void testSimple() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(70000);
d.checkRecord(130000);
@@ -103,7 +106,7 @@ public void testSimple() {
}

public void testSimpleReverse() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(610000);
d.checkRecord(550000);
@@ -126,7 +129,7 @@

public void testWithLatencyLessThanTenBuckets() {
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(3 * BUCKET_SPAN));
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

long timestamp = 70000;
while (timestamp < 70000 + 20 * BUCKET_SPAN) {
@@ -141,7 +144,7 @@

public void testWithLatencyGreaterThanTenBuckets() {
job = createJob(TimeValue.timeValueMillis(BUCKET_SPAN), TimeValue.timeValueMillis(13 * BUCKET_SPAN + 10000));
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

long timestamp = 70000;
while (timestamp < 70000 + 20 * BUCKET_SPAN) {
@@ -155,7 +158,7 @@
}

public void testEmptyBuckets() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(10000);
d.checkRecord(70000);
@@ -177,7 +180,7 @@
}

public void testEmptyBucketsStartLater() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(1110000);
d.checkRecord(1170000);
@@ -199,7 +202,7 @@
}

public void testSparseBuckets() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -227,7 +230,7 @@ public void testSparseBuckets() {
* signal
*/
public void testSparseBucketsLast() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -255,7 +258,7 @@ public void testSparseBucketsLast() {
* signal on the 2nd to last
*/
public void testSparseBucketsLastTwo() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -280,7 +283,7 @@ public void testSparseBucketsLastTwo() {
}

public void testMixedEmptyAndSparseBuckets() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -308,7 +311,7 @@ public void testMixedEmptyAndSparseBuckets() {
* whether counts are right.
*/
public void testEmptyBucketsLongerOutage() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

d.checkRecord(10000);
d.checkRecord(70000);
@@ -336,7 +339,7 @@ public void testEmptyBucketsLongerOutage() {
* The number of sparse buckets should not be to much, it could be normal.
*/
public void testSparseBucketsLongerPeriod() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);

sendManyDataPoints(d, 10000, 69000, 1000);
sendManyDataPoints(d, 70000, 129000, 1200);
@@ -374,7 +377,7 @@ private static Job createJob(TimeValue bucketSpan, TimeValue latency) {
}

public void testFlushAfterZeroRecords() {
-DataStreamDiagnostics d = new DataStreamDiagnostics(job);
+DataStreamDiagnostics d = new DataStreamDiagnostics(job, dataCounts);
d.flush();
assertEquals(0, d.getBucketCount());
}
@@ -241,7 +241,7 @@ public void testMiniFarequoteReopen() throws Exception {
assertEquals(0, responseBody.get("invalid_date_count"));
assertEquals(0, responseBody.get("missing_field_count"));
assertEquals(0, responseBody.get("out_of_order_timestamp_count"));
assertEquals(0, responseBody.get("bucket_count"));
assertEquals(1000, responseBody.get("bucket_count"));

// unintuitive: should return the earliest record timestamp of this feed???
assertEquals(null, responseBody.get("earliest_record_timestamp"));
@@ -266,7 +266,7 @@
assertEquals(0, dataCountsDoc.get("invalid_date_count"));
assertEquals(0, dataCountsDoc.get("missing_field_count"));
assertEquals(0, dataCountsDoc.get("out_of_order_timestamp_count"));
assertEquals(0, dataCountsDoc.get("bucket_count"));
assertEquals(1000, dataCountsDoc.get("bucket_count"));
assertEquals(1403481600000L, dataCountsDoc.get("earliest_record_timestamp"));
assertEquals(1407082000000L, dataCountsDoc.get("latest_record_timestamp"));

@@ -0,0 +1,93 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.integration;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.xpack.core.ml.action.GetBucketsAction;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Detector;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.junit.After;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.equalTo;

/**
 * Tests that data counts are reported correctly after a job is
 * reopened and more data is sent following a time gap.
 */
public class ReopenJobWithGapIT extends MlNativeAutodetectIntegTestCase {

private static final String JOB_ID = "reopen-job-with-gap-test";
private static final long BUCKET_SPAN_SECONDS = 3600;

@After
public void cleanUpTest() {
cleanUp();
}

public void test() throws Exception {
AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(
Collections.singletonList(new Detector.Builder("count", null).build()));
analysisConfig.setBucketSpan(TimeValue.timeValueSeconds(BUCKET_SPAN_SECONDS));
DataDescription.Builder dataDescription = new DataDescription.Builder();
dataDescription.setTimeFormat("epoch");
Job.Builder job = new Job.Builder(JOB_ID);
job.setAnalysisConfig(analysisConfig);
job.setDataDescription(dataDescription);

registerJob(job);
putJob(job);
openJob(job.getId());

long timestamp = 1483228800L; // 2017-01-01T00:00:00Z
List<String> data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
data.add(createJsonRecord(createRecord(timestamp)));
timestamp += BUCKET_SPAN_SECONDS;
}

postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), true);
closeJob(job.getId());

GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId());
request.setExcludeInterim(true);
assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(9L));
assertThat(getJobStats(job.getId()).get(0).getDataCounts().getBucketCount(), equalTo(9L));
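// 9 rather than 10 buckets: the bucket containing the latest record is not
// finalized until data past its end arrives.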

timestamp += 10 * BUCKET_SPAN_SECONDS;
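// The jump above skips 10 bucket spans, creating the gap of 10 empty buckets
// that the final assertion expects.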
data = new ArrayList<>();
for (int i = 0; i < 10; i++) {
data.add(createJsonRecord(createRecord(timestamp)));
timestamp += BUCKET_SPAN_SECONDS;
}

openJob(job.getId());
postData(job.getId(), data.stream().collect(Collectors.joining()));
flushJob(job.getId(), true);
closeJob(job.getId());

assertThat(client().execute(GetBucketsAction.INSTANCE, request).actionGet().getBuckets().count(), equalTo(29L));
DataCounts dataCounts = getJobStats(job.getId()).get(0).getDataCounts();
assertThat(dataCounts.getBucketCount(), equalTo(29L));
assertThat(dataCounts.getEmptyBucketCount(), equalTo(10L));
}

private static Map<String, Object> createRecord(long timestamp) {
Map<String, Object> record = new HashMap<>();
record.put("time", timestamp);
return record;
}
}
