@@ -376,6 +376,25 @@ private static Schema applyAlterColumnTypeEvent(AlterColumnTypeEvent event, Schema oldSchema) {
 return oldSchema.copy(columns);
 }
 
+/**
+ * Determines whether the given schema change event {@code event} should be sent downstream,
+ * based on whether the given transform rule contains an asterisk and which columns are
+ * referenced.
+ *
+ * <p>For example, if {@code hasAsterisk} is false, all {@code AddColumnEvent}s and {@code
+ * DropColumnEvent}s should be ignored, since an asterisk-less transform must not emit schema
+ * change events that change the number of downstream columns.
+ *
+ * <p>{@code referencedColumns} is used to determine whether the schema change event affects
+ * any referenced column: once a column has been projected out of the downstream schema, its
+ * corresponding schema change events should not be emitted either.
+ *
+ * <p>When {@code hasAsterisk} is true, things are simpler since no schema change events need
+ * to be filtered out. All we have to do is rewrite each {@code AddColumnEvent}'s insertion
+ * position, replacing `FIRST` / `LAST` with column-relative position indicators. This is
+ * necessary because extra calculated columns might have been added, so the `FIRST` / `LAST`
+ * positions may differ between the upstream and downstream schemas.
+ */
 public static Optional<SchemaChangeEvent> transformSchemaChangeEvent(
 boolean hasAsterisk, List<String> referencedColumns, SchemaChangeEvent event) {
 Optional<SchemaChangeEvent> evolvedSchemaChangeEvent = Optional.empty();
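To make the new contract concrete, here is a minimal usage sketch. It is not part of this PR: it assumes the event constructors from flink-cdc-common (for example `RenameColumnEvent(TableId, Map<String, String>)`) and only illustrates the filtering behavior described in the Javadoc above.

import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;

import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.utils.SchemaUtils;

// Hypothetical illustration only, not code from this PR.
TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
SchemaChangeEvent rename =
        new RenameColumnEvent(tableId, Collections.singletonMap("age", "toshi"));

// Asterisk-less transform that only references ["id", "name"]: the rename touches an
// un-referenced column, so the helper swallows it.
Optional<SchemaChangeEvent> filtered =
        SchemaUtils.transformSchemaChangeEvent(false, Arrays.asList("id", "name"), rename);
// filtered.isPresent() == false

// Asterisk-ful transform: nothing is filtered out; the event is forwarded
// (only AddColumnEvent positions would be rewritten).
Optional<SchemaChangeEvent> forwarded =
        SchemaUtils.transformSchemaChangeEvent(true, Arrays.asList("id", "name", "age"), rename);
// forwarded.isPresent() == true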
@@ -688,6 +688,101 @@ void testPostAsteriskWithSchemaEvolution() throws Exception {
 "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
 }
 
+@Test
+void testTransformUnmatchedSchemaEvolution() throws Exception {
+FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+// Setup value source
+Configuration sourceConfig = new Configuration();
+
+sourceConfig.set(
+ValuesDataSourceOptions.EVENT_SET_ID,
+ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+SourceDef sourceDef =
+new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+// Setup value sink
+Configuration sinkConfig = new Configuration();
+SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+// Setup pipeline
+Configuration pipelineConfig = new Configuration();
+pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+pipelineConfig.set(
+PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+PipelineDef pipelineDef =
+new PipelineDef(
+sourceDef,
+sinkDef,
+Collections.emptyList(),
+Collections.singletonList(
+new TransformDef(
+"foo.bar.baz", // This doesn't match the given tableId
+"*",
+null,
+null,
+null,
+null,
+null)),
+Collections.emptyList(),
+pipelineConfig);
+
+// Execute the pipeline
+PipelineExecution execution = composer.compose(pipelineDef);
+execution.execute();
+
+// Check the order and content of all received events
+String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+assertThat(outputEvents)
+.containsExactly(
+// Initial stage
+"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+// Add column stage
+"AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+// Alter column type stage
+"AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+// Rename column stage
+"RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+// Drop column stage
+"DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
+"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
+}
+
 private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
 List<Event> events = new ArrayList<>();
 
@@ -44,6 +44,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,6 +243,8 @@ public void processElement(StreamRecord<Event> element) throws Exception {
 
 private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
 TableId tableId = event.tableId();
+List<String> columnNamesBeforeChange = Collections.emptyList();
+
 if (event instanceof CreateTableEvent) {
 CreateTableEvent createTableEvent = (CreateTableEvent) event;
 Set<String> projectedColumnsSet =
@@ -286,6 +289,9 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
 createTableEvent.getSchema().getColumnNames().stream()
 .filter(projectedColumnsSet::contains)
 .collect(Collectors.toList()));
+} else {
+columnNamesBeforeChange =
+getPostTransformChangeInfo(tableId).getPreTransformedSchema().getColumnNames();
+}
 
 Schema schema;
@@ -304,9 +310,12 @@
 
 if (event instanceof CreateTableEvent) {
 return Optional.of(new CreateTableEvent(tableId, projectedSchema));
+} else if (hasAsteriskMap.getOrDefault(tableId, true)) {
+// See comments in PreTransformOperator#cacheChangeSchema method.
+return SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
 } else {
 return SchemaUtils.transformSchemaChangeEvent(
-hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event);
+false, projectedColumnsMap.get(tableId), event);
 }
 }
 
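The asterisk-ful branch above is also where the `FIRST` / `LAST` normalization from the SchemaUtils Javadoc becomes visible. A hypothetical sketch, assuming flink-cdc-common's `AddColumnEvent.ColumnWithPosition` API and reusing `tableId` and the imports from the earlier sketch:

import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.types.DataTypes;

// Upstream adds a `rank` column at the FIRST position.
AddColumnEvent addColumn =
        new AddColumnEvent(
                tableId,
                Collections.singletonList(
                        new AddColumnEvent.ColumnWithPosition(
                                Column.physicalColumn("rank", DataTypes.STRING()),
                                AddColumnEvent.ColumnPosition.FIRST,
                                null)));

// With hasAsterisk == true and upstream columns [id, name, age], FIRST is expected to be
// rewritten into a column-relative anchor such as BEFORE `id`, matching the
// "position=BEFORE, existedColumnName=id" output asserted in the new test.
Optional<SchemaChangeEvent> normalized =
        SchemaUtils.transformSchemaChangeEvent(
                true, Arrays.asList("id", "name", "age"), addColumn);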
@@ -27,7 +27,6 @@
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.schema.Column;
 import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.schema.Selectors;
 import org.apache.flink.cdc.common.utils.SchemaUtils;
@@ -50,7 +49,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
@@ -71,7 +69,6 @@ public class PreTransformOperator extends AbstractStreamOperator<Event>
 private List<UserDefinedFunctionDescriptor> udfDescriptors;
 private Map<TableId, PreTransformProcessor> preTransformProcessorMap;
 private Map<TableId, Boolean> hasAsteriskMap;
-private Map<TableId, List<String>> referencedColumnsMap;
 
 public static PreTransformOperator.Builder newBuilder() {
 return new PreTransformOperator.Builder();
@@ -163,7 +160,6 @@ public void setup(
 }
 this.preTransformProcessorMap = new ConcurrentHashMap<>();
 this.hasAsteriskMap = new ConcurrentHashMap<>();
-this.referencedColumnsMap = new ConcurrentHashMap<>();
 }
 
 @Override
@@ -186,8 +182,7 @@ public void initializeState(StateInitializationContext context) throws Exception
 new CreateTableEvent(
 stateTableChangeInfo.getTableId(),
 stateTableChangeInfo.getPreTransformedSchema());
-// hasAsteriskMap and referencedColumnsMap needs to be recalculated after restoring
-// from a checkpoint.
+// hasAsteriskMap needs to be recalculated after restoring from a checkpoint.
 cacheTransformRuleInfo(restoredCreateTableEvent);
 
 // Since PostTransformOperator doesn't preserve state, pre-transformed schema
@@ -258,12 +253,32 @@ private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
 private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
 TableId tableId = event.tableId();
 PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
+List<String> columnNamesBeforeChange = tableChangeInfo.getSourceSchema().getColumnNames();
+
 Schema originalSchema =
 SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
 Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
-Optional<SchemaChangeEvent> schemaChangeEvent =
-SchemaUtils.transformSchemaChangeEvent(
-hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
+
+Optional<SchemaChangeEvent> schemaChangeEvent;
+if (hasAsteriskMap.getOrDefault(tableId, true)) {
+// If this TableId is asterisk-ful, we should use the latest upstream schema as the
+// referenced columns for schema evolution, not the original columns captured when
+// the table was created. If hasAsteriskMap has no entry for this TableId, the table
+// is not referenced by any transform rule and should be regarded as asterisk-ful
+// by default.
+schemaChangeEvent =
+SchemaUtils.transformSchemaChangeEvent(
+true, tableChangeInfo.getSourceSchema().getColumnNames(), event);
+} else {
+// Otherwise, use the pre-transformed columns to decide whether the given schema
+// change event should be passed downstream: it is emitted only if it touches a
+// column present in the pre-transformed schema.
+schemaChangeEvent =
+SchemaUtils.transformSchemaChangeEvent(
+false,
+tableChangeInfo.getPreTransformedSchema().getColumnNames(),
+event);
+}
 if (schemaChangeEvent.isPresent()) {
 preTransformedSchema =
 SchemaUtils.applySchemaChangeEvent(
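A note on `getOrDefault` versus `get` in the branch above: for a table that no transform rule matches, `hasAsteriskMap` may have no entry, and unboxing the `null` returned by `get()` into the helper's primitive `boolean hasAsterisk` parameter would throw a `NullPointerException`. Defaulting to `true` instead degrades to plain pass-through behavior, which is what the new `testTransformUnmatchedSchemaEvolution` case exercises. A hypothetical reduction, assuming the same imports as the surrounding class:

Map<TableId, Boolean> hasAsteriskMap = new ConcurrentHashMap<>();
TableId unmatched = TableId.tableId("default_namespace", "default_schema", "mytable1");

// boolean hasAsterisk = hasAsteriskMap.get(unmatched); // NullPointerException on unboxing
boolean hasAsterisk = hasAsteriskMap.getOrDefault(unmatched, true); // asterisk-ful default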
@@ -276,26 +291,8 @@ private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
 
 private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
 TableId tableId = createTableEvent.tableId();
-Set<String> referencedColumnsSet =
-transforms.stream()
-.filter(t -> t.getSelectors().isMatch(tableId))
-.flatMap(
-rule ->
-TransformParser.generateReferencedColumns(
-rule.getProjection()
-.map(TransformProjection::getProjection)
-.orElse(null),
-rule.getFilter()
-.map(TransformFilter::getExpression)
-.orElse(null),
-createTableEvent.getSchema().getColumns())
-.stream())
-.map(Column::getName)
-.collect(Collectors.toSet());
-
 boolean notTransformed =
 transforms.stream().noneMatch(t -> t.getSelectors().isMatch(tableId));
-
 if (notTransformed) {
 // If this TableId isn't presented in any transform block, it should behave like a "*"
 // projection and should be regarded as asterisk-ful.
@@ -313,11 +310,6 @@ private void cacheTransformRuleInfo(CreateTableEvent createTableEvent) {
 
 hasAsteriskMap.put(createTableEvent.tableId(), hasAsterisk);
 }
-referencedColumnsMap.put(
-createTableEvent.tableId(),
-createTableEvent.getSchema().getColumnNames().stream()
-.filter(referencedColumnsSet::contains)
-.collect(Collectors.toList()));
 }
 
 private CreateTableEvent transformCreateTableEvent(CreateTableEvent createTableEvent) {