diff --git a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/AutoCreateIcebergTagAction.java b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/AutoCreateIcebergTagAction.java index b20a12279d..bca5998601 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/AutoCreateIcebergTagAction.java +++ b/ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/AutoCreateIcebergTagAction.java @@ -25,7 +25,7 @@ import org.slf4j.LoggerFactory; import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; +import java.time.ZoneId; import java.util.Map; /** Action to auto create tag for Iceberg Table. */ @@ -34,12 +34,20 @@ public class AutoCreateIcebergTagAction { private final Table table; private final TagConfiguration tagConfig; - private final LocalDateTime now; + private final LocalDateTime triggerTime; + private final String tagName; - public AutoCreateIcebergTagAction(Table table, TagConfiguration tagConfig, LocalDateTime now) { + public AutoCreateIcebergTagAction( + Table table, TagConfiguration tagConfig, LocalDateTime checkTime) { this.table = table; this.tagConfig = tagConfig; - this.now = now; + + LocalDateTime tagTime = + tagConfig.getTriggerPeriod().getTagTime(checkTime, tagConfig.getTriggerOffsetMinutes()); + // triggerTime = TagTime + triggerOffset + // The trigger time of the tag, which is the time when the tag is expected to be created. 
+ this.triggerTime = tagTime.plusMinutes(tagConfig.getTriggerOffsetMinutes()); + this.tagName = tagConfig.getTriggerPeriod().generateTagName(tagTime, tagConfig.getTagFormat()); } public void execute() { @@ -60,28 +68,21 @@ public void execute() { } private boolean tagExist() { - if (tagConfig.getTriggerPeriod() == TagConfiguration.Period.DAILY) { - return findTagOfToday() != null; - } else { - throw new IllegalArgumentException( - "unsupported trigger period " + tagConfig.getTriggerPeriod()); - } - } - - private String findTagOfToday() { - String name = generateTagName(); - return table.refs().entrySet().stream() - .filter(entry -> entry.getValue().isTag()) - .map(Map.Entry::getKey) - .filter(name::equals) - .findFirst() - .orElse(null); + String tag = + table.refs().entrySet().stream() + .filter(entry -> entry.getValue().isTag()) + .map(Map.Entry::getKey) + .filter(tagName::equals) + .findFirst() + .orElse(null); + return tag != null; } private boolean createTag() { - Snapshot snapshot = findSnapshot(table, getTagTriggerTime()); + long tagTriggerTimestampMillis = getTagTriggerTimestampMillis(); + Snapshot snapshot = findSnapshot(table, tagTriggerTimestampMillis); if (snapshot == null) { - LOG.info("Found no snapshot at {} for {}", getTagTriggerTime(), table.name()); + LOG.info("Found no snapshot at {} for {}", tagTriggerTimestampMillis, table.name()); return false; } if (exceedMaxDelay(snapshot)) { @@ -91,14 +92,13 @@ private boolean createTag() { snapshot.snapshotId(), snapshot.timestampMillis(), tagConfig.getMaxDelayMinutes(), - getTagTriggerTime()); + tagTriggerTimestampMillis); return false; } - String newTagName = generateTagName(); - table.manageSnapshots().createTag(newTagName, snapshot.snapshotId()).commit(); + table.manageSnapshots().createTag(tagName, snapshot.snapshotId()).commit(); LOG.info( "Created a tag {} for {} on snapshot {} at {}", - newTagName, + tagName, table.name(), snapshot.snapshotId(), snapshot.timestampMillis()); @@ -109,22 +109,12 @@ 
private boolean exceedMaxDelay(Snapshot snapshot) { if (tagConfig.getMaxDelayMinutes() <= 0) { return false; } - long delay = snapshot.timestampMillis() - getTagTriggerTime(); + long delay = snapshot.timestampMillis() - getTagTriggerTimestampMillis(); return delay > tagConfig.getMaxDelayMinutes() * 60_000L; } - private String generateTagName() { - if (tagConfig.getTriggerPeriod() == TagConfiguration.Period.DAILY) { - String tagFormat = tagConfig.getTagFormat(); - return now.minusDays(1).format(DateTimeFormatter.ofPattern(tagFormat)); - } else { - throw new IllegalArgumentException( - "unsupported trigger period " + tagConfig.getTriggerPeriod()); - } - } - - private long getTagTriggerTime() { - return tagConfig.getTriggerPeriod().getTagTriggerTime(now, tagConfig.getTriggerOffsetMinutes()); + private long getTagTriggerTimestampMillis() { + return triggerTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); } private static Snapshot findSnapshot(Table table, long tagTriggerTime) { diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/TagConfiguration.java b/ams/server/src/main/java/com/netease/arctic/server/table/TagConfiguration.java index 479c1dc48d..6a3a23bb23 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/TagConfiguration.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/TagConfiguration.java @@ -24,9 +24,10 @@ import com.netease.arctic.utils.CompatiblePropertyUtil; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import java.time.Duration; import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; import java.util.Locale; import java.util.Map; @@ -35,7 +36,7 @@ public class TagConfiguration { // tag.auto-create.enabled private boolean autoCreateTag = false; - // tag.auto-create.daily.tag-format + // tag.auto-create.tag-format private String tagFormat; // 
tag.auto-create.trigger.period private Period triggerPeriod; @@ -48,10 +49,25 @@ public class TagConfiguration { public enum Period { DAILY("daily") { @Override - public long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes) { - LocalTime offsetTime = LocalTime.ofSecondOfDay(triggerOffsetMinutes * 60L); - LocalDateTime triggerTime = LocalDateTime.of(checkTime.toLocalDate(), offsetTime); - return triggerTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); + protected Duration periodDuration() { + return Duration.ofDays(1); + } + + @Override + public LocalDateTime getTagTime(LocalDateTime checkTime, int triggerOffsetMinutes) { + return checkTime.minusMinutes(triggerOffsetMinutes).truncatedTo(ChronoUnit.DAYS); + } + }, + + HOURLY("hourly") { + @Override + protected Duration periodDuration() { + return Duration.ofHours(1); + } + + @Override + public LocalDateTime getTagTime(LocalDateTime checkTime, int triggerOffsetMinutes) { + return checkTime.minusMinutes(triggerOffsetMinutes).truncatedTo(ChronoUnit.HOURS); } }; @@ -65,14 +81,25 @@ public String propertyName() { return propertyName; } + protected abstract Duration periodDuration(); + /** - * Obtain the trigger time for creating a tag, which is the idea time of the last tag before the + * Obtain the tag time for creating a tag, which is the ideal time of the last tag before the * check time. * *

For example, when creating a daily tag, the check time is 2022-08-08 11:00:00 and the - offset is set to be 5 min, the idea trigger time is 2022-08-08 00:05:00. + offset is set to be 5 min, the ideal tag time is 2022-08-08 00:00:00. + *

For example, when creating a daily tag, the offset is set to be 30 min, if the check time + * is 2022-08-08 02:00:00, the ideal tag time is 2022-08-08 00:00:00; if the check time is + * 2022-08-09 00:20:00 (before 00:30 of the next day), the ideal tag time is still 2022-08-08 + * 00:00:00. */ - public abstract long getTagTriggerTime(LocalDateTime checkTime, int triggerOffsetMinutes); + public abstract LocalDateTime getTagTime(LocalDateTime checkTime, int triggerOffsetMinutes); + + public String generateTagName(LocalDateTime tagTime, String tagFormat) { + return tagTime.minus(periodDuration()).format(DateTimeFormatter.ofPattern(tagFormat)); + } } public static TagConfiguration parse(Map tableProperties) { @@ -82,11 +109,6 @@ public static TagConfiguration parse(Map tableProperties) { tableProperties, TableProperties.ENABLE_AUTO_CREATE_TAG, TableProperties.ENABLE_AUTO_CREATE_TAG_DEFAULT)); - tagConfig.setTagFormat( - CompatiblePropertyUtil.propertyAsString( - tableProperties, - TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT, - TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT_DEFAULT)); tagConfig.setTriggerPeriod( Period.valueOf( CompatiblePropertyUtil.propertyAsString( @@ -94,6 +116,22 @@ public static TagConfiguration parse(Map tableProperties) { TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD, TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD_DEFAULT) .toUpperCase(Locale.ROOT))); + + String defaultFormat; + switch (tagConfig.getTriggerPeriod()) { + case DAILY: + defaultFormat = TableProperties.AUTO_CREATE_TAG_FORMAT_DAILY_DEFAULT; + break; + case HOURLY: + defaultFormat = TableProperties.AUTO_CREATE_TAG_FORMAT_HOURLY_DEFAULT; + break; + default: + throw new IllegalArgumentException( + "Unsupported trigger period: " + tagConfig.getTriggerPeriod()); + } + tagConfig.setTagFormat( + CompatiblePropertyUtil.propertyAsString( + tableProperties, TableProperties.AUTO_CREATE_TAG_FORMAT, defaultFormat)); tagConfig.setTriggerOffsetMinutes( CompatiblePropertyUtil.propertyAsInt( 
tableProperties, diff --git a/ams/server/src/main/java/com/netease/arctic/server/table/executor/TagsAutoCreatingExecutor.java b/ams/server/src/main/java/com/netease/arctic/server/table/executor/TagsAutoCreatingExecutor.java index b50ef53ebe..01e4d9ea52 100644 --- a/ams/server/src/main/java/com/netease/arctic/server/table/executor/TagsAutoCreatingExecutor.java +++ b/ams/server/src/main/java/com/netease/arctic/server/table/executor/TagsAutoCreatingExecutor.java @@ -23,6 +23,7 @@ import com.netease.arctic.AmoroTable; import com.netease.arctic.ams.api.TableFormat; import com.netease.arctic.server.optimizing.maintainer.TableMaintainer; +import com.netease.arctic.server.table.TableConfiguration; import com.netease.arctic.server.table.TableManager; import com.netease.arctic.server.table.TableRuntime; import org.slf4j.Logger; @@ -60,4 +61,9 @@ protected void execute(TableRuntime tableRuntime) { LOG.error("Failed to create tags on {}", tableRuntime.getTableIdentifier(), t); } } + + @Override + public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration originalConfig) { + scheduleIfNecessary(tableRuntime, getStartDelay()); + } } diff --git a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java index 2f45dbdad6..34f3c3ba3f 100644 --- a/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java +++ b/ams/server/src/test/java/com/netease/arctic/server/optimizing/maintainer/TestAutoCreateIcebergTagAction.java @@ -82,6 +82,30 @@ public void testCreateDailyTag() { checkTagCount(table, 1); } + @Test + public void testCreateHourlyTag() { + Table table = getArcticTable().asUnkeyedTable(); + table + .updateProperties() + .set(TableProperties.ENABLE_AUTO_CREATE_TAG, "true") + .set(TableProperties.AUTO_CREATE_TAG_MAX_DELAY_MINUTES, "0") + 
.set(TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD, "hourly") + .commit(); + table.newAppend().commit(); + checkSnapshots(table, 1); + checkNoTag(table); + + Snapshot snapshot = table.currentSnapshot(); + LocalDateTime now = fromEpochMillis(snapshot.timestampMillis()); + newAutoCreateIcebergTagAction(table, now).execute(); + checkTagCount(table, 1); + checkTag(table, "tag-" + formatDateTime(now.minusHours(1)), snapshot); + + // should not recreate tag + newAutoCreateIcebergTagAction(table, now).execute(); + checkTagCount(table, 1); + } + @Test public void testCreateDailyOffsetTag() { Table table = getArcticTable().asUnkeyedTable(); @@ -104,7 +128,7 @@ public void testCreateDailyOffsetTag() { .set(TableProperties.AUTO_CREATE_TAG_TRIGGER_OFFSET_MINUTES, offsetMinutesOfToday + "") .commit(); newAutoCreateIcebergTagAction(table, now).execute(); - checkTagCount(table, 0); + checkTag(table, "tag-" + formatDate(now.minusDays(2)), snapshot); // Offset -1 minute to make the snapshot exceed the offset to create tag offsetMinutesOfToday--; @@ -113,12 +137,48 @@ public void testCreateDailyOffsetTag() { .set(TableProperties.AUTO_CREATE_TAG_TRIGGER_OFFSET_MINUTES, offsetMinutesOfToday + "") .commit(); newAutoCreateIcebergTagAction(table, now).execute(); - checkTagCount(table, 1); + checkTagCount(table, 2); checkTag(table, "tag-" + formatDate(now.minusDays(1)), snapshot); // should not recreate tag newAutoCreateIcebergTagAction(table, now).execute(); - checkTagCount(table, 1); + checkTagCount(table, 2); + } + + @Test + public void testCreateHourlyOffsetTag() { + Table table = getArcticTable().asUnkeyedTable(); + table + .updateProperties() + .set(TableProperties.ENABLE_AUTO_CREATE_TAG, "true") + .set(TableProperties.AUTO_CREATE_TAG_MAX_DELAY_MINUTES, "0") + .set(TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD, "hourly") + .commit(); + table.newAppend().commit(); + checkSnapshots(table, 1); + checkNoTag(table); + + Snapshot snapshot = table.currentSnapshot(); + LocalDateTime 
testDateTime = fromEpochMillis(snapshot.timestampMillis()); + long offsetMinutesOfHour = getOffsetMinutesOfHour(snapshot.timestampMillis()) + 1; + table + .updateProperties() + .set(TableProperties.AUTO_CREATE_TAG_TRIGGER_OFFSET_MINUTES, offsetMinutesOfHour + "") + .commit(); + newAutoCreateIcebergTagAction(table, testDateTime).execute(); + checkTag(table, "tag-" + formatDateTime(testDateTime.minusHours(2)), snapshot); + + offsetMinutesOfHour--; + table + .updateProperties() + .set(TableProperties.AUTO_CREATE_TAG_TRIGGER_OFFSET_MINUTES, offsetMinutesOfHour + "") + .commit(); + newAutoCreateIcebergTagAction(table, testDateTime).execute(); + checkTagCount(table, 2); + checkTag(table, "tag-" + formatDateTime(testDateTime.minusHours(1)), snapshot); + + newAutoCreateIcebergTagAction(table, testDateTime).execute(); + checkTagCount(table, 2); } @Test @@ -147,6 +207,33 @@ public void testNotCreateDelayDailyTag() { checkTag(table, "tag-" + formatDate(now.minusDays(1)), snapshot); } + @Test + public void testNotCreateDelayHourlyTag() { + Table table = getArcticTable().asUnkeyedTable(); + table + .updateProperties() + .set(TableProperties.ENABLE_AUTO_CREATE_TAG, "true") + .set(TableProperties.AUTO_CREATE_TAG_TRIGGER_PERIOD, "hourly") + .set(TableProperties.AUTO_CREATE_TAG_MAX_DELAY_MINUTES, "60") + .commit(); + table.newAppend().commit(); + checkSnapshots(table, 1); + checkNoTag(table); + + Snapshot snapshot = table.currentSnapshot(); + LocalDateTime now = fromEpochMillis(snapshot.timestampMillis()); + LocalDateTime lastHour = now.minusHours(1); + + // should not create last hour tag + newAutoCreateIcebergTagAction(table, lastHour).execute(); + checkNoTag(table); + + // should create this hour tag + newAutoCreateIcebergTagAction(table, now).execute(); + checkTagCount(table, 1); + checkTag(table, "tag-" + formatDateTime(now.minusHours(1)), snapshot); + } + @Test public void testTagFormat() { Table table = getArcticTable().asUnkeyedTable(); @@ -157,7 +244,7 @@ public void 
testTagFormat() { .commit(); table .updateProperties() - .set(TableProperties.AUTO_CREATE_TAG_DAILY_FORMAT, "'custom-tag-'yyyyMMdd'-auto'") + .set(TableProperties.AUTO_CREATE_TAG_FORMAT, "'custom-tag-'yyyyMMdd'-auto'") .commit(); table.newAppend().commit(); checkSnapshots(table, 1); @@ -174,6 +261,59 @@ public void testTagFormat() { checkTagCount(table, 1); } + @Test + public void testTriggerTimePeriod() { + testTagTimePeriodHourly("2022-08-08T11:40:00", 30, "2022-08-08T11:00:00"); + testTagTimePeriodHourly("2022-08-08T23:40:00", 15, "2022-08-08T23:00:00"); + testTagTimePeriodHourly("2022-08-09T00:10:00", 30, "2022-08-08T23:00:00"); + + testTagTimePeriodDaily("2022-08-08T03:40:00", 30, "2022-08-08T00:00:00"); + testTagTimePeriodDaily("2022-08-08T23:40:00", 15, "2022-08-08T00:00:00"); + testTagTimePeriodDaily("2022-08-09T00:10:00", 30, "2022-08-08T00:00:00"); + } + + private void testTagTimePeriodHourly( + String checkTimeStr, int offsetMinutes, String expectedResultStr) { + LocalDateTime checkTime = LocalDateTime.parse(checkTimeStr); + Long expectedTriggerTime = + (expectedResultStr == null) + ? null + : LocalDateTime.parse(expectedResultStr) + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); + + Long actualTriggerTime = + TagConfiguration.Period.HOURLY + .getTagTime(checkTime, offsetMinutes) + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); + + Assert.assertEquals(expectedTriggerTime, actualTriggerTime); + } + + private void testTagTimePeriodDaily( + String checkTimeStr, int offsetMinutes, String expectedResultStr) { + LocalDateTime checkTime = LocalDateTime.parse(checkTimeStr); + Long expectedTriggerTime = + (expectedResultStr == null) + ? 
null + : LocalDateTime.parse(expectedResultStr) + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); + + Long actualTriggerTime = + TagConfiguration.Period.DAILY + .getTagTime(checkTime, offsetMinutes) + .atZone(ZoneId.systemDefault()) + .toInstant() + .toEpochMilli(); + + Assert.assertEquals(expectedTriggerTime, actualTriggerTime); + } + private long getOffsetMinutesOfToday(long millis) { LocalDateTime now = fromEpochMillis(millis); LocalDateTime today = LocalDateTime.of(now.toLocalDate(), LocalTime.ofSecondOfDay(0)); @@ -181,6 +321,11 @@ private long getOffsetMinutesOfToday(long millis) { return between.toMinutes(); } + private long getOffsetMinutesOfHour(long millis) { + LocalDateTime now = fromEpochMillis(millis); + return now.getMinute(); + } + private LocalDateTime fromEpochMillis(long millis) { return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneId.systemDefault()); } @@ -189,6 +334,10 @@ private String formatDate(LocalDateTime localDateTime) { return localDateTime.format(DateTimeFormatter.ofPattern("yyyyMMdd")); } + private String formatDateTime(LocalDateTime localDateTime) { + return localDateTime.format(DateTimeFormatter.ofPattern("yyyyMMddHH")); + } + private void checkNoTag(Table table) { Assert.assertFalse(table.refs().values().stream().anyMatch(SnapshotRef::isTag)); } diff --git a/core/src/main/java/com/netease/arctic/table/TableProperties.java b/core/src/main/java/com/netease/arctic/table/TableProperties.java index bcfc203c64..20fe3f6655 100644 --- a/core/src/main/java/com/netease/arctic/table/TableProperties.java +++ b/core/src/main/java/com/netease/arctic/table/TableProperties.java @@ -216,8 +216,9 @@ private TableProperties() {} "tag.auto-create.trigger.max-delay.minutes"; public static final int AUTO_CREATE_TAG_MAX_DELAY_MINUTES_DEFAULT = 60; - public static final String AUTO_CREATE_TAG_DAILY_FORMAT = "tag.auto-create.daily.tag-format"; - public static final String AUTO_CREATE_TAG_DAILY_FORMAT_DEFAULT = 
"'tag-'yyyyMMdd"; + public static final String AUTO_CREATE_TAG_FORMAT = "tag.auto-create.tag-format"; + public static final String AUTO_CREATE_TAG_FORMAT_DAILY_DEFAULT = "'tag-'yyyyMMdd"; + public static final String AUTO_CREATE_TAG_FORMAT_HOURLY_DEFAULT = "'tag-'yyyyMMddHH"; /** table write related properties */ public static final String FILE_FORMAT_PARQUET = "parquet"; diff --git a/docs/user-guides/configurations.md b/docs/user-guides/configurations.md index 0fbbf6557a..ade12d5727 100644 --- a/docs/user-guides/configurations.md +++ b/docs/user-guides/configurations.md @@ -70,13 +70,13 @@ Data-cleaning configurations are applicable to both Iceberg Format and Mixed str Tags configurations are applicable to Iceberg Format only now, and will be supported in Mixed Format soon. -| Key | Default | Description | -|---------------------------------------------|---------------|-------------------------------------------------------------------------------------------------------------| -| tag.auto-create.enabled | false | Enables automatically creating tags | -| tag.auto-create.trigger.period | daily | Period of creating tags, support `daily` now | -| tag.auto-create.trigger.offset.minutes | 0 | The minutes by which the tag is created after midnight (00:00) | -| tag.auto-create.trigger.max-delay.minutes | 60 | The maximum delay time for creating a tag | -| tag.auto-create.daily.tag-format | 'tag-'yyyyMMdd | The format of the name for daily tag | +| Key | Default | Description | +|--------------------------------------------|--------------------------------------------------|-------------------------------------------------------------| +| tag.auto-create.enabled | false | Enables automatically creating tags | +| tag.auto-create.trigger.period | daily | Period of creating tags, support `daily`,`hourly` now | +| tag.auto-create.trigger.offset.minutes | 0 | The minutes by which the tag is created after midnight (00:00) | +| tag.auto-create.trigger.max-delay.minutes | 60 | 
The maximum delay time for creating a tag | +| tag.auto-create.tag-format | 'tag-'yyyyMMdd for daily and 'tag-'yyyyMMddHH for hourly periods | The format of the name for tag | ## Mixed Format configurations @@ -132,4 +132,4 @@ If using Iceberg Format,please refer to [Iceberg configurations](https://icebe |-----------------------------------|------------------|--------------------------------------------------------------------------------------------------------| | base.hive.auto-sync-schema-change | true | Whether synchronize schema changes of Hive Table from HMS | | base.hive.auto-sync-data-write | false | Whether synchronize data changes of Hive Table from HMS, this should be true when writing to Hive | -| base.hive.consistent-write.enabled | true | To avoid writing dirty data, the files written to the Hive directory will be hidden files and renamed to visible files upon commit. | \ No newline at end of file +| base.hive.consistent-write.enabled | true | To avoid writing dirty data, the files written to the Hive directory will be hidden files and renamed to visible files upon commit. |