Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-2257] Automatically create tags on snapshots hour-level for iceberg Table Formats #2411

Merged
merged 20 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;

Expand Down Expand Up @@ -60,28 +61,22 @@
}

private boolean tagExist() {
if (tagConfig.getTriggerPeriod() == TagConfiguration.Period.DAILY) {
return findTagOfToday() != null;
} else {
throw new IllegalArgumentException(
"unsupported trigger period " + tagConfig.getTriggerPeriod());
}
}

private String findTagOfToday() {
String name = generateTagName();
return table.refs().entrySet().stream()
.filter(entry -> entry.getValue().isTag())
.map(Map.Entry::getKey)
.filter(name::equals)
.findFirst()
.orElse(null);
String tag =
table.refs().entrySet().stream()
.filter(entry -> entry.getValue().isTag())
.map(Map.Entry::getKey)
.filter(name::equals)
.findFirst()
.orElse(null);
return tag != null;
}

private boolean createTag() {
Snapshot snapshot = findSnapshot(table, getTagTriggerTime());
long tagTriggerTimestampMillis = getTagTriggerTimestampMillis();
Snapshot snapshot = findSnapshot(table, tagTriggerTimestampMillis);
if (snapshot == null) {
LOG.info("Found no snapshot at {} for {}", getTagTriggerTime(), table.name());
LOG.info("Found no snapshot at {} for {}", tagTriggerTimestampMillis, table.name());

Check warning on line 79 in ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/AutoCreateIcebergTagAction.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/AutoCreateIcebergTagAction.java#L79

Added line #L79 was not covered by tests
return false;
}
if (exceedMaxDelay(snapshot)) {
Expand All @@ -91,7 +86,7 @@
snapshot.snapshotId(),
snapshot.timestampMillis(),
tagConfig.getMaxDelayMinutes(),
getTagTriggerTime());
tagTriggerTimestampMillis);
return false;
}
String newTagName = generateTagName();
Expand All @@ -109,22 +104,27 @@
if (tagConfig.getMaxDelayMinutes() <= 0) {
return false;
}
long delay = snapshot.timestampMillis() - getTagTriggerTime();
long delay = snapshot.timestampMillis() - getTagTriggerTimestampMillis();
return delay > tagConfig.getMaxDelayMinutes() * 60_000L;
}

private String generateTagName() {
if (tagConfig.getTriggerPeriod() == TagConfiguration.Period.DAILY) {
String tagFormat = tagConfig.getTagFormat();
return now.minusDays(1).format(DateTimeFormatter.ofPattern(tagFormat));
} else {
throw new IllegalArgumentException(
"unsupported trigger period " + tagConfig.getTriggerPeriod());
}
LocalDateTime tagTime =
Copy link
Contributor

@zhoujinsong zhoujinsong Dec 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, when I reviewed the code here, these three time-related variables made it difficult for me to understand. After carefully reviewing the code again, I have summarized the names and meanings of the three parameters:

  • Check time: the timestamp when the action is started.
  • Tag time: the timestamp of the last tag represents.
  • Trigger time: the timestamp of the last tag should be triggered.

And we calculated the 3 variables with the methods:

  • checkTime = now()
  • triggerTime = (checkTime - triggerOffset) % period + triggerOffest
  • TagTime = triggerTime - triggerOffest - period

then,

  • TagTime = (checkTime - triggerOffset) % period - period
  • TriggerTime = TagTime + period + triggerOffset

However, the tag time here does not correspond to the time of the tag. It is easier for us to generate the corresponding tag name based on this tag time, but it does not accurately represent the data within the tag.

We should calculate the 3 variables like :

  • checkTime = now()
  • TagTime = (checkTime - triggerOffset) % period
  • triggerTime = TagTime + triggerOffest

Then the value of the tag time is more accurate and the calculation methods are much simpler.

So I think we can make the code much easier to understand by:

  • Put the 3 variables together and try to calculate them at the beginning, put some comments to tell the meaning and the calculating rules.
  • Calculate the tag time first and then calculate the trigger time based on tag time.

tagConfig
.getTriggerPeriod()
.normalizeToTagTime(
tagConfig
.getTriggerPeriod()
.getTagTriggerTime(now, tagConfig.getTriggerOffsetMinutes()),
tagConfig.getTriggerOffsetMinutes());
String tagFormat = tagConfig.getTagFormat();
return tagTime.format(DateTimeFormatter.ofPattern(tagFormat));
}

