Skip to content

Commit

Permalink
add handleConfigChanged and optimize getTagTriggerTime
Browse files Browse the repository at this point in the history
  • Loading branch information
huyuanfeng committed Dec 14, 2023
1 parent f65ffa8 commit 9e3a862
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import com.netease.arctic.utils.CompatiblePropertyUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.Locale;
import java.util.Map;

Expand All @@ -50,30 +51,24 @@ public class TagConfiguration {
public enum Period {
DAILY("daily") {
@Override
public long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
long periodSize = 60 * 60 * 24 * 1000L;
return getPeriodEndWithOffset(checkTime, triggerOffsetMinutes, periodSize);
protected TemporalUnit getPeriodUnit() {
return ChronoUnit.DAYS;
}

@Override
public LocalDateTime normalizeToTagTime(long triggerTime, int triggerOffsetMinutes) {
return LocalDateTime.ofInstant(Instant.ofEpochMilli(triggerTime), ZoneId.systemDefault())
.minus(triggerOffsetMinutes, ChronoUnit.MINUTES)
.minus(1, ChronoUnit.DAYS);
protected Duration periodSize() {
return Duration.ofDays(1);
}
},
HOURLY("hourly") {
@Override
public long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
long periodSize = 60 * 60 * 1000L;
return getPeriodEndWithOffset(checkTime, triggerOffsetMinutes, periodSize);
protected TemporalUnit getPeriodUnit() {
return ChronoUnit.HOURS;
}

@Override
public LocalDateTime normalizeToTagTime(long triggerTime, int triggerOffsetMinutes) {
return LocalDateTime.ofInstant(Instant.ofEpochMilli(triggerTime), ZoneId.systemDefault())
.minus(triggerOffsetMinutes, ChronoUnit.MINUTES)
.minus(1, ChronoUnit.HOURS);
protected Duration periodSize() {
return Duration.ofHours(1);
}
};

Expand All @@ -87,39 +82,39 @@ public String propertyName() {
return propertyName;
}

protected abstract TemporalUnit getPeriodUnit();

protected abstract Duration periodSize();

/**
* Obtain the trigger time for creating a tag, which is the idea time of the last tag before the
* check time.
* Obtain the trigger time for creating a tag, which is the ideal time of the last tag before
* the check time.
*
* <p>For example, when creating a daily tag, the check time is 2022-08-08 11:00:00 and the
* offset is set to be 5 min, the idea trigger time is 2022-08-08 00:05:00.
*/
public abstract long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes);

public abstract LocalDateTime normalizeToTagTime(long triggerTime, int triggerOffsetMinutes);

/**
* calculate the end of the period adjusted with a specified offset.
*
* <p>The calculation takes into account the provided period size and offset. It first
* calculates an offset timestamp, which is essentially `checkTime` minus offset minutes. Then
* it calculates the delta of the division of that timestamp by the period size, then applies
* the delta to the original timestamp to get the period start timestamp.
* <p>For example, when creating a daily tag, the offset is set to be 30 min, if the check time
* is 2022-08-08 02:00:00, the ideal trigger time is 2022-08-08 00:30:00; if the check time is
* 2022-08-09 00:20:00 (before 00:30 of the next day), the ideal trigger time is still
* 2022-08-08 00:30:00.
*/
protected Long getPeriodEndWithOffset(
LocalDateTime checkTime, int triggerOffsetMinutes, long periodSize) {
long timestamp = checkTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
long offset = triggerOffsetMinutes * 60_000L;
final long remainder = (timestamp - offset) % periodSize;
long windowStartTimestamp;
if (remainder < 0) {
windowStartTimestamp = timestamp - (remainder + periodSize);
} else {
windowStartTimestamp = timestamp - remainder;
}
LocalDateTime windowStart =
LocalDateTime.ofInstant(Instant.ofEpochMilli(windowStartTimestamp), ZoneOffset.UTC);
return windowStart.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
public long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
checkTime = checkTime.plus(periodSize());

return checkTime
.minusMinutes(triggerOffsetMinutes)
.truncatedTo(getPeriodUnit())
.plus(periodSize())
.plusMinutes(triggerOffsetMinutes)
.atZone(ZoneId.systemDefault())
.toInstant()
.toEpochMilli();
}

public LocalDateTime normalizeToTagTime(long triggerTime, int triggerOffsetMinutes) {
return LocalDateTime.ofInstant(Instant.ofEpochMilli(triggerTime), ZoneId.systemDefault())
.minus(triggerOffsetMinutes, ChronoUnit.MINUTES)
.minus(1, getPeriodUnit());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.server.optimizing.maintainer.TableMaintainer;
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableManager;
import com.netease.arctic.server.table.TableRuntime;
import org.slf4j.Logger;
Expand Down Expand Up @@ -60,4 +61,9 @@ protected void execute(TableRuntime tableRuntime) {
LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t);
}
}

@Override
public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
}

0 comments on commit 9e3a862

Please sign in to comment.