Skip to content

Commit

Permalink
[AMORO-1723] Support auto create tag daily for Iceberg Format (apache…
Browse files Browse the repository at this point in the history
…#2263)

* upgrade iceberg to 1.3.0

* fix flink

* fix flink

* remove useless

* change version from 0.5.0-SNAPSHOT to 0.5.1-SNAPSHOT

* update iceberg version 1.3.x in Flink 1.12 module

* update iceberg version 1.3.x in Flink 1.12 module

* ArcticUpdate support toBranch

* fix ci error

* update iceberg version 1.3.x in Flink 1.14 module

* update iceberg version 1.3.x in Flink 1.15 module

* add PuffinUtil and unit test

* remove legacyPartitionMaxTransactionId before 0.4.1

* store optimized sequence to puffin

* support overwrite puffin when retry

* add table property table.version

* calculate available core

* change max input size per thread from 5GB to 500MB

* add auto create tag properties

* add PuffinUtil and unit test

* remove legacyPartitionMaxTransactionId before 0.4.1

* store optimized sequence to puffin

* support overwrite puffin when retry

* add table property table.version

* implement TagsCheckingExecutor and add unit test

* fix unit test

* fix unit test for hive

* change version from 0.5.1-SNAPSHOT to 0.5.0-SNAPSHOT

* change version from 0.5.0-SNAPSHOT to 0.5.1-SNAPSHOT

* support keyed table scan use ref(tag/branch)

* no need to get optimized sequence from KeyedTableSnapshot

* change version back to 0.5.0-SNAPSHOT

* fix get null sequence

* fix sequence number = -1

* fix optimizing integration test for hive

* create puffin for each snapshot

* for compatibility, if puffin not exist, using table properties

* remove useless table version

* add readWithCompatibility

* add some comments for PuffinUtil

* expire statistics files

* fix check style

* fix compile error

* remove useless deprecate

* fix unit test

* fix compile error and unit test

* refactor PuffinUtil

* fix compile error in ams server

* spotless:apply core

* fix unit test

* support generic type for PartitionDataSerializer

* rename PuffinUtil to StatisticsFileUtil

* rename method to readFromStatisticsFile

* spotless: apply

* spotless: apply

* spotless: apply

* 1.fix comment
2.add writerBuilder for StatisticsFileUtil.Writer
3.store puffin files in the location of data/puffin/
4.search snapshot based on a mark in snapshot summary

* fix snapshot expiring unit test error

* fix compile error

* fix TableConfiguration equals hashcode

* refactor code and only support create tag now

* add unit test for TestAutoCreateIcebergTagAction

* spotless:apply

* remove the support for mixed format

* Update ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/AutoCreateIcebergTagAction.java

Co-authored-by: baiyangtx <xiangnebula@163.com>

* Update ams/server/src/main/java/com/netease/arctic/server/optimizing/maintainer/AutoCreateIcebergTagAction.java

Co-authored-by: baiyangtx <xiangnebula@163.com>

* revert useRef when scan

* fix compile error

* Update ams/server/src/main/java/com/netease/arctic/server/table/executor/TagsCheckingExecutor.java

Co-authored-by: baiyangtx <xiangnebula@163.com>

* fix comment

* support more auto create configs

* Update core/src/main/java/com/netease/arctic/table/TagTriggerPeriod.java

Co-authored-by: big face cat <731030576@qq.com>

* fix compile error

* fix unit test

* add auto create tag max delay

* implement auto creating tag max delay

* add docs

* change daily tag format to 'tag-'yyyyMMdd

* change docs

* add docs for TagTriggerPeriod

* revert test assert

* refactor TagConfiguration

* add docs

* spotless:apply

* add docs

* improve logs and remove autoCreateTagEnabled from TableConfiguration

* improve logs

---------

Co-authored-by: lklhdu <lekeleihz@163.com>
Co-authored-by: ZhouJinsong <zhoujinsong0505@163.com>
Co-authored-by: baiyangtx <xiangnebula@163.com>
Co-authored-by: big face cat <731030576@qq.com>
  • Loading branch information
5 people authored and ShawHee committed Dec 29, 2023
1 parent b2aa096 commit d4abc9f
Show file tree
Hide file tree
Showing 17 changed files with 727 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ public class ArcticManagementConf {
.defaultValue(10)
.withDescription("The number of threads used for refreshing tables.");

/**
 * Boolean option registered under the key {@code auto-create-tags.enabled}; described as the
 * switch for creating tags. Defaults to {@code true}.
 */
public static final ConfigOption<Boolean> AUTO_CREATE_TAGS_ENABLED =
ConfigOptions.key("auto-create-tags.enabled")
.booleanType()
.defaultValue(true)
.withDescription("Enable creating tags.");

/**
 * Integer option registered under the key {@code auto-create-tags.thread-count}: the number of
 * threads used for creating tags. Defaults to {@code 3}.
 */
public static final ConfigOption<Integer> AUTO_CREATE_TAGS_THREAD_COUNT =
ConfigOptions.key("auto-create-tags.thread-count")
.intType()
.defaultValue(3)
.withDescription("The number of threads used for creating tags.");

/**
 * Long option registered under the key {@code auto-create-tags.interval}: the interval for
 * creating tags. Defaults to {@code 60000}.
 *
 * <p>NOTE(review): the unit is not stated here — presumably milliseconds (i.e. one minute);
 * confirm against the code that reads this option.
 */
public static final ConfigOption<Long> AUTO_CREATE_TAGS_INTERVAL =
ConfigOptions.key("auto-create-tags.interval")
.longType()
.defaultValue(60000L)
.withDescription("Interval for creating tags.");

public static final ConfigOption<Long> REFRESH_TABLES_INTERVAL =
ConfigOptions.key("refresh-tables.interval")
.longType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public void startService() throws Exception {
addHandlerChain(AsyncTableExecutors.getInstance().getBlockerExpiringExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getHiveCommitSyncExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getTableRefreshingExecutor());
addHandlerChain(AsyncTableExecutors.getInstance().getTagsAutoCreatingExecutor());
tableService.initialize();
LOG.info("AMS table service have been initialized");
terminalManager = new TerminalManager(serviceConfig, tableService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.server.optimizing.maintainer;

import com.netease.arctic.server.table.TagConfiguration;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;

/**
 * Creates the periodic tag of an Iceberg table when that tag is not present yet.
 *
 * <p>An instance is bound to one table, the tag configuration to apply and the point in time
 * ({@code now}) the check is evaluated against. Only the {@code DAILY} trigger period is handled;
 * any other period makes the check fail with an {@link IllegalArgumentException}.
 */
public class AutoCreateIcebergTagAction {
  private static final Logger LOG = LoggerFactory.getLogger(AutoCreateIcebergTagAction.class);

  private final Table table;
  private final TagConfiguration tagConfig;
  private final LocalDateTime now;

  /**
   * @param table the Iceberg table to inspect and tag
   * @param tagConfig the tag settings that drive the check
   * @param now the reference time used to derive the tag name and the trigger time
   */
  public AutoCreateIcebergTagAction(Table table, TagConfiguration tagConfig, LocalDateTime now) {
    this.table = table;
    this.tagConfig = tagConfig;
    this.now = now;
  }

  /**
   * Runs the check: does nothing when auto tagging is disabled or the expected tag is already
   * there, otherwise tries to create the tag and logs the outcome.
   *
   * @throws IllegalArgumentException if the configured trigger period is not {@code DAILY}
   */
  public void execute() {
    if (tagConfig.isAutoCreateTag()) {
      checkAndCreate();
    }
  }

  /** Looks for the expected tag and creates it when it is missing. */
  private void checkAndCreate() {
    LOG.debug("Start checking the automatic creation of tags for {}", table.name());
    if (tagExist()) {
      LOG.debug("Found the expected tag on {}, skip", table.name());
      return;
    }
    String outcome =
        createTag() ? "Created a tag successfully on {}" : "Skipped tag creation on {}";
    LOG.info(outcome, table.name());
  }

  /** Rejects every trigger period other than {@code DAILY}. */
  private void checkPeriodSupported() {
    if (tagConfig.getTriggerPeriod() != TagConfiguration.Period.DAILY) {
      throw new IllegalArgumentException(
          "unsupported trigger period " + tagConfig.getTriggerPeriod());
    }
  }

  /** Tells whether the tag this run is expected to produce is already on the table. */
  private boolean tagExist() {
    checkPeriodSupported();
    return findTagOfToday() != null;
  }

  /**
   * Searches the table refs for a tag carrying the generated tag name.
   *
   * @return the tag name when such a tag exists, {@code null} otherwise
   */
  private String findTagOfToday() {
    String expected = generateTagName();
    boolean present =
        table.refs().entrySet().stream()
            .filter(ref -> ref.getValue().isTag())
            .map(Map.Entry::getKey)
            .anyMatch(expected::equals);
    return present ? expected : null;
  }

  /**
   * Tags the selected snapshot with the generated tag name and commits the change.
   *
   * @return {@code true} when a tag was committed, {@code false} when no snapshot qualified
   */
  private boolean createTag() {
    Snapshot target = pickSnapshotToTag();
    if (target == null) {
      return false;
    }
    String tagName = generateTagName();
    table.manageSnapshots().createTag(tagName, target.snapshotId()).commit();
    LOG.info(
        "Created a tag {} for {} on snapshot {} at {}",
        tagName,
        table.name(),
        target.snapshotId(),
        target.timestampMillis());
    return true;
  }

  /**
   * Chooses the snapshot to tag and logs why when there is none.
   *
   * @return the snapshot to tag, or {@code null} when no snapshot lies after the trigger time or
   *     the one found exceeds the max delay
   */
  private Snapshot pickSnapshotToTag() {
    Snapshot candidate = findSnapshot(table, getTagTriggerTime());
    if (candidate == null) {
      LOG.info("Found no snapshot at {} for {}", getTagTriggerTime(), table.name());
      return null;
    }
    if (!exceedMaxDelay(candidate)) {
      return candidate;
    }
    LOG.info(
        "{}'s snapshot {} at {} exceeds max delay {}, and the expected trigger time is {}",
        table.name(),
        candidate.snapshotId(),
        candidate.timestampMillis(),
        tagConfig.getMaxDelayMinutes(),
        getTagTriggerTime());
    return null;
  }

  /**
   * Checks the gap between the snapshot timestamp and the trigger time against the max delay. A
   * max delay that is zero or negative switches the check off.
   */
  private boolean exceedMaxDelay(Snapshot snapshot) {
    return tagConfig.getMaxDelayMinutes() > 0
        && snapshot.timestampMillis() - getTagTriggerTime()
            > tagConfig.getMaxDelayMinutes() * 60_000L;
  }

  /** Formats the day before {@code now} with the configured tag format. */
  private String generateTagName() {
    checkPeriodSupported();
    LocalDateTime previousDay = now.minusDays(1);
    return DateTimeFormatter.ofPattern(tagConfig.getTagFormat()).format(previousDay);
  }

  /** Asks the trigger period for the trigger time matching {@code now} and the offset. */
  private long getTagTriggerTime() {
    return tagConfig.getTriggerPeriod().getTagTriggerTime(now, tagConfig.getTriggerOffsetMinutes());
  }

  /**
   * Walks {@code table.snapshots()} in iteration order and returns the first snapshot whose water
   * mark is strictly greater than the trigger time.
   *
   * @return the matching snapshot, or {@code null} when none matches
   */
  private static Snapshot findSnapshot(Table table, long tagTriggerTime) {
    for (Snapshot candidate : table.snapshots()) {
      if (getWaterMark(table, candidate) > tagTriggerTime) {
        return candidate;
      }
    }
    return null;
  }

  /** Water mark of a snapshot; for now this is simply the snapshot timestamp. */
  private static long getWaterMark(Table table, Snapshot snapshot) {
    // TODO get water mark from snapshot level
    return snapshot.timestampMillis();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ public class IcebergTableMaintainer implements TableMaintainer {

public static final String METADATA_FOLDER_NAME = "metadata";
public static final String DATA_FOLDER_NAME = "data";
// same as org.apache.iceberg.flink.sink.IcebergFilesCommitter#FLINK_JOB_ID
public static final String FLINK_JOB_ID = "flink.job-id";

// same as org.apache.iceberg.flink.sink.IcebergFilesCommitter#MAX_COMMITTED_CHECKPOINT_ID
public static final String FLINK_MAX_COMMITTED_CHECKPOINT_ID =
"flink.max-committed-checkpoint-id";

Expand Down Expand Up @@ -105,6 +106,7 @@ public void cleanOrphanFiles(TableRuntime tableRuntime) {
cleanDanglingDeleteFiles();
}

@Override
public void expireSnapshots(TableRuntime tableRuntime) {
TableConfiguration tableConfiguration = tableRuntime.getTableConfiguration();
if (!tableConfiguration.isExpireSnapshotEnabled()) {
Expand All @@ -114,7 +116,14 @@ public void expireSnapshots(TableRuntime tableRuntime) {
olderThanSnapshotNeedToExpire(tableRuntime), expireSnapshotNeedToExcludeFiles());
}

public void expireSnapshots(long mustOlderThan) {
@Override
public void autoCreateTags(TableRuntime tableRuntime) {
  // Build the action from this table, the tag settings held by the table runtime and the
  // current local time, then run it.
  AutoCreateIcebergTagAction action =
      new AutoCreateIcebergTagAction(
          table, tableRuntime.getTableConfiguration().getTagConfiguration(), LocalDateTime.now());
  action.execute();
}

/**
 * Expires snapshots using a caller-supplied bound instead of a table runtime.
 *
 * <p>NOTE(review): {@code mustOlderThan} is presumably an epoch-millisecond timestamp — confirm
 * against {@code olderThanSnapshotNeedToExpire(long)}.
 *
 * @param mustOlderThan bound handed to {@code olderThanSnapshotNeedToExpire}
 */
void expireSnapshots(long mustOlderThan) {
// Delegate to the two-argument overload with the derived bound and the files to exclude.
expireSnapshots(
olderThanSnapshotNeedToExpire(mustOlderThan), expireSnapshotNeedToExcludeFiles());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ public void expireSnapshots(TableRuntime tableRuntime) {
baseMaintainer.expireSnapshots(tableRuntime);
}

/**
 * Not supported by this maintainer: the call always fails.
 *
 * @param tableRuntime not used
 * @throws UnsupportedOperationException always, because mixed tables do not support auto
 *     creating tags
 */
@Override
public void autoCreateTags(TableRuntime tableRuntime) {
throw new UnsupportedOperationException("Mixed table doesn't support auto create tags");
}

protected void expireSnapshots(long mustOlderThan) {
if (changeMaintainer != null) {
changeMaintainer.expireSnapshots(mustOlderThan);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,24 @@
/**
* API for maintaining table.
*
* <p>Includes: clean content files, clean metadata, clean dangling delete files, expire snapshots.
* <p>Includes: clean content files, clean metadata, clean dangling delete files, expire snapshots,
* auto create tags.
*/
// TODO TableMaintainer should not be in this optimizing.xxx package.
public interface TableMaintainer {

/** Clean table orphan files. Includes: data files, metadata files, dangling delete files. */
void cleanOrphanFiles(TableRuntime tableRuntime);

/**
* Expire snapshotsThe optimizing based on the snapshot that the current table relies on will not
* expire according to TableRuntime.
* Expire snapshots. The optimizing based on the snapshot that the current table relies on will
* not expire according to TableRuntime.
*/
void expireSnapshots(TableRuntime tableRuntime);

/** Auto create tags for table. */
void autoCreateTags(TableRuntime tableRuntime);

static TableMaintainer ofTable(AmoroTable<?> amoroTable) {
TableFormat format = amoroTable.format();
if (format == TableFormat.MIXED_HIVE || format == TableFormat.MIXED_ICEBERG) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class TableConfiguration {
private boolean deleteDanglingDeleteFilesEnabled;
private OptimizingConfig optimizingConfig;
private DataExpirationConfig expiringDataConfig;
private TagConfiguration tagConfiguration;

public TableConfiguration() {}

Expand Down Expand Up @@ -95,14 +96,19 @@ public TableConfiguration setExpiringDataConfig(DataExpirationConfig expiringDat
return this;
}

/**
 * Returns the tag configuration of this table.
 *
 * @return the configured {@link TagConfiguration}, or a freshly created default instance when
 *     none has been set; never {@code null}. The default instance is not stored, so each call
 *     made while no configuration is set yields a new object.
 */
public TagConfiguration getTagConfiguration() {
// orElseGet builds the fallback lazily, so nothing is allocated when a configuration is set.
return Optional.ofNullable(tagConfiguration).orElseGet(TagConfiguration::new);
}

/**
 * Sets the tag configuration of this table.
 *
 * @param tagConfiguration the tag configuration to store; stored as given, without a null check
 * @return this instance, so that setter calls can be chained
 */
public TableConfiguration setTagConfiguration(TagConfiguration tagConfiguration) {
this.tagConfiguration = tagConfiguration;
return this;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TableConfiguration that = (TableConfiguration) o;
return expireSnapshotEnabled == that.expireSnapshotEnabled
&& snapshotTTLMinutes == that.snapshotTTLMinutes
Expand All @@ -111,7 +117,8 @@ public boolean equals(Object o) {
&& orphanExistingMinutes == that.orphanExistingMinutes
&& deleteDanglingDeleteFilesEnabled == that.deleteDanglingDeleteFilesEnabled
&& Objects.equal(optimizingConfig, that.optimizingConfig)
&& Objects.equal(expiringDataConfig, that.expiringDataConfig);
&& Objects.equal(expiringDataConfig, that.expiringDataConfig)
&& Objects.equal(tagConfiguration, that.tagConfiguration);
}

@Override
Expand All @@ -124,7 +131,8 @@ public int hashCode() {
orphanExistingMinutes,
deleteDanglingDeleteFilesEnabled,
optimizingConfig,
expiringDataConfig);
expiringDataConfig,
tagConfiguration);
}

public static TableConfiguration parseConfig(Map<String, String> properties) {
Expand Down Expand Up @@ -160,6 +168,7 @@ public static TableConfiguration parseConfig(Map<String, String> properties) {
TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN,
TableProperties.ENABLE_DANGLING_DELETE_FILES_CLEAN_DEFAULT))
.setOptimizingConfig(OptimizingConfig.parseOptimizingConfig(properties))
.setExpiringDataConfig(DataExpirationConfig.parse(properties));
.setExpiringDataConfig(DataExpirationConfig.parse(properties))
.setTagConfiguration(TagConfiguration.parse(properties));
}
}
Loading

0 comments on commit d4abc9f

Please sign in to comment.