@@ -36,18 +36,23 @@ public class FlussConfigUtils {
public static final String CLIENT_PREFIX = "client.";
public static final String CLIENT_SECURITY_PREFIX = "client.security.";

- public static final List<String> ALTERABLE_TABLE_CONFIG;
+ public static final List<String> ALTERABLE_TABLE_OPTIONS;

static {
TABLE_OPTIONS = extractConfigOptions("table.");
CLIENT_OPTIONS = extractConfigOptions("client.");
- ALTERABLE_TABLE_CONFIG = Collections.emptyList();
+ ALTERABLE_TABLE_OPTIONS =
+ Collections.singletonList(ConfigOptions.TABLE_DATALAKE_ENABLED.key());
}

public static boolean isTableStorageConfig(String key) {
return key.startsWith(TABLE_PREFIX);
}

public static boolean isAlterableTableOption(String key) {
return ALTERABLE_TABLE_OPTIONS.contains(key);
}

@VisibleForTesting
static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
Map<String, ConfigOption<?>> options = new HashMap<>();
@@ -25,6 +25,7 @@
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidTableException;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.testutils.FlussClusterExtension;
@@ -48,7 +49,9 @@

import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

@@ -364,6 +367,70 @@ void testCreateLakeEnabledTableWithAllTypes() throws Exception {
BUCKET_NUM);
}

@Test
void testAlterLakeEnabledLogTable() throws Exception {
Map<String, String> customProperties = new HashMap<>();
customProperties.put("k1", "v1");
customProperties.put("paimon.file.format", "parquet");

// log table with lake disabled
TableDescriptor logTable =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("log_c1", DataTypes.INT())
.column("log_c2", DataTypes.STRING())
.build())
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, false)
.customProperties(customProperties)
.distributedBy(BUCKET_NUM, "log_c1", "log_c2")
.build();
TablePath logTablePath = TablePath.of(DATABASE, "log_table_alter");
admin.createTable(logTablePath, logTable, false).get();

assertThatThrownBy(
() ->
paimonCatalog.getTable(
Identifier.create(DATABASE, logTablePath.getTableName())))
.isInstanceOf(Catalog.TableNotExistException.class);

// enable lake
TableChange.SetOption enableLake =
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
List<TableChange> changes = Collections.singletonList(enableLake);

admin.alterTable(logTablePath, changes, false).get();

Table enabledPaimonLogTable =
paimonCatalog.getTable(Identifier.create(DATABASE, logTablePath.getTableName()));

Map<String, String> updatedProperties = new HashMap<>();
updatedProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
TableDescriptor updatedLogTable = logTable.withProperties(updatedProperties);
// verify the log table retrieved from paimon
verifyPaimonTable(
enabledPaimonLogTable,
updatedLogTable,
RowType.of(
new DataType[] {
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.STRING(),
// for __bucket, __offset, __timestamp
org.apache.paimon.types.DataTypes.INT(),
org.apache.paimon.types.DataTypes.BIGINT(),
org.apache.paimon.types.DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()
},
new String[] {
"log_c1",
"log_c2",
BUCKET_COLUMN_NAME,
OFFSET_COLUMN_NAME,
TIMESTAMP_COLUMN_NAME
}),
"log_c1,log_c2",
BUCKET_NUM);
}

