@@ -940,4 +940,35 @@ public void testBucketLimitForNonPartitionedTable() throws Exception {
.isInstanceOf(TooManyBucketsException.class)
.hasMessageContaining("exceeds the maximum limit");
}

/** Test that creating a table with reserved system column names throws InvalidTableException. */
@Test
public void testSystemColumns() throws Exception {
String dbName = DEFAULT_TABLE_PATH.getDatabaseName();

TableDescriptor tableDescriptor =
TableDescriptor.builder()
.schema(
Schema.newBuilder()
.column("f0", DataTypes.STRING())
.column("f1", DataTypes.BIGINT())
.column("f3", DataTypes.STRING())
.column("__offset", DataTypes.STRING())
.column("__timestamp", DataTypes.STRING())
.column("__bucket", DataTypes.STRING())
.build())
.build();

TablePath tablePath = TablePath.of(dbName, "test_system_columns");

// Creating this table should throw InvalidTableException
assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get())
.cause()
.isInstanceOf(InvalidTableException.class)
.hasMessageContaining(
"__offset, __timestamp, __bucket cannot be used as column names, "
+ "because they are reserved system columns in Fluss. "
+ "Please use other names for these columns. "
+ "The reserved system columns are: __offset, __timestamp, __bucket");
}
}
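
Note on the assertion chain: admin.createTable(...) returns a future, so .get() rethrows the server-side failure wrapped in a java.util.concurrent.ExecutionException, and AssertJ's .cause() unwraps that wrapper before the type check. A minimal sketch of the same chain with the wrapper made explicit (illustrative only; the test above relies on .cause() alone):

    assertThatThrownBy(() -> admin.createTable(tablePath, tableDescriptor, false).get())
            .isInstanceOf(ExecutionException.class) // wrapper added by CompletableFuture.get()
            .cause()                                // unwrap to the server-side failure
            .isInstanceOf(InvalidTableException.class);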
@@ -55,6 +55,10 @@
public final class TableDescriptor implements Serializable {
private static final long serialVersionUID = 1L;

public static final String OFFSET_COLUMN_NAME = "__offset";
public static final String TIMESTAMP_COLUMN_NAME = "__timestamp";
public static final String BUCKET_COLUMN_NAME = "__bucket";

private final Schema schema;
private final @Nullable String comment;
private final List<String> partitionKeys;
@@ -37,13 +37,13 @@
import java.util.List;
import java.util.Map;

import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;

/** A Paimon implementation of {@link LakeCatalog}. */
public class PaimonLakeCatalog implements LakeCatalog {

public static final String OFFSET_COLUMN_NAME = "__offset";
public static final String TIMESTAMP_COLUMN_NAME = "__timestamp";
public static final String BUCKET_COLUMN_NAME = "__bucket";

private final Catalog paimonCatalog;

public PaimonLakeCatalog(Configuration configuration) {
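
With the reserved names now defined once on TableDescriptor, PaimonLakeCatalog simply imports them instead of declaring its own copies. A minimal sketch of the kind of call site the shared constants serve, assuming (not shown in this diff) that the catalog appends the system columns when deriving the Paimon schema; the Paimon column types here are illustrative assumptions:

    // hypothetical schema derivation inside PaimonLakeCatalog;
    // Schema is org.apache.paimon.schema.Schema, DataTypes is org.apache.paimon.types.DataTypes
    Schema.Builder builder = Schema.newBuilder();
    // ... user columns copied from the Fluss schema ...
    builder.column(BUCKET_COLUMN_NAME, DataTypes.INT());           // assumed type
    builder.column(OFFSET_COLUMN_NAME, DataTypes.BIGINT());        // assumed type
    builder.column(TIMESTAMP_COLUMN_NAME, DataTypes.TIMESTAMP(3)); // assumed type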
@@ -50,9 +50,9 @@
import java.util.HashMap;
import java.util.Map;

import static com.alibaba.fluss.lake.paimon.PaimonLakeCatalog.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.lake.paimon.PaimonLakeCatalog.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.lake.paimon.PaimonLakeCatalog.TIMESTAMP_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static com.alibaba.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -32,16 +32,32 @@
import com.alibaba.fluss.types.RowType;
import com.alibaba.fluss.utils.AutoPartitionStrategy;

import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.alibaba.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static com.alibaba.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
import static com.alibaba.fluss.utils.PartitionUtils.PARTITION_KEY_SUPPORTED_TYPES;

/** Validator of {@link TableDescriptor}. */
public class TableDescriptorValidation {

private static final Set<String> SYSTEM_COLUMNS =
Collections.unmodifiableSet(
new LinkedHashSet<>(
Arrays.asList(
OFFSET_COLUMN_NAME,
TIMESTAMP_COLUMN_NAME,
BUCKET_COLUMN_NAME)));

/** Validate table descriptor to create is valid and contain all necessary information. */
public static void validateTableDescriptor(TableDescriptor tableDescriptor, int maxBucketNum) {
boolean hasPrimaryKey = tableDescriptor.getSchema().getPrimaryKey().isPresent();
@@ -72,6 +88,23 @@ public static void validateTableDescriptor(TableDescriptor tableDescriptor, int
checkMergeEngine(tableConf, hasPrimaryKey, schema);
checkTieredLog(tableConf);
checkPartition(tableConf, tableDescriptor.getPartitionKeys(), schema);
checkSystemColumns(schema);
}

private static void checkSystemColumns(RowType schema) {
List<String> fieldNames = schema.getFieldNames();
List<String> unsupportedColumns =
fieldNames.stream().filter(SYSTEM_COLUMNS::contains).collect(Collectors.toList());
if (!unsupportedColumns.isEmpty()) {
throw new InvalidTableException(
String.format(
"%s cannot be used as column names, "
+ "because they are reserved system columns in Fluss. "
+ "Please use other names for these columns. "
+ "The reserved system columns are: %s",
String.join(", ", unsupportedColumns),
String.join(", ", SYSTEM_COLUMNS)));
}
}

private static void checkDistribution(TableDescriptor tableDescriptor, int maxBucketNum) {
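
The LinkedHashSet keeps the reserved names in declaration order, so the error message lists them deterministically, which is what lets the new test assert on the exact string. A self-contained sketch of the same formatting logic (checkSystemColumns itself is private, so this mirrors it rather than calling it):

    // same inputs as the new test: three user columns plus all three reserved names
    Set<String> systemColumns =
            new LinkedHashSet<>(Arrays.asList("__offset", "__timestamp", "__bucket"));
    List<String> fieldNames =
            Arrays.asList("f0", "f1", "f3", "__offset", "__timestamp", "__bucket");
    // collect offending columns in schema order, exactly as the validator does
    List<String> unsupported =
            fieldNames.stream().filter(systemColumns::contains).collect(Collectors.toList());
    String message =
            String.format(
                    "%s cannot be used as column names, "
                            + "because they are reserved system columns in Fluss. "
                            + "Please use other names for these columns. "
                            + "The reserved system columns are: %s",
                    String.join(", ", unsupported),
                    String.join(", ", systemColumns));
    // message starts with "__offset, __timestamp, __bucket cannot be used as column names"
    // and ends with "The reserved system columns are: __offset, __timestamp, __bucket",
    // matching the assertion in testSystemColumns above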