diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
index 01780ae569..39cd747709 100644
--- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
+++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
@@ -940,4 +940,35 @@ public void testBucketLimitForNonPartitionedTable() throws Exception {
                 .isInstanceOf(TooManyBucketsException.class)
                 .hasMessageContaining("exceeds the maximum limit");
     }
+
+    /** Test that creating a table with system columns throws InvalidTableException. */
+    @Test
+    public void testSystemsColumns() throws Exception {
+        String dbName = DEFAULT_TABLE_PATH.getDatabaseName();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("f0", DataTypes.STRING())
+                                        .column("f1", DataTypes.BIGINT())
+                                        .column("f3", DataTypes.STRING())
+                                        .column("__offset", DataTypes.STRING())
+                                        .column("__timestamp", DataTypes.STRING())
+                                        .column("__bucket", DataTypes.STRING())
+                                        .build())
+                        .build();
+
+        TablePath tablePath = TablePath.of(dbName, "test_system_columns");
+
+        // Creating this table should throw InvalidTableException
+        assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get())
+                .cause()
+                .isInstanceOf(InvalidTableException.class)
+                .hasMessageContaining(
+                        "__offset, __timestamp, __bucket cannot be used as column names, "
+                                + "because they are reserved system columns in Fluss. "
+                                + "Please use other names for these columns. "
+                                + "The reserved system columns are: __offset, __timestamp, __bucket");
+    }
 }
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
index 96a6d5959d..d5af1a7ae0 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableDescriptor.java
@@ -55,6 +55,10 @@ public final class TableDescriptor implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    public static final String OFFSET_COLUMN_NAME = "__offset";
+    public static final String TIMESTAMP_COLUMN_NAME = "__timestamp";
+    public static final String BUCKET_COLUMN_NAME = "__bucket";
+
     private final Schema schema;
     private final @Nullable String comment;
     private final List<String> partitionKeys;
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java
index 6b1d8f0ad6..d2a39e2627 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -37,13 +37,13 @@
 import java.util.List;
 import java.util.Map;
 
+import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+
 /** A Paimon implementation of {@link LakeCatalog}. */
 public class PaimonLakeCatalog implements LakeCatalog {
 
-    public static final String OFFSET_COLUMN_NAME = "__offset";
-    public static final String TIMESTAMP_COLUMN_NAME = "__timestamp";
-    public static final String BUCKET_COLUMN_NAME = "__bucket";
-
     private final Catalog paimonCatalog;
 
     public PaimonLakeCatalog(Configuration configuration) {
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 76b6302513..d62dde7898 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -50,9 +50,9 @@
 import java.util.HashMap;
 import java.util.Map;
 
-import static com.alibaba.fluss.lake.paimon.PaimonLakeCatalog.BUCKET_COLUMN_NAME;
-import static com.alibaba.fluss.lake.paimon.PaimonLakeCatalog.OFFSET_COLUMN_NAME;
-import static com.alibaba.fluss.lake.paimon.PaimonLakeCatalog.TIMESTAMP_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static com.alibaba.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java
index ef36922f9f..b92c5f6c6f 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/TableDescriptorValidation.java
@@ -32,16 +32,32 @@
 import com.alibaba.fluss.types.RowType;
 import com.alibaba.fluss.utils.AutoPartitionStrategy;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static com.alibaba.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
+import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
 import static com.alibaba.fluss.utils.PartitionUtils.PARTITION_KEY_SUPPORTED_TYPES;
 
 /** Validator of {@link TableDescriptor}. */
 public class TableDescriptorValidation {
 
+    private static final Set<String> SYSTEM_COLUMNS =
+            Collections.unmodifiableSet(
+                    new LinkedHashSet<>(
+                            Arrays.asList(
+                                    OFFSET_COLUMN_NAME,
+                                    TIMESTAMP_COLUMN_NAME,
+                                    BUCKET_COLUMN_NAME)));
+
     /** Validate table descriptor to create is valid and contain all necessary information. */
     public static void validateTableDescriptor(TableDescriptor tableDescriptor, int maxBucketNum) {
         boolean hasPrimaryKey = tableDescriptor.getSchema().getPrimaryKey().isPresent();
@@ -72,6 +88,23 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
         checkMergeEngine(tableConf, hasPrimaryKey, schema);
         checkTieredLog(tableConf);
         checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema);
+        checkSystemColumns(schema);
+    }
+
+    private static void checkSystemColumns(RowType schema) {
+        List<String> fieldNames = schema.getFieldNames();
+        List<String> unsupportedColumns =
+                fieldNames.stream().filter(SYSTEM_COLUMNS::contains).collect(Collectors.toList());
+        if (!unsupportedColumns.isEmpty()) {
+            throw new InvalidTableException(
+                    String.format(
+                            "%s cannot be used as column names, "
+                                    + "because they are reserved system columns in Fluss. "
+                                    + "Please use other names for these columns. "
+                                    + "The reserved system columns are: %s",
+                            String.join(", ", unsupportedColumns),
+                            String.join(", ", SYSTEM_COLUMNS)));
+        }
     }
 
     private static void checkDistribution(TableDescriptor tableDescriptor, int maxBucketNum) {