Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.runtime.operators.transform.ProjectionColumn;
Expand Down Expand Up @@ -154,6 +155,12 @@ public static List<ProjectionColumn> generateProjectionColumns(
.collect(
Collectors.toMap(
RelDataTypeField::getName, RelDataTypeField::getType));

Map<String, Boolean> isNotNullMap =
columns.stream()
.collect(
Collectors.toMap(
Column::getName, column -> !column.getType().isNullable()));
List<ProjectionColumn> projectionColumns = new ArrayList<>();
for (SqlNode sqlNode : sqlSelect.getSelectList()) {
if (sqlNode instanceof SqlBasicCall) {
Expand Down Expand Up @@ -205,21 +212,27 @@ public static List<ProjectionColumn> generateProjectionColumns(
} else if (sqlNode instanceof SqlIdentifier) {
SqlIdentifier sqlIdentifier = (SqlIdentifier) sqlNode;
String columnName = sqlIdentifier.names.get(sqlIdentifier.names.size() - 1);
DataType columnType =
DataTypeConverter.convertCalciteRelDataTypeToDataType(
relDataTypeMap.get(columnName));
if (isMetadataColumn(columnName)) {
projectionColumns.add(
ProjectionColumn.of(
columnName,
DataTypeConverter.convertCalciteRelDataTypeToDataType(
relDataTypeMap.get(columnName)),
// Metadata columns should never be null
columnType.notNull(),
columnName,
columnName,
Arrays.asList(columnName)));
} else {
// Calcite translated column type doesn't keep nullability.
// Appending it manually to circumvent this problem.
projectionColumns.add(
ProjectionColumn.of(
columnName,
DataTypeConverter.convertCalciteRelDataTypeToDataType(
relDataTypeMap.get(columnName))));
isNotNullMap.get(columnName)
? columnType.notNull()
: columnType.nullable()));
}
} else {
throw new ParseException("Unrecognized projection: " + sqlNode.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,26 @@ public class TransformSchemaOperatorTest {
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
.build();

private static final Schema NULLABILITY_SCHEMA =
Schema.newBuilder()
.physicalColumn("id", DataTypes.STRING().notNull())
.physicalColumn("name", DataTypes.STRING())
.primaryKey("id")
.partitionKey("id")
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
.build();

private static final Schema EXPECTED_NULLABILITY_SCHEMA =
Schema.newBuilder()
.physicalColumn("id", DataTypes.STRING().notNull())
.physicalColumn("uid", DataTypes.STRING())
.physicalColumn("name", DataTypes.STRING())
.physicalColumn("uname", DataTypes.STRING())
.primaryKey("id")
.partitionKey("id")
.options(ImmutableMap.of("key1", "value1", "key2", "value2"))
.build();

@Test
void testEventTransform() throws Exception {
TransformSchemaOperator transform =
Expand Down Expand Up @@ -176,4 +196,33 @@ void testEventTransform() throws Exception {
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(new StreamRecord<>(updateEventExpect));
}

@Test
public void testNullabilityColumn() throws Exception {
TransformSchemaOperator transform =
TransformSchemaOperator.newBuilder()
.addTransform(
CUSTOMERS_TABLEID.identifier(),
"id, upper(id) uid, name, upper(name) uname",
"id",
"id",
"key1=value1,key2=value2")
.build();
EventOperatorTestHarness<TransformSchemaOperator, Event>
transformFunctionEventEventOperatorTestHarness =
new EventOperatorTestHarness<>(transform, 1);
// Initialization
transformFunctionEventEventOperatorTestHarness.open();
// Create table
CreateTableEvent createTableEvent =
new CreateTableEvent(CUSTOMERS_TABLEID, NULLABILITY_SCHEMA);
transform.processElement(new StreamRecord<>(createTableEvent));

Assertions.assertThat(
transformFunctionEventEventOperatorTestHarness.getOutputRecords().poll())
.isEqualTo(
new StreamRecord<>(
new CreateTableEvent(
CUSTOMERS_TABLEID, EXPECTED_NULLABILITY_SCHEMA)));
}
}