private long getTagTriggerTime() {
return tagConfig.getTriggerPeriod().getTagTriggerTime(now, tagConfig.getTriggerOffsetMinutes());
private long getTagTriggerTimestampMillis() {
LocalDateTime tagTriggerTime =
tagConfig.getTriggerPeriod().getTagTriggerTime(now, tagConfig.getTriggerOffsetMinutes());
return tagTriggerTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
}

private static Snapshot findSnapshot(Table table, long tagTriggerTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@
import com.netease.arctic.utils.CompatiblePropertyUtil;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.Map;

Expand All @@ -35,7 +35,7 @@
public class TagConfiguration {
// tag.auto-create.enabled
private boolean autoCreateTag = false;
// tag.auto-create.daily.tag-format
// tag.auto-create.tag-format
private String tagFormat;
// tag.auto-create.trigger.period
private Period triggerPeriod;
Expand All @@ -48,10 +48,30 @@
public enum Period {
DAILY("daily") {
@Override
public long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
LocalTime offsetTime = LocalTime.ofSecondOfDay(triggerOffsetMinutes * 60L);
LocalDateTime triggerTime = LocalDateTime.of(checkTime.toLocalDate(), offsetTime);
return triggerTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
protected Duration periodDuration() {
return Duration.ofDays(1);
}

@Override
public LocalDateTime getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
return checkTime
.minusMinutes(triggerOffsetMinutes)
.truncatedTo(ChronoUnit.DAYS)
.plusMinutes(triggerOffsetMinutes);
}
},
HOURLY("hourly") {
@Override
protected Duration periodDuration() {
return Duration.ofHours(1);
}

@Override
public LocalDateTime getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) {
return checkTime
.minusMinutes(triggerOffsetMinutes)
.truncatedTo(ChronoUnit.HOURS)
.plusMinutes(triggerOffsetMinutes);
}
};

Expand All @@ -65,14 +85,26 @@
return propertyName;
}

protected abstract Duration periodDuration();

/**
* Obtain the trigger time for creating a tag, which is the idea time of the last tag before the
* check time.
* Obtain the trigger time for creating a tag, which is the ideal time of the last tag before
* the check time.
*
* <p>For example, when creating a daily tag, the check time is 2022-08-08 11:00:00 and the
* offset is set to be 5 min, the idea trigger time is 2022-08-08 00:05:00.
*
* <p>For example, when creating a daily tag, the offset is set to be 30 min, if the check time
* is 2022-08-08 02:00:00, the ideal trigger time is 2022-08-08 00:30:00; if the check time is
* 2022-08-09 00:20:00 (before 00:30 of the next day), the ideal trigger time is still
* 2022-08-08 00:30:00.
*/
public abstract long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes);
public abstract LocalDateTime getTagTriggerTime(
LocalDateTime checkTime, int triggerOffsetMinutes);

public LocalDateTime normalizeToTagTime(LocalDateTime triggerTime, int triggerOffsetMinutes) {
wangtaohz marked this conversation as resolved.
Show resolved Hide resolved
return triggerTime.minus(triggerOffsetMinutes, ChronoUnit.MINUTES).minus(periodDuration());
}
}

public static TagConfiguration parse(Map<String, String> tableProperties) {
Expand All @@ -82,18 +114,29 @@
tableProperties,
TableProperties.ENABLE_AUTO_CREATE_TAG,
TableProperties.ENABLE_AUTO_CREATE_TAG_DEFAULT));
tagConfig.setTagFormat(
CompatiblePropertyUtil.propertyAsString(
tableProperties,
TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT,
TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT_DEFAULT));
tagConfig.setTriggerPeriod(
Period.valueOf(
CompatiblePropertyUtil.propertyAsString(
tableProperties,
TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD,
TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD_DEFAULT)
.toUpperCase(Locale.ROOT)));

String defaultFormat;
switch (tagConfig.getTriggerPeriod()) {
case DAILY:
defaultFormat = TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT_DAILY_DEFAULT;
break;
case HOURLY:
defaultFormat = TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT_HOURLY_DEFAULT;
break;
default:
throw new IllegalArgumentException(
"Unsupported trigger period: " + tagConfig.getTriggerPeriod());

Check warning on line 135 in ams/server/src/main/java/com/netease/arctic/server/table/TagConfiguration.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/TagConfiguration.java#L134-L135

Added lines #L134 - L135 were not covered by tests
}
tagConfig.setTagFormat(
CompatiblePropertyUtil.propertyAsString(
tableProperties, TableProperties.AUTO_CREATE_TAG_FORMAT, defaultFormat));
tagConfig.setTriggerOffsetMinutes(
CompatiblePropertyUtil.propertyAsInt(
tableProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.netease.arctic.AmoroTable;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.server.optimizing.maintainer.TableMaintainer;
import com.netease.arctic.server.table.TableConfiguration;
import com.netease.arctic.server.table.TableManager;
import com.netease.arctic.server.table.TableRuntime;
import org.slf4j.Logger;
Expand Down Expand Up @@ -60,4 +61,9 @@
LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t);
}
}

@Override
public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) {
scheduleIfNecessary(tableRuntime, getStartDelay());
}

Check warning on line 68 in ams/server/src/main/java/com/netease/arctic/server/table/executor/TagsAutoCreatingExecutor.java

View check run for this annotation

Codecov / codecov/patch

ams/server/src/main/java/com/netease/arctic/server/table/executor/TagsAutoCreatingExecutor.java#L67-L68

Added lines #L67 - L68 were not covered by tests
}
Loading