Skip to content

Commit

Permalink
[AMORO-2257] Automatically create tags on snapshots hour-level for i…
Browse files Browse the repository at this point in the history
…ceberg Table Formats (#2411)

* Automatically create tags on snapshots hour-level for iceberg Table Formats

* Update configurations.md

* fixed

* spotless:apply

* Adjust some code, add test cases

* add `handleConfigChanged` and optimize `getTagTriggerTime`

* remove period size

* Optimize the code

* fix

* Add little comment

* calculate tag time etc at the beginning

* Update docs/user-guides/configurations.md

---------

Co-authored-by: huyuanfeng <huyuanfeng@huya.com>
Co-authored-by: wangtaohz <103108928+wangtaohz@users.noreply.github.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
  • Loading branch information
4 people authored Jan 31, 2024
1 parent 0cddd95 commit c020c1c
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.ZoneId;
import java.util.Map;

/** Action to auto create tag for Iceberg Table. */
Expand All @@ -34,12 +34,20 @@ public class AutoCreateIcebergTagAction {

private final Table table;
private final TagConfiguration tagConfig;
private final LocalDateTime now;
private final LocalDateTime triggerTime;
private final String tagName;

public AutoCreateIcebergTagAction(Table table, TagConfiguration tagConfig, LocalDateTime now) {
public AutoCreateIcebergTagAction(
Table table, TagConfiguration tagConfig, LocalDateTime checkTime) {
this.table = table;
this.tagConfig = tagConfig;
this.now = now;

LocalDateTime tagTime =
tagConfig.getTriggerPeriod().getTagTime(checkTime, tagConfig.getTriggerOffsetMinutes());
// triggerTime = TagTime + triggerOffset
// The trigger time of the tag, which is the time when the tag is expected to be created.
this.triggerTime = tagTime.plusMinutes(tagConfig.getTriggerOffsetMinutes());
this.tagName = tagConfig.getTriggerPeriod().generateTagName(tagTime, tagConfig.getTagFormat());
}

public void execute() {
Expand All @@ -60,28 +68,21 @@ public void execute() {
}

private boolean tagExist() {
if (tagConfig.getTriggerPeriod() == TagConfiguration.Period.DAILY) {
return findTagOfToday() != null;
} else {
throw new IllegalArgumentException(
"unsupported trigger period " + tagConfig.getTriggerPeriod());
}
}

private String findTagOfToday() {
String name = generateTagName();
return table.refs().entrySet().stream()
.filter(entry -> entry.getValue().isTag())
.map(Map.Entry::getKey)
.filter(name::equals)
.findFirst()
.orElse(null);
String tag =
table.refs().entrySet().stream()
.filter(entry -> entry.getValue().isTag())
.map(Map.Entry::getKey)
.filter(tagName::equals)
.findFirst()
.orElse(null);
return tag != null;
}

private boolean createTag() {
Snapshot snapshot = findSnapshot(table, getTagTriggerTime());
long tagTriggerTimestampMillis = getTagTriggerTimestampMillis();
Snapshot snapshot = findSnapshot(table, tagTriggerTimestampMillis);
if (snapshot == null) {
LOG.info("Found no snapshot at {} for {}", getTagTriggerTime(), table.name());
LOG.info("Found no snapshot at {} for {}", tagTriggerTimestampMillis, table.name());
return false;
}
if (exceedMaxDelay(snapshot)) {
Expand All @@ -91,14 +92,13 @@ private boolean createTag() {
snapshot.snapshotId(),
snapshot.timestampMillis(),
tagConfig.getMaxDelayMinutes(),
getTagTriggerTime());
tagTriggerTimestampMillis);
return false;
}
String newTagName = generateTagName();
table.manageSnapshots().createTag(newTagName, snapshot.snapshotId()).commit();
table.manageSnapshots().createTag(tagName, snapshot.snapshotId()).commit();
LOG.info(
"Created a tag {} for {} on snapshot {} at {}",
newTagName,
tagName,
table.name(),
snapshot.snapshotId(),
snapshot.timestampMillis());
Expand All @@ -109,22 +109,12 @@ private boolean exceedMaxDelay(Snapshot snapshot) {
if (tagConfig.getMaxDelayMinutes() <= 0) {
return false;
}
long delay = snapshot.timestampMillis() - getTagTriggerTime();
long delay = snapshot.timestampMillis() - getTagTriggerTimestampMillis();
return delay > tagConfig.getMaxDelayMinutes() * 60_000L;
}

private String generateTagName() {
if (tagConfig.getTriggerPeriod() == TagConfiguration.Period.DAILY) {
String tagFormat = tagConfig.getTagFormat();
return now.minusDays(1).format(DateTimeFormatter.ofPattern(tagFormat));
} else {
throw new IllegalArgumentException(
"unsupported trigger period " + tagConfig.getTriggerPeriod());
}
}

private long getTagTriggerTime() {
return tagConfig.getTriggerPeriod().getTagTriggerTime(now, tagConfig.getTriggerOffsetMinutes());
/**
 * Converts the stored trigger time to epoch milliseconds, interpreting the local date-time in the
 * JVM's default time zone.
 */
private long getTagTriggerTimestampMillis() {
  ZoneId systemZone = ZoneId.systemDefault();
  return triggerTime.atZone(systemZone).toInstant().toEpochMilli();
}

private static Snapshot findSnapshot(Table table, long tagTriggerTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@
import com.netease.arctic.utils.CompatiblePropertyUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.Map;

Expand All @@ -35,7 +36,7 @@
public class TagConfiguration {
// tag.auto-create.enabled
private boolean autoCreateTag = false;
// tag.auto-create.daily.tag-format
// tag.auto-create.tag-format
private String tagFormat;
// tag.auto-create.trigger.period
private Period triggerPeriod;
Expand All @@ -48,10 +49,25 @@ public class TagConfiguration {
public enum Period {
DAILY("daily") {
@Override
public long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
LocalTime offsetTime = LocalTime.ofSecondOfDay(triggerOffsetMinutes * 60L);
LocalDateTime triggerTime = LocalDateTime.of(checkTime.toLocalDate(), offsetTime);
return triggerTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
// Length of one daily period; ChronoUnit.DAYS is treated by Duration as exactly 24 hours.
protected Duration periodDuration() {
  return Duration.of(1, ChronoUnit.DAYS);
}

@Override
public LocalDateTime getTagTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
  // Step back by the offset first, then snap to the start of that day.
  LocalDateTime shifted = checkTime.minusMinutes(triggerOffsetMinutes);
  return shifted.truncatedTo(ChronoUnit.DAYS);
}
},

// Hourly period: the period length is one hour and tag times fall on whole hours.
HOURLY("hourly") {
  @Override
  protected Duration periodDuration() {
    return Duration.of(1, ChronoUnit.HOURS);
  }

  @Override
  public LocalDateTime getTagTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
    // Step back by the offset first, then snap to the start of that hour.
    LocalDateTime shifted = checkTime.minusMinutes(triggerOffsetMinutes);
    return shifted.truncatedTo(ChronoUnit.HOURS);
  }
};

Expand All @@ -65,14 +81,25 @@ public String propertyName() {
return propertyName;
}

protected abstract Duration periodDuration();

/**
* Obtain the trigger time for creating a tag, which is the idea time of the last tag before the
* Obtain the tag time for creating a tag, which is the ideal time of the last tag before the
* check time.
*
* <p>For example, when creating a daily tag, the check time is 2022-08-08 11:00:00 and the
* offset is set to be 5 min, the idea trigger time is 2022-08-08 00:05:00.
* offset is set to be 5 min, the ideal tag time is 2022-08-08 00:00:00.
*
* <p>For example, when creating a daily tag, the offset is set to be 30 min, if the check time
* is 2022-08-08 02:00:00, the ideal tag time is 2022-08-08 00:00:00; if the check time is
* 2022-08-09 00:20:00 (before 00:30 of the next day), the ideal tag time is still 2022-08-08
* 00:00:00.
*/
public abstract long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes);
public abstract LocalDateTime getTagTime(LocalDateTime checkTime, int triggerOffsetMinutes);

/**
 * Derives the tag name by moving {@code tagTime} back by one period duration and formatting the
 * result with {@code tagFormat}, which is used as a {@link DateTimeFormatter} pattern.
 */
public String generateTagName(LocalDateTime tagTime, String tagFormat) {
  LocalDateTime namedTime = tagTime.minus(periodDuration());
  DateTimeFormatter formatter = DateTimeFormatter.ofPattern(tagFormat);
  return namedTime.format(formatter);
}
}

public static TagConfiguration parse(Map<String, String> tableProperties) {
Expand All @@ -82,18 +109,29 @@ public static TagConfiguration parse(Map<String, String> tableProperties) {
tableProperties,
TableProperties.ENABLE_AUTO_CREATE_TAG,
TableProperties.ENABLE_AUTO_CREATE_TAG_DEFAULT));
tagConfig.setTagFormat(
CompatiblePropertyUtil.propertyAsString(
tableProperties,
TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT,
TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT_DEFAULT));
tagConfig.setTriggerPeriod(
Period.valueOf(
CompatiblePropertyUtil.propertyAsString(
tableProperties,
TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD,
TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD_DEFAULT)
.toUpperCase(Locale.ROOT)));

String defaultFormat;
switch (tagConfig.getTriggerPeriod()) {
case DAILY:
defaultFormat = TableProperties.AUTO_CREATE_TAG_FORMAT_DAILY_DEFAULT;
break;
case HOURLY:
defaultFormat = TableProperties.AUTO_CREATE_TAG_FORMAT_HOURLY_DEFAULT;
break;
default:
throw new IllegalArgumentException(
"Unsupported trigger period: " + tagConfig.getTriggerPeriod());
}
tagConfig.setTagFormat(
CompatiblePropertyUtil.propertyAsString(
tableProperties, TableProperties.AUTO_CREATE_TAG_FORMAT, defaultFormat));
tagConfig.setTriggerOffsetMinutes(
CompatiblePropertyUtil.propertyAsInt(
tableProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.server.optimizing.maintainer.TableMaintainer;
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableManager;
import com.netease.arctic.server.table.TableRuntime;
import org.slf4j.Logger;
Expand Down Expand Up @@ -60,4 +61,9 @@ protected void execute(TableRuntime tableRuntime) {
LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t);
}
}

/**
 * Handles a table configuration change by calling {@code scheduleIfNecessary} for the table with
 * the value returned by {@code getStartDelay()}. The {@code originalConfig} argument is not
 * consulted here.
 */
@Override
public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
}
Loading

0 comments on commit c020c1c

Please sign in to comment.