@Test
void testThrowExceptionWhenConflictWithSystemColumn() {
for (String systemColumn :
@@ -256,10 +256,16 @@ protected long createLogTable(TablePath tablePath) throws Exception {
}

protected long createLogTable(TablePath tablePath, int bucketNum) throws Exception {
- return createLogTable(tablePath, bucketNum, false);
+ return createLogTable(
+ tablePath, bucketNum, false, Collections.emptyMap(), Collections.emptyMap());
}

- protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
+ protected long createLogTable(
+ TablePath tablePath,
+ int bucketNum,
+ boolean isPartitioned,
+ Map<String, String> properties,
+ Map<String, String> customProperties)
throws Exception {
Schema.Builder schemaBuilder =
Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.STRING());
@@ -277,6 +283,8 @@ protected long createLogTable(TablePath tablePath, int bucketNum, boolean isPart
tableBuilder.property(
ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
}
tableBuilder.properties(properties);
tableBuilder.customProperties(customProperties);
tableBuilder.schema(schemaBuilder.build());
return createTable(tablePath, tableBuilder.build());
}
@@ -22,6 +22,7 @@
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.row.InternalRow;
@@ -175,10 +176,7 @@ void testTiering() throws Exception {
{
put(
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
"["
+ "{\"partition_id\":0,\"bucket_id\":0,\"partition_name\":\"date=2025\",\"log_offset\":3},"
+ "{\"partition_id\":1,\"bucket_id\":0,\"partition_name\":\"date=2026\",\"log_offset\":3}"
+ "]");
getPartitionOffsetStr(partitionNameByIds));
}
};
checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
@@ -187,6 +185,150 @@
}
}

