Skip to content

Commit

Permalink
[core] add support for negative completenessDelay
Browse files Browse the repository at this point in the history
  • Loading branch information
cyrilou242 committed Oct 11, 2024
1 parent bd681a3 commit 34783a6
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static ai.startree.thirdeye.spi.util.AlertMetadataUtils.getDateTimeZone;
import static ai.startree.thirdeye.spi.util.AlertMetadataUtils.getDelay;
import static ai.startree.thirdeye.spi.util.AlertMetadataUtils.getGranularity;
import static com.google.common.base.Preconditions.checkState;

import ai.startree.thirdeye.spi.datalayer.dto.AlertMetadataDTO;
import ai.startree.thirdeye.spi.datalayer.dto.AlertTemplateDTO;
Expand Down Expand Up @@ -60,20 +61,37 @@ public static Interval computeCorrectedInterval(final @Nullable Long alertId, fi
// apply delay correction
final Period delay = getDelay(metadata);
final DateTime dataWatermark = DateTime.now(chronology).minus(delay);
if (correctedEnd.isAfter(dataWatermark)) {
correctedEnd = dataWatermark;
LOG.info(
"Applied delay correction of {} for id {} between {} and {}. Corrected end time is {}",
delay, alertId, taskStart, taskEnd, correctedEnd);
}
if (correctedEnd.isBefore(correctedStart)) {
correctedStart = correctedStart.minus(delay);
LOG.warn(
"EndTime with delay correction {} is before startTime {}. This can happen if delay configuration is changed to a bigger value. "
+ "Applied delay correction to startTime. Detection may rerun on a timeframe on which it already run with a different config",
correctedEnd,
correctedStart);
final boolean completenessDelayIsPositive = delay.toStandardDuration().getMillis() >= 0;
if (completenessDelayIsPositive) {
if (correctedEnd.isAfter(dataWatermark)) {
correctedEnd = dataWatermark;
LOG.info(
"Applied delay correction of {} for id {} between {} and {}. Corrected end time is {}",
delay, alertId, taskStart, taskEnd, correctedEnd);
}
if (correctedEnd.isBefore(correctedStart)) {
correctedStart = correctedStart.minus(delay);
LOG.warn(
"EndTime with delay correction {} is before startTime {}. This can happen if delay configuration is changed to a bigger value. "
+ "Applied delay correction to startTime. Detection may rerun on a timeframe on which it already run with a different config",
correctedEnd,
correctedStart);
}
} else {
// use case: completenessDelay is negative: -6 hours. Cron runs at 18:00. DataWatermark is 18 - -6=24 --> the day can be considered complete
// useful if we know the data is ready to analyze at 6pm and the rest of the data is not relevant
if (correctedEnd.isBefore(dataWatermark)) {
correctedEnd = dataWatermark;
LOG.info(
"Applied delay correction of {} for id {} between {} and {}. Corrected end time is {}",
delay, alertId, taskStart, taskEnd, correctedEnd);
} else {
LOG.error("Task end is after the data watermark, even if the data watermark is in the future because the completenessDelay is negative. This should not happen, except for detection tasks created manually. Please reach out to support.");
}
// should always be true because correctedEnd can only get bigger with the logic above
checkState(correctedStart.isBefore(correctedEnd));
}


// apply granularity correction
final Period granularity = getGranularity(metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,35 @@ public void testGetCorrectedIntervalWithDelayAppliedAndGranularity() {
assertThat(output).isEqualTo(expected);
}

@Test
public void testGetCorrectedIntervalWithNegativeDelayAppliedAndGranularity() {
// test that timeframe is changed when there is a negative delay
final DateTime inputTaskEnd = new DateTime(Constants.DEFAULT_CHRONOLOGY);
final DateTime inputTaskStart = inputTaskEnd.minus(Period.days(1));
final Period granularity = isoPeriod("P1D");
final Period completenessDelay = isoPeriod("PT-24H");

final DatasetConfigDTO datasetConfigDTO = new DatasetConfigDTO()
.setDataset(DATASET_NAME)
// delay of -6 hours will be applied
.setCompletenessDelay(completenessDelay.toString());
final AlertTemplateDTO inputAlertTemplate = new AlertTemplateDTO()
.setMetadata(new AlertMetadataDTO()
.setDataset(datasetConfigDTO)
.setGranularity(granularity.toString()) // granularity of 1 day
);
final Interval output = computeCorrectedInterval(ALERT_ID,
inputTaskStart.getMillis(), inputTaskEnd.getMillis(), inputAlertTemplate);

final Interval expected = new Interval(
TimeUtils.floorByPeriod(inputTaskStart, granularity),
TimeUtils.floorByPeriod(inputTaskEnd.minus(completenessDelay), granularity));

assertThat(output).isEqualTo(expected);
// end is in the future because completenessDelay is negative
assertThat(output.getEnd()).isGreaterThan(DateTime.now());
}

@Test
public void testGetCorrectedIntervalWithStartTimeGreaterThanEndTimeMinusDelay() {
// test that both start and endTime are changed when end-delay < start
Expand Down

0 comments on commit 34783a6

Please sign in to comment.