
Commit 4c04803

[ML] fixes bug with composite agg datafeed extraction (#71221) (#71246)
If a `max` value in a given composite aggregation bucket is the same as the floor of the current `after` page, the datafeed could cancel processing of composite aggregation pages too early: it would see that the `max` timestamp aligned with the interval and stop processing. This is a bug. Subsequent pages may still contain other terms within that `date_histogram` bucket, because the buckets are ordered by term, NOT by max timestamp.

This commit ensures that, when the process is cancelled, the datafeed finishes the current `date_histogram` bucket regardless of whether the first timestamp seen after cancelling aligns with the current page.

closes #71212
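
The difference between the old and new calculations shows up only when the timestamp sits exactly on a bucket floor. The following is a minimal sketch of that arithmetic, not the extractor's code: the class name and the two `nextBucket...` helpers are hypothetical, and `alignToFloor` / `alignToCeil` are local stand-ins that assume the usual floor and ceiling semantics for the `Intervals` helpers referenced in the diff.

```java
// Sketch only: local stand-ins that assume floor/ceiling alignment semantics.
final class BucketAlignmentSketch {

    // Largest multiple of interval that is <= value.
    static long alignToFloor(long value, long interval) {
        return Math.floorDiv(value, interval) * interval;
    }

    // Smallest multiple of interval that is >= value.
    static long alignToCeil(long value, long interval) {
        long floor = alignToFloor(value, interval);
        return floor == value ? floor : floor + interval;
    }

    // Calculation before this commit: a timestamp on the floor maps to itself.
    static long nextBucketBeforeFix(long timestamp, long interval) {
        return alignToCeil(timestamp, interval);
    }

    // Calculation after this commit: always the start of the following bucket.
    static long nextBucketAfterFix(long timestamp, long interval) {
        return alignToFloor(timestamp + interval, interval);
    }

    public static void main(String[] args) {
        long interval = 1_000L;
        // 3_000 sits exactly on a bucket floor; 3_250 is inside the same bucket.
        for (long ts : new long[] {3_000L, 3_250L}) {
            System.out.println(ts
                + " before=" + nextBucketBeforeFix(ts, interval)
                + " after=" + nextBucketAfterFix(ts, interval));
        }
        // prints "3000 before=3000 after=4000"
        // prints "3250 before=4000 after=4000"
    }
}
```

With the old calculation, a timestamp of 3000 yields a cancel boundary of 3000, so any check of the form "timestamp has reached the boundary" is satisfied immediately. With the new calculation the boundary is 4000 in both cases, so the rest of the 3000 bucket is still processed.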
1 parent 3a625c9 commit 4c04803

2 files changed: +11 -8 lines changed


x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractor.java

Lines changed: 11 additions & 7 deletions
@@ -168,20 +168,24 @@ private InputStream processAggs(Aggregations aggs) throws IOException {
         ));
         aggregationToJsonProcessor.process(aggs);
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        final boolean hasAfterKey = afterKey != null && (afterKey.get(context.compositeAggDateHistogramGroupSourceName) instanceof Long);
+        final Long afterKeyTimeBucket = afterKey != null ? (Long)afterKey.get(context.compositeAggDateHistogramGroupSourceName) : null ;
         boolean cancellable = aggregationToJsonProcessor.writeAllDocsCancellable(
             timestamp -> {
                 if (isCancelled) {
                     // If we have not processed a single composite agg page yet and we are cancelled
                     // We should not process anything
-                    if (hasAfterKey == false) {
+                    if (afterKeyTimeBucket == null) {
                         return true;
                     }
+                    // We want to stop processing once a timestamp enters the next time bucket.
+                    // This could occur in any page. One benefit we have is that even though the paging order is not sorted
+                    // by max timestamp, our iteration of the page results is. So, once we cross over to the next bucket within
+                    // a given page, we know the previous bucket has been exhausted.
                     if (nextBucketOnCancel == 0L) {
-                        // If we have been cancelled, record the bucket above our latest timestamp
-                        // This indicates when we have completed the current bucket of this timestamp and thus will move to the next
-                        // date_histogram bucket
-                        nextBucketOnCancel = Intervals.alignToCeil(timestamp, interval);
+                        // This simple equation handles two unique scenarios:
+                        //   If the timestamp is the current floor, this means we need to keep processing until the next timebucket
+                        //   If we are not matching the current bucket floor, then this simply aligns to the next bucket
+                        nextBucketOnCancel = Intervals.alignToFloor(timestamp + interval, interval);
                         LOGGER.debug(() -> new ParameterizedMessage(
                             "[{}] set future timestamp cancel to [{}] via timestamp [{}]",
                             context.jobId,
@@ -200,7 +204,7 @@ private InputStream processAggs(Aggregations aggs) throws IOException {
                             "[{}] cancelled before bucket [{}] on date_histogram page [{}]",
                             context.jobId,
                             nextBucketOnCancel,
-                            hasAfterKey ? afterKey.get(context.compositeAggDateHistogramGroupSourceName) : "__null__"
+                            afterKeyTimeBucket != null ? afterKeyTimeBucket : "__null__"
                         )
                     );
                     hasNext = false;
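
The code comments in this hunk rely on results within a page being iterated in timestamp order, so the first timestamp that reaches the next bucket marks the previous bucket as exhausted. Below is a rough, simplified sketch of that idea. The class, `shouldStop`, and `drain` are hypothetical names, and the stop condition `timestamp >= nextBucketOnCancel` is an assumption, since that part of the method is not shown in the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a simplified stand-in for the cancellation predicate, not the extractor itself.
final class CancelOnNextBucketSketch {
    private final long interval;
    // 0 means "not yet decided", mirroring the nextBucketOnCancel == 0L check in the diff.
    private long nextBucketOnCancel = 0L;

    CancelOnNextBucketSketch(long interval) {
        this.interval = interval;
    }

    // Returns true once a timestamp has entered the bucket after the one
    // that was being processed when cancellation was first observed.
    boolean shouldStop(long timestamp) {
        if (nextBucketOnCancel == 0L) {
            // Same arithmetic as alignToFloor(timestamp + interval, interval).
            nextBucketOnCancel = Math.floorDiv(timestamp + interval, interval) * interval;
        }
        // Assumed stop condition; the real check is outside the lines shown in the diff.
        return timestamp >= nextBucketOnCancel;
    }

    // Collects the timestamps that would still be written after cancellation.
    // The input is assumed to be sorted ascending, as the code comments describe.
    static List<Long> drain(long[] sortedTimestamps, long interval) {
        CancelOnNextBucketSketch predicate = new CancelOnNextBucketSketch(interval);
        List<Long> written = new ArrayList<>();
        for (long ts : sortedTimestamps) {
            if (predicate.shouldStop(ts)) {
                break;
            }
            written.add(ts);
        }
        return written;
    }

    public static void main(String[] args) {
        long[] page = {3_000L, 3_000L, 3_400L, 4_000L, 4_200L};
        System.out.println(drain(page, 1_000L)); // prints [3000, 3000, 3400]
    }
}
```

Here the first timestamp seen after cancellation is 3000, which sits on the bucket floor. The remaining entries of the 3000 bucket are still written, and processing stops at 4000.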

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/CompositeAggregationDataExtractorTests.java

Lines changed: 0 additions & 1 deletion
@@ -267,7 +267,6 @@ public void testExtractionCancelOnFirstPage() throws IOException {
         assertThat(extractor.hasNext(), is(false));
     }
 
-    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/71212")
     public void testExtractionGivenCancelHalfWay() throws IOException {
         int numBuckets = 10;
         List<CompositeAggregation.Bucket> buckets = new ArrayList<>(numBuckets);
