diff --git a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index 74bb3012f5..fa9c4274c9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -36,18 +36,23 @@ public class FlussConfigUtils {
     public static final String CLIENT_PREFIX = "client.";
     public static final String CLIENT_SECURITY_PREFIX = "client.security.";
 
-    public static final List<String> ALTERABLE_TABLE_CONFIG;
+    public static final List<String> ALTERABLE_TABLE_OPTIONS;
 
     static {
        TABLE_OPTIONS = extractConfigOptions("table.");
        CLIENT_OPTIONS = extractConfigOptions("client.");
-        ALTERABLE_TABLE_CONFIG = Collections.emptyList();
+        ALTERABLE_TABLE_OPTIONS =
+                Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
     }
 
     public static boolean isTableStorageConfig(String key) {
         return key.startsWith(TABLE_PREFIX);
     }
 
+    public static boolean isAlterableTableOption(String key) {
+        return ALTERABLE_TABLE_OPTIONS.contains(key);
+    }
+
     @VisibleForTesting
     static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
         Map<String, ConfigOption<?>> options = new HashMap<>();
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 1be24772d0..5e97a635af 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -25,6 +25,7 @@
 import org.apache.fluss.exception.FlussRuntimeException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -48,7 +49,9 @@
 import java.nio.file.Files;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Stream;
@@ -364,6 +367,70 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
                 BUCKET_NUM);
     }
 
+    @Test
+    void testAlterLakeEnabledLogTable() throws Exception {
+        Map<String, String> customProperties = new HashMap<>();
+        customProperties.put("k1", "v1");
+        customProperties.put("paimon.file.format", "parquet");
+
+        // log table with lake disabled
+        TableDescriptor logTable =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("log_c1", DataTypes.INT())
+                                        .column("log_c2", DataTypes.STRING())
+                                        .build())
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
+                        .customProperties(customProperties)
+                        .distributedBy(BUCKET_NUM, "log_c1", "log_c2")
+                        .build();
+        TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
+        admin.createTable(logTablePath, logTable, false).get();
+
+        assertThatThrownBy(
+                        () ->
+                                paimonCatalog.getTable(
+                                        Identifier.create(DATABASE, logTablePath.getTableName())))
+                .isInstanceOf(Catalog.TableNotExistException.class);
+
+        // enable lake
+        TableChange.SetOption enableLake =
+                TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        List<TableChange> changes = Collections.singletonList(enableLake);
+
+        admin.alterTable(logTablePath, changes, false).get();
+
+        Table enabledPaimonLogTable =
+                paimonCatalog.getTable(Identifier.create(DATABASE, logTablePath.getTableName()));
+
+        Map<String, String> updatedProperties = new HashMap<>();
+        updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
+        TableDescriptor updatedLogTable = logTable.withProperties(updatedProperties);
+        // check the retrieved log table
+        verifyPaimonTable(
+                enabledPaimonLogTable,
+                updatedLogTable,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
+                        },
+                        new String[] {
+                            "log_c1",
+                            "log_c2",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "log_c1,log_c2",
+                BUCKET_NUM);
+    }
+
     @Test
     void testThrowExceptionWhenConflictWithSystemColumn() {
         for (String systemColumn :
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
index 995b2db469..2580370d8c 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/testutils/FlinkPaimonTieringTestBase.java
@@ -256,10 +256,16 @@ protected long createLogTable(TablePath tablePath) throws Exception {
     }
 
     protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception {
-        return createLogTable(tablePath, bucketNum, false);
+        return createLogTable(
+                tablePath, bucketNum, false, Collections.emptyMap(), Collections.emptyMap());
     }
 
-    protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
+    protected long createLogTable(
+            TablePath tablePath,
+            int bucketNum,
+            boolean isPartitioned,
+            Map<String, String> properties,
+            Map<String, String> customProperties)
             throws Exception {
         Schema.Builder schemaBuilder =
                 Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
@@ -277,6 +283,8 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
             tableBuilder.property(
                     ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
         }
+        tableBuilder.properties(properties);
+        tableBuilder.customProperties(customProperties);
         tableBuilder.schema(schemaBuilder.build());
         return createTable(tablePath, tableBuilder.build());
     }
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
index 442962f311..b4990f1983 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java
@@ -22,6 +22,7 @@
 import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.row.InternalRow;
@@ -175,10 +176,7 @@ void testTiering() throws Exception {
                         {
                             put(
                                     FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
-                                    "["
"{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3}," - + "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}" - + "]"); + getPartitionOffsetStr(partitionNameByIds)); } }; checkSnapshotPropertyInPaimon(partitionedTablePath, properties); @@ -187,6 +185,150 @@ void testTiering() throws Exception { } } + @Test + void testTieringForAlterTable() throws Exception { + TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter"); + Map tableProperties = new HashMap<>(); + tableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"); + + long t1Id = createPkTable(t1, 1, tableProperties, Collections.emptyMap()); + + TableChange.SetOption setOption = + TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"); + List changes = Collections.singletonList(setOption); + admin.alterTable(t1, changes, false).get(); + + TableBucket t1Bucket = new TableBucket(t1Id, 0); + + // write records + List rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + writeRows(t1, rows, false); + waitUntilSnapshot(t1Id, 1, 0); + + // then start tiering job + JobClient jobClient = buildTieringJob(execEnv); + + try { + // check the status of replica after synced + assertReplicaStatus(t1Bucket, 3); + // check data in paimon + checkDataInPaimonPrimaryKeyTable(t1, rows); + // check snapshot property in paimon + Map properties = + new HashMap() { + { + put( + FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY, + "[{\"bucket_id\":0,\"log_offset\":3}]"); + } + }; + checkSnapshotPropertyInPaimon(t1, properties); + + // then, create another log table + TablePath t2 = TablePath.of(DEFAULT_DB, "logTableAlter"); + + Map logTableProperties = new HashMap<>(); + logTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"); + long t2Id = createLogTable(t2, 1, false, logTableProperties, Collections.emptyMap()); + // enable lake + admin.alterTable(t2, changes, false).get(); + + TableBucket t2Bucket = new TableBucket(t2Id, 0); + List flussRows = new ArrayList<>(); + // write records + for (int i = 0; i < 10; i++) { + rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3")); + flussRows.addAll(rows); + // write records + writeRows(t2, rows, true); + } + // check the status of replica after synced; + // note: we can't update log start offset for unaware bucket mode log table + assertReplicaStatus(t2Bucket, 30); + + // check data in paimon + checkDataInPaimonAppendOnlyTable(t2, flussRows, 0); + + // then write data to the pk tables + // write records + rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333")); + // write records + writeRows(t1, rows, false); + + // check the status of replica of t2 after synced + // not check start offset since we won't + // update start log offset for primary key table + assertReplicaStatus(t1Bucket, 9); + + checkDataInPaimonPrimaryKeyTable(t1, rows); + + // then create partitioned table and wait partitions are ready + TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTableAlter"); + Map partitionTableProperties = new HashMap<>(); + partitionTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false"); + + Tuple2 tableIdAndDescriptor = + createPartitionedTable( + partitionedTablePath, partitionTableProperties, Collections.emptyMap()); + + admin.alterTable(partitionedTablePath, changes, false).get(); + + Map partitionNameByIds = waitUntilPartitions(partitionedTablePath); + + // now, write rows into partitioned table + TableDescriptor partitionedTableDescriptor = 
+                    tableIdAndDescriptor.f1;
+            Map<String, List<InternalRow>> writtenRowsByPartition =
+                    writeRowsIntoPartitionedTable(
+                            partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
+            long tableId = tableIdAndDescriptor.f0;
+
+            // wait until synced to paimon
+            for (Long partitionId : partitionNameByIds.keySet()) {
+                TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
+                assertReplicaStatus(tableBucket, 3);
+            }
+
+            // now, let's check data in paimon per partition
+            String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
+            for (String partitionName : partitionNameByIds.values()) {
+                checkDataInPaimonAppendOnlyPartitionedTable(
+                        partitionedTablePath,
+                        Collections.singletonMap(partitionCol, partitionName),
+                        writtenRowsByPartition.get(partitionName),
+                        0);
+            }
+
+            properties =
+                    new HashMap<String, String>() {
+                        {
+                            put(
+                                    FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
+                                    getPartitionOffsetStr(partitionNameByIds));
+                        }
+                    };
+            checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
+        } finally {
+            jobClient.cancel().get();
+        }
+    }
+
+    private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
+        String raw =
+                "{\"partition_id\":%s,\"bucket_id\":0,\"partition_name\":\"date=%s\",\"log_offset\":3}";
+        List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
+        Collections.sort(partitionIds);
+        List<String> partitionOffsetStrs = new ArrayList<>();
+
+        for (Long partitionId : partitionIds) {
+            String partitionName = partitionNameByIds.get(partitionId);
+            String partitionOffsetStr = String.format(raw, partitionId, partitionName);
+            partitionOffsetStrs.add(partitionOffsetStr);
+        }
+
+        return "[" + String.join(",", partitionOffsetStrs) + "]";
+    }
+
     @Test
     void testTieringToDvEnabledTable() throws Exception {
         TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
@@ -214,6 +356,15 @@ void testTieringToDvEnabledTable() throws Exception {
 
     private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath)
             throws Exception {
+        return createPartitionedTable(
+                partitionedTablePath, Collections.emptyMap(), Collections.emptyMap());
+    }
+
+    private Tuple2<Long, TableDescriptor> createPartitionedTable(
+            TablePath partitionedTablePath,
+            Map<String, String> properties,
+            Map<String, String> customProperties)
+            throws Exception {
         TableDescriptor partitionedTableDescriptor =
                 TableDescriptor.builder()
                         .schema(
@@ -229,6 +380,8 @@ private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partition
                                 AutoPartitionTimeUnit.YEAR)
                         .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                         .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
+                        .properties(properties)
+                        .customProperties(customProperties)
                         .build();
         return Tuple2.of(
                 createTable(partitionedTablePath, partitionedTableDescriptor),
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 8b0ffba40b..5e0c5dc2a6 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -90,6 +90,7 @@
 import org.apache.fluss.server.coordinator.event.EventManager;
 import org.apache.fluss.server.entity.CommitKvSnapshotData;
 import org.apache.fluss.server.entity.LakeTieringTableInfo;
+import org.apache.fluss.server.entity.TablePropertyChanges;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
 import org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
 import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
@@ -297,10 +298,16 @@ public CompletableFuture<AlterTablePropertiesResponse> alterTableProperties(
             authorizer.authorize(currentSession(), OperationType.ALTER, Resource.table(tablePath));
         }
 
+        TablePropertyChanges tablePropertyChanges =
+                toTablePropertyChanges(request.getConfigChangesList());
+
         metadataManager.alterTableProperties(
                 tablePath,
-                toTablePropertyChanges(request.getConfigChangesList()),
-                request.isIgnoreIfNotExists());
+                tablePropertyChanges,
+                request.isIgnoreIfNotExists(),
+                lakeCatalog,
+                dataLakeFormat,
+                lakeTableTieringManager);
 
         return CompletableFuture.completedFuture(new AlterTablePropertiesResponse());
     }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index f65814c659..2056068a51 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -23,7 +23,9 @@
 import org.apache.fluss.exception.DatabaseNotEmptyException;
 import org.apache.fluss.exception.DatabaseNotExistException;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.InvalidAlterTableException;
 import org.apache.fluss.exception.InvalidPartitionException;
+import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.exception.PartitionAlreadyExistsException;
 import org.apache.fluss.exception.PartitionNotExistException;
 import org.apache.fluss.exception.SchemaNotExistException;
@@ -32,6 +34,8 @@
 import org.apache.fluss.exception.TableNotPartitionedException;
 import org.apache.fluss.exception.TooManyBucketsException;
 import org.apache.fluss.exception.TooManyPartitionsException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.ResolvedPartitionSpec;
@@ -307,7 +311,10 @@ public long createTable(
     public void alterTableProperties(
             TablePath tablePath,
             TablePropertyChanges tablePropertyChanges,
-            boolean ignoreIfNotExists) {
+            boolean ignoreIfNotExists,
+            @Nullable LakeCatalog lakeCatalog,
+            @Nullable DataLakeFormat dataLakeFormat,
+            LakeTableTieringManager lakeTableTieringManager) {
         try {
             // it throws TableNotExistException if the table or database not exists
             TableRegistration tableReg = getTableRegistration(tablePath);
@@ -328,12 +335,26 @@ public void alterTableProperties(
 
             if (newDescriptor != null) {
                 // reuse the same validation logic as the createTable() method
-                validateTableDescriptor(tableDescriptor, maxBucketNum);
+                validateTableDescriptor(newDescriptor, maxBucketNum);
+
+                // pre-alter table properties, e.g. create the lake table in lake storage if the
+                // change enables the datalake for the table
+                preAlterTableProperties(
+                        tablePath, tableDescriptor, newDescriptor, lakeCatalog, dataLakeFormat);
 
                 // update the table to zk
                 TableRegistration updatedTableRegistration =
                         tableReg.newProperties(
                                 newDescriptor.getProperties(), newDescriptor.getCustomProperties());
                 zookeeperClient.updateTable(tablePath, updatedTableRegistration);
+
+                // post-alter table properties, e.g. add the table to the lake table tiering
+                // manager if the change enables the datalake for the table
+                postAlterTableProperties(
+                        tablePath,
+                        schemaInfo,
+                        tableDescriptor,
+                        updatedTableRegistration,
+                        lakeTableTieringManager);
             } else {
                 LOG.info(
                         "No properties changed when alter table {}, skip update table.", tablePath);
@@ -352,6 +373,66 @@ public void alterTableProperties(
         }
     }
 
+    private void preAlterTableProperties(
+            TablePath tablePath,
+            TableDescriptor tableDescriptor,
+            TableDescriptor newDescriptor,
+            LakeCatalog lakeCatalog,
+            DataLakeFormat dataLakeFormat) {
+
+        boolean toEnableDataLake =
+                !isDataLakeEnabled(tableDescriptor) && isDataLakeEnabled(newDescriptor);
+
+        // enable lake table
+        if (toEnableDataLake) {
+            // TODO: should tolerate the case where the lake table exists but matches our schema.
+            // This ensures eventual consistency by idempotently creating the table multiple
+            // times. See #846
+            // before creating the table in fluss, we may create it in the lake
+            if (lakeCatalog == null) {
+                throw new InvalidAlterTableException(
+                        "Cannot alter table "
+                                + tablePath
+                                + " to enable data lake, because the Fluss cluster doesn't enable datalake tables.");
+            } else {
+                try {
+                    lakeCatalog.createTable(tablePath, newDescriptor);
+                } catch (TableAlreadyExistException e) {
+                    throw new LakeTableAlreadyExistException(
+                            String.format(
+                                    "The table %s already exists in %s catalog, please "
+                                            + "first drop the table in %s catalog or use a new table name.",
+                                    tablePath, dataLakeFormat, dataLakeFormat));
+                }
+            }
+        }
+        // more pre-alter actions can be added here
+    }
+
+    private void postAlterTableProperties(
+            TablePath tablePath,
+            SchemaInfo schemaInfo,
+            TableDescriptor oldTableDescriptor,
+            TableRegistration newTableRegistration,
+            LakeTableTieringManager lakeTableTieringManager) {
+
+        boolean toEnableDataLake =
+                !isDataLakeEnabled(oldTableDescriptor)
+                        && isDataLakeEnabled(newTableRegistration.properties);
+        boolean toDisableDataLake =
+                isDataLakeEnabled(oldTableDescriptor)
+                        && !isDataLakeEnabled(newTableRegistration.properties);
+
+        if (toEnableDataLake) {
+            TableInfo newTableInfo = newTableRegistration.toTableInfo(tablePath, schemaInfo);
+            // the table is now a lake table, so add it to the lake table tiering manager
+            lakeTableTieringManager.addNewLakeTable(newTableInfo);
+        } else if (toDisableDataLake) {
+            lakeTableTieringManager.removeLakeTable(newTableRegistration.tableId);
+        }
+        // more post-alter actions can be added here
+    }
+
     /**
      * Get a new TableDescriptor with updated properties.
     *
@@ -387,6 +468,17 @@ public void alterTableProperties(
         }
     }
 
+    private boolean isDataLakeEnabled(TableDescriptor tableDescriptor) {
+        String dataLakeEnabledValue =
+                tableDescriptor.getProperties().get(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+        return Boolean.parseBoolean(dataLakeEnabledValue);
+    }
+
+    private boolean isDataLakeEnabled(Map<String, String> properties) {
+        String dataLakeEnabledValue = properties.get(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
+        return Boolean.parseBoolean(dataLakeEnabledValue);
+    }
+
     public TableInfo getTable(TablePath tablePath) throws TableNotExistException {
         Optional<TableRegistration> optionalTable;
         try {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 8b56166d68..76e8adc823 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -47,6 +47,7 @@
 import java.util.stream.Collectors;
 
 import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
+import static org.apache.fluss.config.FlussConfigUtils.isAlterableTableOption;
 import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -108,7 +109,7 @@ public static void validateAlterTableProperties(
             TableInfo currentTable, Set<String> tableKeysToChange, Set<String> customKeysToChange) {
         tableKeysToChange.forEach(
                 k -> {
-                    if (isTableStorageConfig(k)) {
+                    if (isTableStorageConfig(k) && !isAlterableTableOption(k)) {
                         throw new InvalidAlterTableException(
                                 "The option '" + k + "' is not supported to alter yet.");
                     }
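
Usage sketch (not part of the patch): with this change, `table.datalake.enabled` becomes the only `table.*` option that `validateAlterTableProperties` lets through, so a client can turn an existing table into a lake-backed one at runtime. A minimal client-side sketch follows, mirroring the `admin.alterTable(...)` calls in the tests above; the `org.apache.fluss.client.admin.Admin` import and the wrapper class are assumptions for illustration:

    import org.apache.fluss.client.admin.Admin; // assumed Admin FQN
    import org.apache.fluss.config.ConfigOptions;
    import org.apache.fluss.metadata.TableChange;
    import org.apache.fluss.metadata.TablePath;

    import java.util.Collections;
    import java.util.List;

    public class EnableDataLakeSketch {
        // Flips table.datalake.enabled to true on an existing table. On the server,
        // preAlterTableProperties then creates the table in the lake catalog, and
        // postAlterTableProperties registers it with the LakeTableTieringManager.
        static void enableDataLake(Admin admin, TablePath tablePath) throws Exception {
            List<TableChange> changes =
                    Collections.singletonList(
                            TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true"));
            // ignoreIfNotExists = false: fail if the table doesn't exist
            admin.alterTable(tablePath, changes, false).get();
        }
    }

Altering any other `table.*` option still fails with InvalidAlterTableException, and re-enabling the datalake when the lake table already exists surfaces as LakeTableAlreadyExistException, as the server-side changes above show.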