diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java index 48a61e0ee7..7e6bf8220c 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java @@ -1190,6 +1190,15 @@ public class ConfigOptions { + "The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. " + "If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster."); + public static final ConfigOption TABLE_DATALAKE_FRESHNESS = + key("table.datalake.freshness") + .durationType() + .defaultValue(Duration.ofMinutes(3)) + .withDescription( + "It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. " + + "Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. 
" + + "If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs."); + public static final ConfigOption TABLE_MERGE_ENGINE = key("table.merge-engine") .enumType(MergeEngineType.class) diff --git a/fluss-common/src/main/java/com/alibaba/fluss/config/TableConfig.java b/fluss-common/src/main/java/com/alibaba/fluss/config/TableConfig.java index 11a894b23e..5d4165978b 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/config/TableConfig.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/config/TableConfig.java @@ -24,6 +24,7 @@ import com.alibaba.fluss.metadata.MergeEngineType; import com.alibaba.fluss.utils.AutoPartitionStrategy; +import java.time.Duration; import java.util.Optional; /** @@ -84,6 +85,14 @@ public Optional getDataLakeFormat() { return config.getOptional(ConfigOptions.TABLE_DATALAKE_FORMAT); } + /** + * Gets the data lake freshness of the table. It defines the maximum amount of time that the + * datalake table's content should lag behind updates to the Fluss table. + */ + public Duration getDataLakeFreshness() { + return config.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS); + } + /** Gets the optional merge engine type of the table. */ public Optional getMergeEngineType() { return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE); diff --git a/fluss-common/src/main/java/com/alibaba/fluss/exception/FencedTieringEpochException.java b/fluss-common/src/main/java/com/alibaba/fluss/exception/FencedTieringEpochException.java new file mode 100644 index 0000000000..7b1daa48b6 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/exception/FencedTieringEpochException.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.exception; + +/** Exception thrown when the tiering epoch is invalid. */ +public class FencedTieringEpochException extends ApiException { + + public FencedTieringEpochException(String message) { + super(message); + } +} diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java index 36caf1e98d..b83aa36b56 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java @@ -26,6 +26,7 @@ import com.alibaba.fluss.exception.DatabaseNotExistException; import com.alibaba.fluss.exception.DuplicateSequenceException; import com.alibaba.fluss.exception.FencedLeaderEpochException; +import com.alibaba.fluss.exception.FencedTieringEpochException; import com.alibaba.fluss.exception.InvalidColumnProjectionException; import com.alibaba.fluss.exception.InvalidConfigException; import com.alibaba.fluss.exception.InvalidCoordinatorException; @@ -199,7 +200,9 @@ public enum Errors { SECURITY_DISABLED_EXCEPTION(47, "Security is disabled.", SecurityDisabledException::new), AUTHORIZATION_EXCEPTION(48, "Authorization failed", AuthorizationException::new), BUCKET_MAX_NUM_EXCEPTION( - 49, "Exceed the maximum number of buckets", TooManyBucketsException::new); + 49, "Exceed the maximum number of buckets", TooManyBucketsException::new), + FENCED_TIERING_EPOCH_EXCEPTION( + 50, "The tiering epoch is invalid.", FencedTieringEpochException::new); private static final 
Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java index 22ad22fcd4..1d761312f3 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -124,6 +124,7 @@ public class CoordinatorEventProcessor implements EventProcessor { private final MetadataManager metadataManager; private final TableManager tableManager; private final AutoPartitionManager autoPartitionManager; + private final LakeTableTieringManager lakeTableTieringManager; private final TableChangeWatcher tableChangeWatcher; private final CoordinatorChannelManager coordinatorChannelManager; private final TabletServerChangeWatcher tabletServerChangeWatcher; @@ -151,6 +152,7 @@ public CoordinatorEventProcessor( ServerMetadataCache serverMetadataCache, CoordinatorChannelManager coordinatorChannelManager, AutoPartitionManager autoPartitionManager, + LakeTableTieringManager lakeTableTieringManager, CoordinatorMetricGroup coordinatorMetricGroup, Configuration conf, ExecutorService ioExecutor) { @@ -160,6 +162,7 @@ public CoordinatorEventProcessor( coordinatorChannelManager, new CoordinatorContext(), autoPartitionManager, + lakeTableTieringManager, coordinatorMetricGroup, conf, ioExecutor); @@ -171,6 +174,7 @@ public CoordinatorEventProcessor( CoordinatorChannelManager coordinatorChannelManager, CoordinatorContext coordinatorContext, AutoPartitionManager autoPartitionManager, + LakeTableTieringManager lakeTableTieringManager, CoordinatorMetricGroup coordinatorMetricGroup, Configuration conf, ExecutorService ioExecutor) { @@ -211,6 +215,7 @@ public CoordinatorEventProcessor( ioExecutor, zooKeeperClient); this.autoPartitionManager = autoPartitionManager; + 
this.lakeTableTieringManager = lakeTableTieringManager; this.coordinatorMetricGroup = coordinatorMetricGroup; this.internalListenerName = conf.getString(ConfigOptions.INTERNAL_LISTENER_NAME); registerMetrics(); @@ -326,13 +331,18 @@ private void initCoordinatorContext() throws Exception { // load all tables List autoPartitionTables = new ArrayList<>(); + List> lakeTables = new ArrayList<>(); for (String database : metadataManager.listDatabases()) { for (String tableName : metadataManager.listTables(database)) { TablePath tablePath = TablePath.of(database, tableName); TableInfo tableInfo = metadataManager.getTable(tablePath); coordinatorContext.putTablePath(tableInfo.getTableId(), tablePath); coordinatorContext.putTableInfo(tableInfo); - + if (tableInfo.getTableConfig().isDataLakeEnabled()) { + // always set to current time, + // todo: should get from the last lake snapshot + lakeTables.add(Tuple2.of(tableInfo, System.currentTimeMillis())); + } if (tableInfo.isPartitioned()) { Map partitions = zooKeeperClient.getPartitionNameAndIds(tablePath); @@ -351,6 +361,7 @@ private void initCoordinatorContext() throws Exception { } } autoPartitionManager.initAutoPartitionTables(autoPartitionTables); + lakeTableTieringManager.initWithLakeTables(lakeTables); // load all assignment loadTableAssignment(); @@ -550,6 +561,9 @@ private void processCreateTable(CreateTableEvent createTableEvent) { if (createTableEvent.isAutoPartitionTable()) { autoPartitionManager.addAutoPartitionTable(tableInfo, true); } + if (tableInfo.getTableConfig().isDataLakeEnabled()) { + lakeTableTieringManager.addNewLakeTable(tableInfo); + } } private void processCreatePartition(CreatePartitionEvent createPartitionEvent) { @@ -585,6 +599,9 @@ private void processDropTable(DropTableEvent dropTableEvent) { if (dropTableEvent.isAutoPartitionTable()) { autoPartitionManager.removeAutoPartitionTable(dropTableEvent.getTableId()); } + if (dropTableEvent.isDataLakeEnabled()) { + 
lakeTableTieringManager.removeLakeTable(dropTableEvent.getTableId()); + } } private void processDropPartition(DropPartitionEvent dropPartitionEvent) { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java index 9d105d310b..c5a7effd6e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java @@ -127,6 +127,9 @@ public class CoordinatorServer extends ServerBase { @GuardedBy("lock") private AutoPartitionManager autoPartitionManager; + @GuardedBy("lock") + private LakeTableTieringManager lakeTableTieringManager; + @GuardedBy("lock") private ExecutorService ioExecutor; @@ -172,6 +175,8 @@ protected void startServices() throws Exception { authorizer.startup(); } + this.lakeTableTieringManager = new LakeTableTieringManager(); + MetadataManager metadataManager = new MetadataManager(zkClient, conf); this.coordinatorService = new CoordinatorService( @@ -221,6 +226,7 @@ protected void startServices() throws Exception { metadataCache, coordinatorChannelManager, autoPartitionManager, + lakeTableTieringManager, serverMetricGroup, conf, ioExecutor); @@ -366,6 +372,14 @@ CompletableFuture stopServices() { exception = ExceptionUtils.firstOrSuppressed(t, exception); } + try { + if (lakeTableTieringManager != null) { + lakeTableTieringManager.close(); + } + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + try { if (zkClient != null) { zkClient.close(); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/LakeTableTieringManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/LakeTableTieringManager.java new file mode 100644 index 0000000000..ccb05a1048 --- /dev/null +++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/LakeTableTieringManager.java @@ -0,0 +1,536 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.coordinator; + +import com.alibaba.fluss.annotation.VisibleForTesting; +import com.alibaba.fluss.exception.FencedTieringEpochException; +import com.alibaba.fluss.exception.FlussRuntimeException; +import com.alibaba.fluss.exception.TableNotExistException; +import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.server.entity.LakeTieringTableInfo; +import com.alibaba.fluss.server.utils.timer.DefaultTimer; +import com.alibaba.fluss.server.utils.timer.Timer; +import com.alibaba.fluss.server.utils.timer.TimerTask; +import com.alibaba.fluss.utils.clock.Clock; +import com.alibaba.fluss.utils.clock.SystemClock; +import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; +import com.alibaba.fluss.utils.concurrent.ShutdownableThread; +import com.alibaba.fluss.utils.types.Tuple2; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.GuardedBy; + +import java.util.ArrayDeque; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; +import 
java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock; + +/** + * A manager to manage the tables to be tiered. + * + *

For a lake table to be tiered, when created, it will be put into this manager and scheduled to + * be tiered by tiering services. + * + *

There are seven states for the table to be tiered: + * + *

    + *
  • New: when a new lake table is created + *
  • Initialized: when the coordinator server is restarted, all existing lake tables + * will be in the Initialized state + *
  • Scheduled: when the lake table is waiting for a period of time to be tiered + *
  • Pending: when the lake table is waiting for tiering service to request the table + *
  • Tiering: when the lake table is being tiered by tiering service + *
  • Tiered: when the lake table finishes one round of tiering + *
  • Failed: when the lake table tiering failed + *
+ * + *

The state machine of table to be tiered is as follows: + * + *

{@code
+ * ┌─────┐ ┌──────┐
+ * │ New │ │ Init │
+ * └──┬──┘ └──┬───┘
+ *    ▼       ▼
+ *  ┌──────────┐ (lake freshness > tiering interval)
+ *  │Scheduled ├──────┐
+ *  └──────────┘      ▼
+ *       ▲        ┌───────┐ (assign to tier service)  ┌───────┐
+ *       |        |Pending├──────────────────────────►|Tiering├─┐
+ *       |        └───────┘                           └───┬───┘ │
+ *       |            ▲                 ┌─────────────────┘     │
+ *       |            |                 | (timeout or failure)  | (finished)
+ *       |            |                 ▼                       ▼
+ *       |            |  (retry)   ┌─────────┐             ┌────────┐
+ *       |            └────────────│ Failed  │             │ Tiered │
+ *       |                         └─────────┘             └────┬───┘
+ *       |                                                      |
+ *       └──────────────────────────────────────────────────────┘
+ *                   (ready for next round of tiering)
+ * }
+ */ +public class LakeTableTieringManager implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(LakeTableTieringManager.class); + + protected static final long TIERING_SERVICE_TIMEOUT_MS = 2 * 60 * 1000; // 2 minutes + + private final Timer lakeTieringScheduleTimer; + private final ScheduledExecutorService lakeTieringServiceTimeoutChecker; + private final Clock clock; + private final Queue pendingTieringTables; + private final LakeTieringExpiredOperationReaper expirationReaper; + + // the tiering state of the table to be tiered, + // from table_id -> tiering state + private final Map tieringStates; + + // table_id -> table path + private final Map tablePaths; + + // table_id -> freshness (tiering interval) + private final Map tableLakeFreshness; + + // cache table_id -> table tiering epoch + private final Map tableTierEpoch; + + // table_id -> the last timestamp of tiered lake snapshot + private final Map tableLastTieredTime; + + // the live tables that are tiering, + // from table_id -> last heartbeat time by the tiering service + private final Map liveTieringTableIds; + + private final Lock lock = new ReentrantLock(); + + public LakeTableTieringManager() { + this( + new DefaultTimer("delay lake tiering", 1_000, 20), + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory("fluss-lake-tiering-timeout-checker")), + SystemClock.getInstance()); + } + + @VisibleForTesting + protected LakeTableTieringManager( + Timer lakeTieringScheduleTimer, + ScheduledExecutorService lakeTieringServiceTimeoutChecker, + Clock clock) { + this.lakeTieringScheduleTimer = lakeTieringScheduleTimer; + this.lakeTieringServiceTimeoutChecker = lakeTieringServiceTimeoutChecker; + this.clock = clock; + this.pendingTieringTables = new ArrayDeque<>(); + this.tieringStates = new HashMap<>(); + this.liveTieringTableIds = new HashMap<>(); + this.tablePaths = new HashMap<>(); + this.tableLakeFreshness = new HashMap<>(); + this.expirationReaper = new 
LakeTieringExpiredOperationReaper(); + expirationReaper.start(); + this.lakeTieringServiceTimeoutChecker.scheduleWithFixedDelay( + this::checkTieringServiceTimeout, 0, 15, TimeUnit.SECONDS); + this.tableTierEpoch = new HashMap<>(); + this.tableLastTieredTime = new HashMap<>(); + } + + public void initWithLakeTables(List> tableInfoWithTieredTime) { + inLock( + lock, + () -> { + for (Tuple2 tableInfoAndLastLakeTime : + tableInfoWithTieredTime) { + TableInfo tableInfo = tableInfoAndLastLakeTime.f0; + long lastTieredTime = tableInfoAndLastLakeTime.f1; + registerLakeTable(tableInfo, lastTieredTime); + doHandleStateChange(tableInfo.getTableId(), TieringState.Initialized); + // schedule it to be tiered after the tiering interval + doHandleStateChange(tableInfo.getTableId(), TieringState.Scheduled); + } + }); + } + + public void addNewLakeTable(TableInfo tableInfo) { + inLock( + lock, + () -> { + registerLakeTable(tableInfo, clock.milliseconds()); + doHandleStateChange(tableInfo.getTableId(), TieringState.New); + // schedule it to be tiered after the tiering interval + doHandleStateChange(tableInfo.getTableId(), TieringState.Scheduled); + }); + } + + @GuardedBy("lock") + private void registerLakeTable(TableInfo tableInfo, long lastTieredTime) { + long tableId = tableInfo.getTableId(); + tablePaths.put(tableId, tableInfo.getTablePath()); + tableLakeFreshness.put( + tableId, tableInfo.getTableConfig().getDataLakeFreshness().toMillis()); + tableLastTieredTime.put(tableId, lastTieredTime); + tableTierEpoch.put(tableId, 0L); + } + + private void scheduleTableTiering(long tableId) { + Long freshnessInterval = tableLakeFreshness.get(tableId); + Long lastTieredTime = tableLastTieredTime.get(tableId); + if (freshnessInterval == null || lastTieredTime == null) { + // the table has been dropped, return directly + return; + } + long delayMs = freshnessInterval - (clock.milliseconds() - lastTieredTime); + // if the delayMs is < 0, the DelayedTiering will be triggered at once without + 
// adding into timing wheel. + lakeTieringScheduleTimer.add(new DelayedTiering(tableId, delayMs)); + } + + public void removeLakeTable(long tableId) { + inLock( + lock, + () -> { + tablePaths.remove(tableId); + tableLakeFreshness.remove(tableId); + tableLastTieredTime.remove(tableId); + tieringStates.remove(tableId); + liveTieringTableIds.remove(tableId); + tableTierEpoch.remove(tableId); + }); + } + + @VisibleForTesting + protected void checkTieringServiceTimeout() { + inLock( + lock, + () -> { + long currentTime = clock.milliseconds(); + Map timeoutTables = new HashMap<>(); + liveTieringTableIds.forEach( + (tableId, lastHeartbeat) -> { + if (currentTime - lastHeartbeat >= TIERING_SERVICE_TIMEOUT_MS) { + timeoutTables.put(tableId, tablePaths.get(tableId)); + } + }); + timeoutTables.forEach( + (tableId, tablePath) -> { + LOG.warn( + "The lake tiering service for table {}({}) is timeout, change it to PENDING.", + tablePaths.get(tableId), + tableId); + doHandleStateChange(tableId, TieringState.Failed); + // then to pending state to enable other tiering service can + // pick it + doHandleStateChange(tableId, TieringState.Pending); + }); + }); + } + + @Nullable + public LakeTieringTableInfo requestTable() { + return inLock( + lock, + () -> { + Long tableId = pendingTieringTables.poll(); + // no any pending table, return directly + if (tableId == null) { + return null; + } + TablePath tablePath = tablePaths.get(tableId); + // the table has been dropped, request again + if (tablePath == null) { + return requestTable(); + } + doHandleStateChange(tableId, TieringState.Tiering); + long tieringEpoch = tableTierEpoch.get(tableId); + return new LakeTieringTableInfo(tableId, tablePath, tieringEpoch); + }); + } + + public void finishTableTiering(long tableId, long tieredEpoch) { + inLock( + lock, + () -> { + validateTieringServiceRequest(tableId, tieredEpoch); + // to tiered state firstly + doHandleStateChange(tableId, TieringState.Tiered); + // then to scheduled state to enable 
other tiering service can pick it + doHandleStateChange(tableId, TieringState.Scheduled); + }); + } + + public void reportTieringFail(long tableId, long tieringEpoch) { + inLock( + lock, + () -> { + validateTieringServiceRequest(tableId, tieringEpoch); + // to fail state firstly + doHandleStateChange(tableId, TieringState.Failed); + // then to pending state to enable other tiering service can pick it + doHandleStateChange(tableId, TieringState.Pending); + }); + } + + public void renewTieringHeartbeat(long tableId, long tieringEpoch) { + inLock( + lock, + () -> { + validateTieringServiceRequest(tableId, tieringEpoch); + TieringState tieringState = tieringStates.get(tableId); + if (tieringState != TieringState.Tiering) { + throw new IllegalStateException( + String.format( + "The table %d to renew tiering heartbeat must in Tiering state, but in %s state.", + tableId, tieringState)); + } + liveTieringTableIds.put(tableId, clock.milliseconds()); + }); + } + + private void validateTieringServiceRequest(long tableId, long tieringEpoch) { + Long currentEpoch = tableTierEpoch.get(tableId); + // the table has been dropped, return false + if (currentEpoch == null) { + throw new TableNotExistException("The table " + tableId + " doesn't exist."); + } + if (tieringEpoch != currentEpoch) { + throw new FencedTieringEpochException( + String.format( + "The tiering epoch %d is not match current epoch %d in coordinator for table %d.", + tieringEpoch, currentEpoch, tableId)); + } + } + + /** + * Handle the state change of the lake table to be tiered. The core state transitions for the + * state machine are as follows: + * + *

New -> Scheduled: + * + *

-- When the lake table is newly created, do: schedule a timer to wait for a freshness + * interval configured in table which will transmit the table to Pending. + * + *

Initialized -> Scheduled: + * + *

-- When the coordinator server is restarted, for all existing lake table, if the interval + * from last lake snapshot is not less than tiering interval, do: transmit to Pending, otherwise + * schedule a timer to wait for a freshness interval which will transmit the table to Pending. + * + *

Scheduled -> Pending + * + *

-- The freshness interval to wait has passed, do: transmit to Pending state + * + *

Failed -> Pending + * + *

-- The previous tiering service failed to tier the table, retry to tier again, do: + * transmit to Pending state + * + *

Pending -> Tiering + * + *

-- When the table is assigned to a tiering service after tiering service request the + * table, do: transmit to Tiering state + * + *

Tiering -> Tiered + * + *

-- When the tiering service finished the table, do: transmit to Tiered state + * + *

Tiering -> Failed + * + *

-- When the tiering service timeout to report heartbeat or report failure for the table, + * do: transmit to Failed state + */ + private void doHandleStateChange(long tableId, TieringState targetState) { + TieringState currentState = tieringStates.get(tableId); + if (!isValidStateTransition(currentState, targetState)) { + LOG.error( + "Fail to change state for table {} from {} to {} as it's not a valid state change.", + tableId, + currentState, + targetState); + return; + } + switch (targetState) { + case New: + case Initialized: + // do nothing + break; + case Scheduled: + scheduleTableTiering(tableId); + break; + case Pending: + // increase tiering epoch and initialize the heartbeat of the tiering table + tableTierEpoch.computeIfPresent(tableId, (t, v) -> v + 1); + pendingTieringTables.add(tableId); + break; + case Tiering: + liveTieringTableIds.put(tableId, clock.milliseconds()); + break; + case Tiered: + // remember when this round finished, so the following Scheduled transition + // waits a full freshness interval instead of computing a non-positive delay + // from the stale registration time and re-queueing the table at once + tableLastTieredTime.put(tableId, clock.milliseconds()); + liveTieringTableIds.remove(tableId); + break; + case Failed: + // the table is no longer tracked as live; its heartbeat entry is dropped + liveTieringTableIds.remove(tableId); + break; + } + doStateChange(tableId, currentState, targetState); + } + + /** + * Checks whether the table may move from {@code curState} to {@code targetState}. New and + * Initialized are only accepted when the table has no state yet; every other target + * requires an existing state contained in the target's {@code validPreviousStates()}. + */ + private boolean isValidStateTransition( + @Nullable TieringState curState, TieringState targetState) { + if (targetState == TieringState.New || targetState == TieringState.Initialized) { + // when target state is new or Initialized, it's valid when current state is null + return curState == null; + } + if (curState == null) { + // the table is dropped, shouldn't continue to do state transition + return false; + } + return targetState.validPreviousStates().contains(curState); + } + + /** Stores {@code toState} as the table's tiering state and logs the transition at debug level. */ + private void doStateChange(long tableId, TieringState fromState, TieringState toState) { + tieringStates.put(tableId, toState); + LOG.debug( + "Successfully changed tiering state for table {} from {} to {}.", + tableId, + fromState, + toState); + } + + @Override + public void close() throws Exception { + lakeTieringServiceTimeoutChecker.shutdown(); + expirationReaper.initiateShutdown(); + // improve shutdown time by waking up any 
ShutdownableThread(s) blocked on poll by + // sending a no-op. + lakeTieringScheduleTimer.add( + new TimerTask(0) { + @Override + public void run() {} + }); + try { + expirationReaper.awaitShutdown(); + } catch (InterruptedException e) { + throw new FlussRuntimeException( + "Error while shutdown lake tiering expired operation manager", e); + } + + lakeTieringScheduleTimer.shutdown(); + } + + private class DelayedTiering extends TimerTask { + + private final long tableId; + + public DelayedTiering(long tableId, long delayMs) { + super(delayMs); + this.tableId = tableId; + } + + @Override + public void run() { + inLock( + lock, + () -> + // to pending state + doHandleStateChange(tableId, TieringState.Pending)); + } + } + + private class LakeTieringExpiredOperationReaper extends ShutdownableThread { + + public LakeTieringExpiredOperationReaper() { + super("LakeTieringExpiredOperationReaper", false); + } + + @Override + public void doWork() throws Exception { + advanceClock(); + } + + private void advanceClock() throws InterruptedException { + lakeTieringScheduleTimer.advanceClock(200L); + } + } + + private enum TieringState { + // When a new lake table is created, the state will be New + New { + @Override + public Set validPreviousStates() { + return Collections.emptySet(); + } + }, + // When the coordinator server is restarted, the state of existing lake table + // will be Initialized + Initialized { + @Override + public Set validPreviousStates() { + return Collections.emptySet(); + } + }, + // When the lake table is waiting to be tiered, such as waiting for the period of tiering + // interval, the state will be Scheduled + Scheduled { + @Override + public Set validPreviousStates() { + return EnumSet.of(New, Initialized, Tiered); + } + }, + // When the period of tiering interval has passed, but no any tiering service requesting + // table, the state will be Pending + Pending { + @Override + public Set validPreviousStates() { + return EnumSet.of(Scheduled, Failed); + 
} + }, + // When one tiering service is tiering the table, the state will be Tiering + Tiering { + @Override + public Set validPreviousStates() { + return EnumSet.of(Pending); + } + }, + + // When one tiering service has successfully tiered the table, the state will be Tiered + Tiered { + @Override + public Set validPreviousStates() { + return EnumSet.of(Tiering); + } + }, + // When one tiering service fail or timeout to tier the table, the state will be Failed + Failed { + @Override + public Set validPreviousStates() { + return EnumSet.of(Tiering); + } + }; + + abstract Set validPreviousStates(); + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/DropTableEvent.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/DropTableEvent.java index dcb2e2ed03..fc61b90789 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/DropTableEvent.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/DropTableEvent.java @@ -25,10 +25,12 @@ public class DropTableEvent implements CoordinatorEvent { // true if the table is with auto partition enabled private final boolean isAutoPartitionTable; + private final boolean isDataLakeEnabled; - public DropTableEvent(long tableId, boolean isAutoPartitionTable) { + public DropTableEvent(long tableId, boolean isAutoPartitionTable, boolean isDataLakeEnabled) { this.tableId = tableId; this.isAutoPartitionTable = isAutoPartitionTable; + this.isDataLakeEnabled = isDataLakeEnabled; } public long getTableId() { @@ -39,21 +41,27 @@ public boolean isAutoPartitionTable() { return isAutoPartitionTable; } + public boolean isDataLakeEnabled() { + return isDataLakeEnabled; + } + @Override - public boolean equals(Object o) { - if (this == o) { + public boolean equals(Object object) { + if (this == object) { return true; } - if (o == null || getClass() != o.getClass()) { + if (!(object instanceof DropTableEvent)) { return false; } - 
DropTableEvent that = (DropTableEvent) o; - return tableId == that.tableId && isAutoPartitionTable == that.isAutoPartitionTable; + DropTableEvent that = (DropTableEvent) object; + return tableId == that.tableId + && isAutoPartitionTable == that.isAutoPartitionTable + && isDataLakeEnabled == that.isDataLakeEnabled; } @Override public int hashCode() { - return Objects.hash(tableId, isAutoPartitionTable); + return Objects.hash(tableId, isAutoPartitionTable, isDataLakeEnabled); } @Override @@ -63,6 +71,8 @@ public String toString() { + tableId + ", isAutoPartitionTable=" + isAutoPartitionTable + + ", isDataLakeEnabled=" + + isDataLakeEnabled + '}'; } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java index e03ecc0caa..0206bd5047 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcher.java @@ -16,6 +16,8 @@ package com.alibaba.fluss.server.coordinator.event.watcher; +import com.alibaba.fluss.config.Configuration; +import com.alibaba.fluss.config.TableConfig; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.SchemaInfo; import com.alibaba.fluss.metadata.TableInfo; @@ -36,7 +38,6 @@ import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.ChildData; import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCache; import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.cache.CuratorCacheListener; -import com.alibaba.fluss.utils.AutoPartitionStrategy; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,11 +141,15 @@ public void event(Type type, ChildData oldData, ChildData newData) { break; } TableRegistration table = 
TableZNode.decode(oldData.getData()); + TableConfig tableConfig = + new TableConfig(Configuration.fromMap(table.properties)); eventManager.put( new DropTableEvent( table.tableId, - AutoPartitionStrategy.from(table.properties) - .isAutoPartitionEnabled())); + tableConfig + .getAutoPartitionStrategy() + .isAutoPartitionEnabled(), + tableConfig.isDataLakeEnabled())); } break; } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/entity/LakeTieringTableInfo.java b/fluss-server/src/main/java/com/alibaba/fluss/server/entity/LakeTieringTableInfo.java new file mode 100644 index 0000000000..b5d2bfc406 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/entity/LakeTieringTableInfo.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.entity; + +import com.alibaba.fluss.metadata.TablePath; + +import java.util.Objects; + +/** The info for the table assigned from Coordinator to lake tiering service to do tiering. 
*/ +public class LakeTieringTableInfo { + + private final long tableId; + private final TablePath tablePath; + private final long tieringEpoch; + + public LakeTieringTableInfo(long tableId, TablePath tablePath, long tieringEpoch) { + this.tableId = tableId; + this.tablePath = tablePath; + this.tieringEpoch = tieringEpoch; + } + + public long tableId() { + return tableId; + } + + public TablePath tablePath() { + return tablePath; + } + + public long tieringEpoch() { + return tieringEpoch; + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + LakeTieringTableInfo that = (LakeTieringTableInfo) o; + return tableId == that.tableId + && tieringEpoch == that.tieringEpoch + && Objects.equals(tablePath, that.tablePath); + } + + @Override + public int hashCode() { + return Objects.hash(tableId, tablePath, tieringEpoch); + } + + @Override + public String toString() { + return "LakeTieringTableInfo{" + + "tableId=" + + tableId + + ", tablePath=" + + tablePath + + ", tieringEpoch=" + + tieringEpoch + + '}'; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/DefaultTimer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/DefaultTimer.java index a9ca5d3ac7..2ea5490405 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/DefaultTimer.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/DefaultTimer.java @@ -16,6 +16,8 @@ package com.alibaba.fluss.server.utils.timer; +import com.alibaba.fluss.utils.clock.Clock; +import com.alibaba.fluss.utils.clock.SystemClock; import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory; import javax.annotation.concurrent.ThreadSafe; @@ -46,17 +48,30 @@ public class DefaultTimer implements Timer { private final AtomicInteger taskCounter; private final TimingWheel timingWheel; private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); + private final Clock clock; 
- public DefaultTimer(String executorName, long tickMs, int wheelSize, long startMs) { + public DefaultTimer(String executorName, long tickMs, int wheelSize, Clock clock) { this.taskExecutor = Executors.newFixedThreadPool(1, new ExecutorThreadFactory(executorName)); this.delayQueue = new DelayQueue<>(); this.taskCounter = new AtomicInteger(0); - this.timingWheel = new TimingWheel(tickMs, wheelSize, startMs, taskCounter, delayQueue); + this.clock = clock; + this.timingWheel = + new TimingWheel( + tickMs, + wheelSize, + TimeUnit.NANOSECONDS.toMillis(clock.nanoseconds()), + taskCounter, + delayQueue, + clock); + } + + public DefaultTimer(String executorName, long tickMs, int wheelSize) { + this(executorName, tickMs, wheelSize, SystemClock.getInstance()); } public DefaultTimer(String executorName) { - this(executorName, 1, 20, TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + this(executorName, 1, 20, SystemClock.getInstance()); } @Override @@ -69,7 +84,7 @@ public void add(TimerTask timerTask) { timerTask, timerTask.getDelayMs() + TimeUnit.NANOSECONDS.toMillis( - System.nanoTime())))); + clock.nanoseconds())))); } @Override diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimerTaskList.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimerTaskList.java index f43a552604..1707a72b32 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimerTaskList.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimerTaskList.java @@ -16,6 +16,9 @@ package com.alibaba.fluss.server.utils.timer; +import com.alibaba.fluss.utils.clock.Clock; +import com.alibaba.fluss.utils.clock.SystemClock; + import javax.annotation.concurrent.ThreadSafe; import java.util.concurrent.Delayed; @@ -32,13 +35,19 @@ @ThreadSafe class TimerTaskList implements Delayed { private final AtomicInteger taskCounter; + private final Clock clock; private final TimerTaskEntry root = new TimerTaskEntry(null, -1); private 
final AtomicLong expiration = new AtomicLong(-1L); TimerTaskList(AtomicInteger taskCounter) { + this(taskCounter, SystemClock.getInstance()); + } + + TimerTaskList(AtomicInteger taskCounter, Clock clock) { this.taskCounter = taskCounter; this.root.next = root; this.root.prev = root; + this.clock = clock; } /** @@ -121,7 +130,7 @@ synchronized void flush(Consumer f) { @Override public long getDelay(TimeUnit unit) { return unit.convert( - Math.max(getExpiration() - TimeUnit.NANOSECONDS.toMillis(System.nanoTime()), 0), + Math.max(getExpiration() - TimeUnit.NANOSECONDS.toMillis(clock.nanoseconds()), 0), TimeUnit.MILLISECONDS); } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java index 583e852853..a68d77ff7e 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/timer/TimingWheel.java @@ -16,6 +16,8 @@ package com.alibaba.fluss.server.utils.timer; +import com.alibaba.fluss.utils.clock.Clock; + import javax.annotation.concurrent.NotThreadSafe; import java.util.Timer; @@ -129,6 +131,7 @@ final class TimingWheel { private final AtomicInteger taskCounter; private final DelayQueue queue; + private final Clock clock; /** The upper level timing wheel. 
*/ private volatile TimingWheel overflowWheel; @@ -139,7 +142,8 @@ final class TimingWheel { int wheelSize, long startMs, AtomicInteger taskCounter, - DelayQueue queue) { + DelayQueue queue, + Clock clock) { this.tickMs = tickMs; this.wheelSize = wheelSize; this.interval = tickMs * wheelSize; @@ -151,13 +155,15 @@ final class TimingWheel { // Initialize buckets for (int i = 0; i < wheelSize; i++) { - buckets[i] = new TimerTaskList(taskCounter); + buckets[i] = new TimerTaskList(taskCounter, clock); } + this.clock = clock; } private synchronized void addOverflowWheel() { if (overflowWheel == null) { - overflowWheel = new TimingWheel(interval, wheelSize, currentTime, taskCounter, queue); + overflowWheel = + new TimingWheel(interval, wheelSize, currentTime, taskCounter, queue, clock); } } diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 977b431d21..866895cba0 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -132,6 +132,7 @@ class CoordinatorEventProcessorTest { private ServerMetadataCache serverMetadataCache; private TestCoordinatorChannelManager testCoordinatorChannelManager; private AutoPartitionManager autoPartitionManager; + private LakeTableTieringManager lakeTableTieringManager; private CompletedSnapshotStoreManager completedSnapshotStoreManager; @BeforeAll @@ -159,6 +160,7 @@ void beforeEach() throws IOException { testCoordinatorChannelManager = new TestCoordinatorChannelManager(); autoPartitionManager = new AutoPartitionManager(serverMetadataCache, metadataManager, new Configuration()); + lakeTableTieringManager = new LakeTableTieringManager(); Configuration conf = new Configuration(); 
conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); eventProcessor = buildCoordinatorEventProcessor(); @@ -716,6 +718,7 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor() { serverMetadataCache, testCoordinatorChannelManager, autoPartitionManager, + lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, new Configuration(), Executors.newFixedThreadPool(1, new ExecutorThreadFactory("test-coordinator-io"))); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/LakeTableTieringManagerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/LakeTableTieringManagerTest.java new file mode 100644 index 0000000000..ca8bc5a6b8 --- /dev/null +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/LakeTableTieringManagerTest.java @@ -0,0 +1,261 @@ +/* + * Copyright (c) 2025 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.alibaba.fluss.server.coordinator; + +import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.exception.FencedTieringEpochException; +import com.alibaba.fluss.exception.TableNotExistException; +import com.alibaba.fluss.metadata.Schema; +import com.alibaba.fluss.metadata.TableDescriptor; +import com.alibaba.fluss.metadata.TableInfo; +import com.alibaba.fluss.metadata.TablePath; +import com.alibaba.fluss.server.entity.LakeTieringTableInfo; +import com.alibaba.fluss.server.utils.timer.DefaultTimer; +import com.alibaba.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService; +import com.alibaba.fluss.types.DataTypes; +import com.alibaba.fluss.utils.clock.ManualClock; +import com.alibaba.fluss.utils.types.Tuple2; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static com.alibaba.fluss.server.coordinator.LakeTableTieringManager.TIERING_SERVICE_TIMEOUT_MS; +import static com.alibaba.fluss.testutils.common.CommonTestUtils.waitValue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link LakeTableTieringManager}. 
*/ +class LakeTableTieringManagerTest { + + private LakeTableTieringManager tableTieringManager; + private ManualClock manualClock; + private ManuallyTriggeredScheduledExecutorService lakeTieringServiceTimeoutChecker; + + @BeforeEach + void beforeEach() { + manualClock = new ManualClock(); + lakeTieringServiceTimeoutChecker = new ManuallyTriggeredScheduledExecutorService(); + tableTieringManager = createLakeTableTieringManager(); + } + + private LakeTableTieringManager createLakeTableTieringManager() { + return new LakeTableTieringManager( + new DefaultTimer("delay lake tiering", 1_000, 20, manualClock), + lakeTieringServiceTimeoutChecker, + manualClock); + } + + @Test + void testInitLakeTableTieringManagerWithTables() { + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table1"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofMinutes(3)); + + long tableId2 = 2L; + TablePath tablePath2 = TablePath.of("db", "table2"); + TableInfo tableInfo2 = createTableInfo(tableId2, tablePath2, Duration.ofMinutes(3)); + + List> lakeTables = + Arrays.asList( + Tuple2.of(tableInfo1, manualClock.milliseconds()), + // the last lake snapshot of table2 is older than 3 minutes, should be + // tiered right now + Tuple2.of( + tableInfo2, + manualClock.milliseconds() - Duration.ofMinutes(3).toMillis())); + tableTieringManager.initWithLakeTables(lakeTables); + // table2 should be PENDING at once without async scheduling + assertRequestTable(tableId2, tablePath2, 1); + + // advance 3 min to trigger table1 to be tiered + manualClock.advanceTime(Duration.ofMinutes(3)); + assertRequestTable(tableId1, tablePath1, 1); + } + + @Test + void testAddNewLakeTable() { + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo1); + + // advance time to trigger the table tiering + 
manualClock.advanceTime(Duration.ofSeconds(10)); + assertRequestTable(tableId1, tablePath1, 1); + } + + @Test + void testRemoveLakeTable() { + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo1); + + long tableId2 = 2L; + TablePath tablePath2 = TablePath.of("db", "table2"); + TableInfo tableInfo2 = createTableInfo(tableId2, tablePath2, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo2); + + // remove the tableId1 + tableTieringManager.removeLakeTable(tableId1); + + // advance time to trigger the table tiering + manualClock.advanceTime(Duration.ofSeconds(10)); + // shouldn't get tableId1, should only get tableId2 + assertRequestTable(tableId2, tablePath2, 1); + + // verify the request for table1 should throw table not exist exception + assertThatThrownBy(() -> tableTieringManager.renewTieringHeartbeat(tableId1, 1)) + .isInstanceOf(TableNotExistException.class) + .hasMessage("The table %d doesn't exist.", tableId1); + assertThatThrownBy(() -> tableTieringManager.reportTieringFail(tableId1, 1)) + .isInstanceOf(TableNotExistException.class) + .hasMessage("The table %d doesn't exist.", tableId1); + assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 1)) + .isInstanceOf(TableNotExistException.class) + .hasMessage("The table %d doesn't exist.", tableId1); + } + + @Test + void testFinishTableTieringReTriggerSchedule() { + long tieredEpoch = 1; + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo1); + + manualClock.advanceTime(Duration.ofSeconds(10)); + // check requested table + assertRequestTable(tableId1, tablePath1, 1); + + // request table should return null + 
assertThat(tableTieringManager.requestTable()).isNull(); + + // mock lake tiering finish one-round tiering + tableTieringManager.finishTableTiering(tableId1, tieredEpoch); + // not advance time, request table should return null + assertThat(tableTieringManager.requestTable()).isNull(); + + // now, advance time to trigger the table tiering + manualClock.advanceTime(Duration.ofSeconds(10)); + // the tiered epoch should be 2 now + assertRequestTable(tableId1, tablePath1, 2); + } + + @Test + void testTieringServiceTimeOutReTriggerPending() { + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table1"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo1); + long tableId2 = 2L; + TablePath tablePath2 = TablePath.of("db", "table2"); + TableInfo tableInfo2 = createTableInfo(tableId2, tablePath2, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo2); + + manualClock.advanceTime(Duration.ofSeconds(10)); + // check requested table + assertRequestTable(tableId1, tablePath1, 1); + assertRequestTable(tableId2, tablePath2, 1); + + // advance time and mock tiering service heartbeat + manualClock.advanceTime(Duration.ofMillis(TIERING_SERVICE_TIMEOUT_MS - 1)); + // tableid1 renew the tiering heartbeat, so that it won't be + // re-pending after heartbeat timeout + tableTieringManager.renewTieringHeartbeat(tableId1, 1); + // should only get table2 + manualClock.advanceTime(Duration.ofSeconds(10)); + lakeTieringServiceTimeoutChecker.triggerPeriodicScheduledTasks(); + assertRequestTable(tableId2, tablePath2, 2); + + // advance a large time to mock tiering service heartbeat timeout + // and check the request table, the table1 should be re-scheduled + manualClock.advanceTime(Duration.ofMinutes(5)); + tableTieringManager.checkTieringServiceTimeout(); + assertRequestTable(tableId1, tablePath1, 2); + + // now, assume the previous tiering service come alive, try to send 
request for the table1 + // should throw FencedTieringEpochException + assertThatThrownBy(() -> tableTieringManager.renewTieringHeartbeat(tableId1, 1)) + .isInstanceOf(FencedTieringEpochException.class) + .hasMessage( + "The tiering epoch %d is not match current epoch %d in coordinator for table %d.", + 1, 2, tableId1); + assertThatThrownBy(() -> tableTieringManager.reportTieringFail(tableId1, 1)) + .isInstanceOf(FencedTieringEpochException.class) + .hasMessage( + "The tiering epoch %d is not match current epoch %d in coordinator for table %d.", + 1, 2, tableId1); + assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 1)) + .isInstanceOf(FencedTieringEpochException.class) + .hasMessage( + "The tiering epoch %d is not match current epoch %d in coordinator for table %d.", + 1, 2, tableId1); + assertThatThrownBy(() -> tableTieringManager.finishTableTiering(tableId1, 3)) + .isInstanceOf(FencedTieringEpochException.class) + .hasMessage( + "The tiering epoch %d is not match current epoch %d in coordinator for table %d.", + 3, 2, tableId1); + } + + @Test + void testTieringFail() { + long tableId1 = 1L; + TablePath tablePath1 = TablePath.of("db", "table1"); + TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10)); + tableTieringManager.addNewLakeTable(tableInfo1); + manualClock.advanceTime(Duration.ofSeconds(10)); + assertRequestTable(tableId1, tablePath1, 1); + + // should be re-pending after tiering fail + tableTieringManager.reportTieringFail(tableId1, 1); + // we should get the table again + assertRequestTable(tableId1, tablePath1, 2); + } + + private TableInfo createTableInfo(long tableId, TablePath tablePath, Duration freshness) { + TableDescriptor tableDescriptor = + TableDescriptor.builder() + .schema(Schema.newBuilder().column("c1", DataTypes.INT()).build()) + .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true) + .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, freshness) + .distributedBy(1) + .build(); + + 
return TableInfo.of( + tablePath, + tableId, + 1, + tableDescriptor, + System.currentTimeMillis(), + System.currentTimeMillis()); + } + + private void assertRequestTable(long tableId, TablePath tablePath, long tieredEpoch) { + LakeTieringTableInfo table = + waitValue( + () -> Optional.ofNullable(tableTieringManager.requestTable()), + Duration.ofSeconds(10), + "Request tiering table timout"); + assertThat(table).isEqualTo(new LakeTieringTableInfo(tableId, tablePath, tieredEpoch)); + } +} diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index 81e6a8453f..b77270299a 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -134,7 +134,7 @@ void testTableChanges() { CreateTableEvent createTableEvent = (CreateTableEvent) coordinatorEvent; TableInfo tableInfo = createTableEvent.getTableInfo(); metadataManager.dropTable(tableInfo.getTablePath(), false); - expectedTableEvents.add(new DropTableEvent(tableInfo.getTableId(), false)); + expectedTableEvents.add(new DropTableEvent(tableInfo.getTableId(), false, false)); } // collect all events and check the all events @@ -211,7 +211,7 @@ void testPartitionedTable() throws Exception { expectedEvents.add(new DropPartitionEvent(tableId, 1L, "2011")); expectedEvents.add(new DropPartitionEvent(tableId, 2L, "2022")); // drop table event - expectedEvents.add(new DropTableEvent(tableId, true)); + expectedEvents.add(new DropTableEvent(tableId, true, false)); retry( Duration.ofMinutes(1), diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index aa3da6cac5..aae4e1e645 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -28,6 +28,7 @@ import com.alibaba.fluss.server.coordinator.CoordinatorEventProcessor; import com.alibaba.fluss.server.coordinator.CoordinatorRequestBatch; import com.alibaba.fluss.server.coordinator.CoordinatorTestUtils; +import com.alibaba.fluss.server.coordinator.LakeTableTieringManager; import com.alibaba.fluss.server.coordinator.MetadataManager; import com.alibaba.fluss.server.coordinator.TestCoordinatorChannelManager; import com.alibaba.fluss.server.coordinator.event.CoordinatorEventManager; @@ -74,6 +75,7 @@ class TableBucketStateMachineTest { private TestCoordinatorChannelManager testCoordinatorChannelManager; private CoordinatorRequestBatch coordinatorRequestBatch; private AutoPartitionManager autoPartitionManager; + private LakeTableTieringManager lakeTableTieringManager; @BeforeAll static void baseBeforeAll() { @@ -102,6 +104,7 @@ void beforeEach() throws IOException { serverMetadataCache, new MetadataManager(zookeeperClient, new Configuration()), new Configuration()); + lakeTableTieringManager = new LakeTableTieringManager(); } @Test @@ -235,6 +238,7 @@ void testStateChangeToOnline() throws Exception { TestingClientMetricGroup.newInstance())), coordinatorContext, autoPartitionManager, + lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, new Configuration(), Executors.newFixedThreadPool( diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/utils/timer/DefaultTimerTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/utils/timer/DefaultTimerTest.java index ebd0006418..5b090c858d 100644 --- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/utils/timer/DefaultTimerTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/utils/timer/DefaultTimerTest.java @@ -34,7 +34,7 @@ public class DefaultTimerTest { @BeforeEach void setup() { - timer = new DefaultTimer("test", 1, 3, TimeUnit.NANOSECONDS.toMillis(System.nanoTime())); + timer = new DefaultTimer("test", 1, 3); } @AfterEach diff --git a/website/docs/engine-flink/options.md b/website/docs/engine-flink/options.md index d008862365..c72f141738 100644 --- a/website/docs/engine-flink/options.md +++ b/website/docs/engine-flink/options.md @@ -94,6 +94,7 @@ ALTER TABLE log_table SET ('table.log.ttl' = '7d'); | table.log.tiered.local-segments | Integer | 2 | The number of log segments to retain in local for each table when log tiered storage is enabled. It must be greater than 0. The default is 2. | | table.datalake.enabled | Boolean | false | Whether to enable lakehouse storage for the table. Disabled by default. When this option is set to true and the datalake tiering service is up, the table will be tiered and compacted into datalake format stored on lakehouse storage. | | table.datalake.format | Enum | (None) | The data lake format of the table specifies the tiered Lakehouse storage format, such as Paimon, Iceberg, DeltaLake, or Hudi. Currently, only 'paimon' is supported. Once the `table.datalake.format` property is configured, Fluss adopts the key encoding and bucketing strategy used by the corresponding data lake format. This ensures consistency in key encoding and bucketing, enabling seamless **Union Read** functionality across Fluss and Lakehouse. The `table.datalake.format` can be pre-defined before enabling `table.datalake.enabled`. This allows the data lake feature to be dynamically enabled on the table without requiring table recreation. 
If `table.datalake.format` is not explicitly set during table creation, the table will default to the format specified by the `datalake.format` configuration in the Fluss cluster | +| table.datalake.freshness | Duration | 3min | It defines the maximum amount of time that the datalake table's content should lag behind updates to the Fluss table. Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs.| | table.merge-engine | Enum | (None) | Defines the merge engine for the primary key table. By default, a primary key table uses the [default merge engine (last_row)](table-design/table-types/pk-table/merge-engines/default.md). It also supports two other merge engines: 'first_row' and 'versioned'. The [first_row merge engine](table-design/table-types/pk-table/merge-engines/first-row.md) will keep the first row of the same primary key. The [versioned merge engine](table-design/table-types/pk-table/merge-engines/versioned.md) will keep the row with the largest version of the same primary key. | | table.merge-engine.versioned.ver-column | String | (None) | The column name of the version column for the 'versioned' merge engine. If the merge engine is set to 'versioned', the version column must be set. |