@Test
void testTieringForAlterTable() throws Exception {
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter");
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");

long t1Id = createPkTable(t1, 1, tableProperties, Collections.emptyMap());

TableChange.SetOption setOption =
TableChange.set(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true");
List<TableChange> changes = Collections.singletonList(setOption);
admin.alterTable(t1, changes, false).get();

TableBucket t1Bucket = new TableBucket(t1Id, 0);

// write records
List<InternalRow> rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
writeRows(t1, rows, false);
waitUntilSnapshot(t1Id, 1, 0);

// then start tiering job
JobClient jobClient = buildTieringJob(execEnv);

try {
// check the status of the replica after it is synced
assertReplicaStatus(t1Bucket, 3);
// check data in paimon
checkDataInPaimonPrimaryKeyTable(t1, rows);
// check snapshot property in paimon
Map<String, String> properties =
new HashMap<String, String>() {
{
put(
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
"[{\"bucket_id\":0,\"log_offset\":3}]");
}
};
checkSnapshotPropertyInPaimon(t1, properties);

// then, create another log table
TablePath t2 = TablePath.of(DEFAULT_DB, "logTableAlter");

Map<String, String> logTableProperties = new HashMap<>();
logTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");
long t2Id = createLogTable(t2, 1, false, logTableProperties, Collections.emptyMap());
// enable lake
admin.alterTable(t2, changes, false).get();

TableBucket t2Bucket = new TableBucket(t2Id, 0);
List<InternalRow> flussRows = new ArrayList<>();
// write records
for (int i = 0; i < 10; i++) {
rows = Arrays.asList(row(1, "v1"), row(2, "v2"), row(3, "v3"));
flussRows.addAll(rows);
// write records
writeRows(t2, rows, true);
}
// check the status of the replica after it is synced;
// note: the log start offset can't be updated for unaware-bucket-mode log tables
assertReplicaStatus(t2Bucket, 30);

// check data in paimon
checkDataInPaimonAppendOnlyTable(t2, flussRows, 0);

// then write more records to the pk table t1
rows = Arrays.asList(row(1, "v111"), row(2, "v222"), row(3, "v333"));
writeRows(t1, rows, false);

// check the status of the t1 replica after it is synced;
// the start offset is not checked since the log start offset
// is not updated for primary key tables
assertReplicaStatus(t1Bucket, 9);

checkDataInPaimonPrimaryKeyTable(t1, rows);

// then create a partitioned table and wait until its partitions are ready
TablePath partitionedTablePath = TablePath.of(DEFAULT_DB, "partitionedTableAlter");
Map<String, String> partitionTableProperties = new HashMap<>();
partitionTableProperties.put(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false");

Tuple2<Long, TableDescriptor> tableIdAndDescriptor =
createPartitionedTable(
partitionedTablePath, partitionTableProperties, Collections.emptyMap());

admin.alterTable(partitionedTablePath, changes, false).get();

Map<Long, String> partitionNameByIds = waitUntilPartitions(partitionedTablePath);

// now, write rows into partitioned table
TableDescriptor partitionedTableDescriptor = tableIdAndDescriptor.f1;
Map<String, List<InternalRow>> writtenRowsByPartition =
writeRowsIntoPartitionedTable(
partitionedTablePath, partitionedTableDescriptor, partitionNameByIds);
long tableId = tableIdAndDescriptor.f0;

// wait until synced to paimon
for (Long partitionId : partitionNameByIds.keySet()) {
TableBucket tableBucket = new TableBucket(tableId, partitionId, 0);
assertReplicaStatus(tableBucket, 3);
}

// now, check the data in paimon per partition
String partitionCol = partitionedTableDescriptor.getPartitionKeys().get(0);
for (String partitionName : partitionNameByIds.values()) {
checkDataInPaimonAppendOnlyPartitionedTable(
partitionedTablePath,
Collections.singletonMap(partitionCol, partitionName),
writtenRowsByPartition.get(partitionName),
0);
}

properties =
new HashMap<String, String>() {
{
put(
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
getPartitionOffsetStr(partitionNameByIds));
}
};
checkSnapshotPropertyInPaimon(partitionedTablePath, properties);
} finally {
jobClient.cancel().get();
}
}

private String getPartitionOffsetStr(Map<Long, String> partitionNameByIds) {
String raw =
"{\"partition_id\":%s,\"bucket_id\":0,\"partition_name\":\"date=%s\",\"log_offset\":3}";
List<Long> partitionIds = new ArrayList<>(partitionNameByIds.keySet());
Collections.sort(partitionIds);
List<String> partitionOffsetStrs = new ArrayList<>();

for (Long partitionId : partitionIds) {
String partitionName = partitionNameByIds.get(partitionId);
String partitionOffsetStr = String.format(raw, partitionId, partitionName);
partitionOffsetStrs.add(partitionOffsetStr);
}

return "[" + String.join(",", partitionOffsetStrs) + "]";
}

@Test
void testTieringToDvEnabledTable() throws Exception {
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableWithDv");
@@ -214,6 +356,15 @@ void testTieringToDvEnabledTable() throws Exception {

private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partitionedTablePath)
throws Exception {
return createPartitionedTable(
partitionedTablePath, Collections.emptyMap(), Collections.emptyMap());
}

private Tuple2<Long, TableDescriptor> createPartitionedTable(
TablePath partitionedTablePath,
Map<String, String> properties,
Map<String, String> customProperties)
throws Exception {
TableDescriptor partitionedTableDescriptor =
TableDescriptor.builder()
.schema(
@@ -229,6 +380,8 @@ private Tuple2<Long, TableDescriptor> createPartitionedTable(TablePath partition
AutoPartitionTimeUnit.YEAR)
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500))
.properties(properties)
.customProperties(customProperties)
.build();
return Tuple2.of(
createTable(partitionedTablePath, partitionedTableDescriptor),
@@ -90,6 +90,7 @@
import org.apache.fluss.server.coordinator.event.EventManager;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
import org.apache.fluss.server.entity.TablePropertyChanges;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshot;
import org.apache.fluss.server.kv.snapshot.CompletedSnapshotJsonSerde;
import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
@@ -297,10 +298,16 @@ public CompletableFuture<AlterTablePropertiesResponse> alterTableProperties(
authorizer.authorize(currentSession(), OperationType.ALTER, Resource.table(tablePath));
}

TablePropertyChanges tablePropertyChanges =
toTablePropertyChanges(request.getConfigChangesList());

metadataManager.alterTableProperties(
tablePath,
- toTablePropertyChanges(request.getConfigChangesList()),
- request.isIgnoreIfNotExists());
+ tablePropertyChanges,
+ request.isIgnoreIfNotExists(),
+ lakeCatalog,
+ dataLakeFormat,
+ lakeTableTieringManager);

return CompletableFuture.completedFuture(new AlterTablePropertiesResponse());
}