Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-2257] Automatically create tags on snapshots hour-level for iceberg Table Formats #2411

Merged
merged 20 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;

Expand Down Expand Up @@ -60,28 +61,22 @@ public void execute() {
}

private boolean tagExist() {
if (tagConfig.getTriggerPeriod() == TagConfiguration.Period.DAILY) {
return findTagOfToday() != null;
} else {
throw new IllegalArgumentException(
"unsupported trigger period " + tagConfig.getTriggerPeriod());
}
}

private String findTagOfToday() {
String name = generateTagName();
return table.refs().entrySet().stream()
.filter(entry -> entry.getValue().isTag())
.map(Map.Entry::getKey)
.filter(name::equals)
.findFirst()
.orElse(null);
String tag =
table.refs().entrySet().stream()
.filter(entry -> entry.getValue().isTag())
.map(Map.Entry::getKey)
.filter(name::equals)
.findFirst()
.orElse(null);
return tag != null;
}

private boolean createTag() {
Snapshot snapshot = findSnapshot(table, getTagTriggerTime());
long tagTriggerTime = getTagTriggerTime();
Snapshot snapshot = findSnapshot(table, tagTriggerTime);
if (snapshot == null) {
LOG.info("Found no snapshot at {} for {}", getTagTriggerTime(), table.name());
LOG.info("Found no snapshot at {} for {}", tagTriggerTime, table.name());
return false;
}
if (exceedMaxDelay(snapshot)) {
Expand Down Expand Up @@ -114,17 +109,18 @@ private boolean exceedMaxDelay(Snapshot snapshot) {
}

private String generateTagName() {
if (tagConfig.getTriggerPeriod() == TagConfiguration.Period.DAILY) {
String tagFormat = tagConfig.getTagFormat();
return now.minusDays(1).format(DateTimeFormatter.ofPattern(tagFormat));
} else {
throw new IllegalArgumentException(
"unsupported trigger period " + tagConfig.getTriggerPeriod());
}
LocalDateTime tagTime =
Copy link
Contributor

@zhoujinsong zhoujinsong Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, when I reviewed the code here, these three time-related variables made it difficult for me to understand. After carefully reviewing the code again, I have summarized the names and meanings of the three parameters:

  • Check time: the timestamp when the action is started.
  • Tag time: the timestamp of the last tag represents.
  • Trigger time: the timestamp of the last tag should be triggered.

And we calculated the 3 variables with the methods:

  • checkTime = now()
  • triggerTime = (checkTime - triggerOffset) % period + triggerOffest
  • TagTime = triggerTime - triggerOffest - period

then,

  • TagTime = (checkTime - triggerOffset) % period - period
  • TriggerTime = TagTime + period + triggerOffset

However, the tag time here does not correspond to the time of the tag. It is easier for us to generate the corresponding tag name based on this tag time, but it does not accurately represent the data within the tag.

We should calculate the 3 variables like :

  • checkTime = now()
  • TagTime = (checkTime - triggerOffset) % period
  • triggerTime = TagTime + triggerOffest

Then the value of the tag time is more accurate and the calculation methods are much simpler.

So I think we can make the code much easier to understand by:

  • Put the 3 variables together and try to calculate them at the beginning, put some comments to tell the meaning and the calculating rules.
  • Calculate the tag time first and then calculate the trigger time based on tag time.

tagConfig
.getTriggerPeriod()
.normalizeToTagTime(getTagTriggerTime(), tagConfig.getTriggerOffsetMinutes());
String tagFormat = tagConfig.getTagFormat();
return tagTime.format(DateTimeFormatter.ofPattern(tagFormat));
}

private long getTagTriggerTime() {
return tagConfig.getTriggerPeriod().getTagTriggerTime(now, tagConfig.getTriggerOffsetMinutes());
LocalDateTime tagTriggerTime =
tagConfig.getTriggerPeriod().getTagTriggerTime(now, tagConfig.getTriggerOffsetMinutes());
return tagTriggerTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}

private static Snapshot findSnapshot(Table table, long tagTriggerTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import com.netease.arctic.utils.CompatiblePropertyUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.Map;

Expand All @@ -48,10 +50,30 @@ public class TagConfiguration {
public enum Period {
DAILY("daily") {
@Override
public long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
LocalTime offsetTime = LocalTime.ofSecondOfDay(triggerOffsetMinutes * 60L);
LocalDateTime triggerTime = LocalDateTime.of(checkTime.toLocalDate(), offsetTime);
return triggerTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
protected Duration onePeriodSize() {
return Duration.ofDays(1);
}

@Override
public LocalDateTime getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
return checkTime
.minusMinutes(triggerOffsetMinutes)
.truncatedTo(ChronoUnit.DAYS)
.plusMinutes(triggerOffsetMinutes);
}
},
HOURLY("hourly") {
@Override
protected Duration onePeriodSize() {
return Duration.ofHours(1);
}

@Override
public LocalDateTime getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
return checkTime
.minusMinutes(triggerOffsetMinutes)
.truncatedTo(ChronoUnit.HOURS)
.plusMinutes(triggerOffsetMinutes);
}
};

Expand All @@ -65,14 +87,28 @@ public String propertyName() {
return propertyName;
}

protected abstract Duration onePeriodSize();
huyuanfeng2018 marked this conversation as resolved.
Show resolved Hide resolved

/**
* Obtain the trigger time for creating a tag, which is the idea time of the last tag before the
* check time.
* Obtain the trigger time for creating a tag, which is the ideal time of the last tag before
* the check time.
*
* <p>For example, when creating a daily tag, the check time is 2022-08-08 11:00:00 and the
* offset is set to be 5 min, the idea trigger time is 2022-08-08 00:05:00.
*
* <p>For example, when creating a daily tag, the offset is set to be 30 min, if the check time
* is 2022-08-08 02:00:00, the ideal trigger time is 2022-08-08 00:30:00; if the check time is
* 2022-08-09 00:20:00 (before 00:30 of the next day), the ideal trigger time is still
* 2022-08-08 00:30:00.
*/
public abstract long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes);
public abstract LocalDateTime getTagTriggerTime(
LocalDateTime checkTime, int triggerOffsetMinutes);

public LocalDateTime normalizeToTagTime(long triggerTime, int triggerOffsetMinutes) {
huyuanfeng2018 marked this conversation as resolved.
Show resolved Hide resolved
return LocalDateTime.ofInstant(Instant.ofEpochMilli(triggerTime), ZoneId.systemDefault())
.minus(triggerOffsetMinutes, ChronoUnit.MINUTES)
.minus(onePeriodSize());
}
}

public static TagConfiguration parse(Map<String, String> tableProperties) {
Expand All @@ -82,18 +118,29 @@ public static TagConfiguration parse(Map<String, String> tableProperties) {
tableProperties,
TableProperties.ENABLE_AUTO_CREATE_TAG,
TableProperties.ENABLE_AUTO_CREATE_TAG_DEFAULT));
tagConfig.setTagFormat(
CompatiblePropertyUtil.propertyAsString(
tableProperties,
TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT,
TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT_DEFAULT));
tagConfig.setTriggerPeriod(
Period.valueOf(
CompatiblePropertyUtil.propertyAsString(
tableProperties,
TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD,
TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD_DEFAULT)
.toUpperCase(Locale.ROOT)));

String defaultFormat;
switch (tagConfig.getTriggerPeriod()) {
case DAILY:
defaultFormat = TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT_DAILY_DEFAULT;
break;
case HOURLY:
defaultFormat = TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT_HOURLY_DEFAULT;
break;
default:
throw new IllegalArgumentException(
"Unsupported trigger period: " + tagConfig.getTriggerPeriod());
}
tagConfig.setTagFormat(
CompatiblePropertyUtil.propertyAsString(
tableProperties, TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT, defaultFormat));
tagConfig.setTriggerOffsetMinutes(
CompatiblePropertyUtil.propertyAsInt(
tableProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.server.optimizing.maintainer.TableMaintainer;
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableManager;
import com.netease.arctic.server.table.TableRuntime;
import org.slf4j.Logger;
Expand Down Expand Up @@ -60,4 +61,9 @@ protected void execute(TableRuntime tableRuntime) {
LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t);
}
}

@Override
public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}
}
Loading