Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,46 @@ public abstract class Column implements Serializable {

private static final long serialVersionUID = 1L;

protected static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'";
protected static final String FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION =
"%s %s '%s'";

protected static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s";
protected static final String FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION =
"%s %s '%s'";

protected static final String FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION =
"%s %s '%s' '%s'";

protected static final String FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION = "%s %s";

protected final String name;

protected final DataType type;

protected final @Nullable String comment;

/**
* Save the literal value of the column's default value, For uncertain functions such as UUID(),
* the value is null, For the current time function such as CURRENT_TIMESTAMP(), the value is
* Unix Epoch time(1970-01-01 00:00:00).
*/
protected final @Nullable String defaultValueExpression;

protected Column(String name, DataType type, @Nullable String comment) {
this.name = name;
this.type = type;
this.comment = comment;
this.defaultValueExpression = null;
}

protected Column(
String name,
DataType type,
@Nullable String comment,
@Nullable String defaultValueExpression) {
this.name = name;
this.type = type;
this.comment = comment;
this.defaultValueExpression = defaultValueExpression;
}

/** Returns the name of this column. */
Expand All @@ -69,17 +95,41 @@ public String getComment() {
return comment;
}

@Nullable
public String getDefaultValueExpression() {
return defaultValueExpression;
}

/** Returns a string that summarizes this column for printing to a console. */
public String asSummaryString() {
if (comment == null) {
return String.format(
FIELD_FORMAT_NO_DESCRIPTION, escapeIdentifier(name), type.asSummaryString());
if (defaultValueExpression == null) {
return String.format(
FIELD_FORMAT_NO_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION,
escapeIdentifier(name),
type.asSummaryString());
} else {
return String.format(
FIELD_FORMAT_NO_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION,
escapeIdentifier(name),
type.asSummaryString(),
defaultValueExpression);
}
} else {
return String.format(
FIELD_FORMAT_WITH_DESCRIPTION,
escapeIdentifier(name),
type.asSummaryString(),
escapeSingleQuotes(comment));
if (defaultValueExpression == null) {
return String.format(
FIELD_FORMAT_WITH_DESCRIPTION_NO_DEFAULT_VALUE_EXPRESSION,
escapeIdentifier(name),
type.asSummaryString(),
escapeSingleQuotes(comment));
} else {
return String.format(
FIELD_FORMAT_WITH_DESCRIPTION_WITH_DEFAULT_VALUE_EXPRESSION,
escapeIdentifier(name),
type.asSummaryString(),
escapeSingleQuotes(comment),
defaultValueExpression);
}
}
}

Expand All @@ -103,19 +153,29 @@ public boolean equals(Object o) {
Column column = (Column) o;
return name.equals(column.name)
&& type.equals(column.type)
&& Objects.equals(comment, column.comment);
&& Objects.equals(comment, column.comment)
&& Objects.equals(defaultValueExpression, column.defaultValueExpression);
}

@Override
public int hashCode() {
return Objects.hash(name, type, comment);
return Objects.hash(name, type, comment, defaultValueExpression);
}

@Override
public String toString() {
return asSummaryString();
}

/** Creates a physical column. */
public static PhysicalColumn physicalColumn(
String name,
DataType type,
@Nullable String comment,
@Nullable String defaultValueExpression) {
return new PhysicalColumn(name, type, comment, defaultValueExpression);
}

/** Creates a physical column. */
public static PhysicalColumn physicalColumn(
String name, DataType type, @Nullable String comment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@ public PhysicalColumn(String name, DataType type, @Nullable String comment) {
super(name, type, comment);
}

public PhysicalColumn(
String name, DataType type, @Nullable String comment, @Nullable String defaultValue) {
super(name, type, comment, defaultValue);
}

@Override
public boolean isPhysical() {
return true;
}

@Override
public Column copy(DataType newType) {
return new PhysicalColumn(name, newType, comment);
return new PhysicalColumn(name, newType, comment, defaultValueExpression);
}

@Override
public Column copy(String newName) {
return new PhysicalColumn(newName, type, comment);
return new PhysicalColumn(newName, type, comment, defaultValueExpression);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,21 @@ public Builder physicalColumn(String columnName, DataType type, String comment)
return this;
}

/**
* Declares a physical column that is appended to this schema.
*
* @param columnName column name
* @param type data type of the column
* @param comment description of the column
* @param defaultValue default value of the column
*/
public Builder physicalColumn(
String columnName, DataType type, String comment, String defaultValue) {
checkColumn(columnName, type);
columns.add(Column.physicalColumn(columnName, type, comment, defaultValue));
return this;
}

/**
* Declares a metadata column that is appended to this schema.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ private Map<String, FieldSchema> buildFields(Schema schema) {
}
fieldSchemaMap.put(
column.getName(),
new FieldSchema(column.getName(), typeString, column.getComment()));
new FieldSchema(
column.getName(),
typeString,
column.getDefaultValueExpression(),
column.getComment()));
}
return fieldSchemaMap;
}
Expand Down Expand Up @@ -170,6 +174,7 @@ private void applyAddColumnEvent(AddColumnEvent event)
new FieldSchema(
column.getName(),
buildTypeString(column.getType()),
column.getDefaultValueExpression(),
column.getComment());
schemaChangeManager.addColumn(
tableId.getSchemaName(), tableId.getTableName(), addFieldSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,10 @@ public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx)

private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(), fromDbzColumn(dbzColumn), dbzColumn.comment());
dbzColumn.name(),
fromDbzColumn(dbzColumn),
dbzColumn.comment(),
dbzColumn.defaultValueExpression().orElse(null));
}

private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ private Schema parseDDL(String ddlStatement, TableId tableId) {
if (!column.isOptional()) {
dataType = dataType.notNull();
}
tableBuilder.physicalColumn(colName, dataType, column.comment());
tableBuilder.physicalColumn(
colName,
dataType,
column.comment(),
column.defaultValueExpression().orElse(null));
}

List<String> primaryKey = table.primaryKeyColumnNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
tableId,
Schema.newBuilder()
.physicalColumn("id", DataTypes.INT().notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull())
.physicalColumn("name", DataTypes.VARCHAR(255).notNull(), null, "flink")
.physicalColumn("description", DataTypes.VARCHAR(512))
.physicalColumn("weight", DataTypes.FLOAT())
.primaryKey(Collections.singletonList("id"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.ArrayList;
import java.util.List;

import static org.apache.flink.cdc.connectors.starrocks.sink.StarRocksUtils.toStarRocksDataType;

/** A {@code MetadataApplier} that applies metadata changes to StarRocks. */
public class StarRocksMetadataApplier implements MetadataApplier {

Expand Down Expand Up @@ -117,8 +119,9 @@ private void applyAddColumn(AddColumnEvent addColumnEvent) {
new StarRocksColumn.Builder()
.setColumnName(column.getName())
.setOrdinalPosition(-1)
.setColumnComment(column.getComment());
StarRocksUtils.toStarRocksDataType(column, false, builder);
.setColumnComment(column.getComment())
.setDefaultValue(column.getDefaultValueExpression());
toStarRocksDataType(column, false, builder);
addColumns.add(builder.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ public static StarRocksTable toStarRocksTable(
new StarRocksColumn.Builder()
.setColumnName(column.getName())
.setOrdinalPosition(i)
.setColumnComment(column.getComment());
.setColumnComment(column.getComment())
.setDefaultValue(column.getDefaultValueExpression());
toStarRocksDataType(column, i < primaryKeyCount, builder);
starRocksColumns.add(builder.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,11 @@ private void registerNewSchema(TableId tableId, Schema newSchema) {
/** Serializer for {@link SchemaManager}. */
public static class Serializer implements SimpleVersionedSerializer<SchemaManager> {

public static final int CURRENT_VERSION = 1;
/**
* Update history: from Version 3.0.0, set to 0, from version 3.1.1, updated to 1, from
* version 3.2.0, updated to 2.
*/
public static final int CURRENT_VERSION = 2;

@Override
public int getVersion() {
Expand Down Expand Up @@ -214,6 +218,7 @@ public SchemaManager deserialize(int version, byte[] serialized) throws IOExcept
switch (version) {
case 0:
case 1:
case 2:
TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
break;
}
case 1:
case 2:
{
int length = in.readInt();
byte[] serializedSchemaManager = new byte[length];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,13 @@ public static TableChangeInfo of(
/** Serializer for {@link TableChangeInfo}. */
public static class Serializer implements SimpleVersionedSerializer<TableChangeInfo> {

public static final int CURRENT_VERSION = 1;
/** The latest version before change of state compatibility. */
public static final int VERSION_BEFORE_STATE_COMPATIBILITY = 1;

public static final int CURRENT_VERSION = 2;

/** Used to distinguish with the state which CURRENT_VERSION was not written. */
public static final TableId MAGIC_TABLE_ID = TableId.tableId("__magic_table__");

@Override
public int getVersion() {
Expand All @@ -122,6 +128,8 @@ public byte[] serialize(TableChangeInfo tableChangeInfo) throws IOException {
SchemaSerializer schemaSerializer = SchemaSerializer.INSTANCE;
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
tableIdSerializer.serialize(MAGIC_TABLE_ID, new DataOutputViewStreamWrapper(out));
out.writeInt(CURRENT_VERSION);
tableIdSerializer.serialize(
tableChangeInfo.getTableId(), new DataOutputViewStreamWrapper(out));
schemaSerializer.serialize(
Expand All @@ -139,6 +147,12 @@ public TableChangeInfo deserialize(int version, byte[] serialized) throws IOExce
try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
DataInputStream in = new DataInputStream(bais)) {
TableId tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
if (tableId.equals(MAGIC_TABLE_ID)) {
version = in.readInt();
tableId = tableIdSerializer.deserialize(new DataInputViewStreamWrapper(in));
} else {
version = VERSION_BEFORE_STATE_COMPATIBILITY;
}
Schema originalSchema =
schemaSerializer.deserialize(version, new DataInputViewStreamWrapper(in));
Schema transformedSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public class ColumnSerializer extends TypeSerializerSingleton<Column> {
private final MetadataColumnSerializer metadataColumnSerializer =
MetadataColumnSerializer.INSTANCE;

private static int currentVersion = 2;

/** Update {@link #currentVersion} as We did not directly include this version in the file. */
public static void updateVersion(int version) {
currentVersion = version;
}

@Override
public boolean isImmutableType() {
return false;
Expand Down Expand Up @@ -92,12 +99,16 @@ public void serialize(Column record, DataOutputView target) throws IOException {

@Override
public Column deserialize(DataInputView source) throws IOException {
return deserialize(currentVersion, source);
}

public Column deserialize(int version, DataInputView source) throws IOException {
ColumnType columnType = enumSerializer.deserialize(source);
switch (columnType) {
case METADATA:
return metadataColumnSerializer.deserialize(source);
case PHYSICAL:
return physicalColumnSerializer.deserialize(source);
return physicalColumnSerializer.deserialize(version, source);
default:
throw new IOException("Unknown column type: " + columnType);
}
Expand Down
Loading