[cdc-runtime] Improve DataSinkWriterOperator logic that only emit latest schema for events that are not CreateTableEvent
PatrickRen committed Nov 28, 2023
1 parent 369be8c commit 8e54e46
Showing 1 changed file with 45 additions and 25 deletions.
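
In essence, the change is about schema availability after a restart: the wrapped SinkWriter can only deserialize RecordData for a table once it has seen that table's schema, so before forwarding the first event of a table the operator now asks the schema registry for the latest schema and replays it downstream as a CreateTableEvent, and it does so for every event kind other than CreateTableEvent instead of only for DataChangeEvent. The following is a minimal, self-contained sketch of that pattern, written against made-up stand-in types (SchemaReplayingDispatcher, latestSchemaLookup and downstream are illustrative, not part of the repository), and it deliberately simplifies the operator's actual control flow, which is shown in the diff below.

import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Illustrative sketch only -- not the repository code. It shows the general
 * "replay the latest schema before the first event of a table" idea described
 * by the commit message; all names below are made up for the example.
 */
class SchemaReplayingDispatcher {

    interface Event {}

    record CreateTableEvent(String tableId, String schema) implements Event {}

    record DataChangeEvent(String tableId, String payload) implements Event {}

    /** Tables whose schema the downstream writer has already seen since the last (re)start. */
    private final Set<String> processedTables = new HashSet<>();

    /** Stand-in for a schema registry lookup, e.g. a getLatestSchema call. */
    private final Function<String, Optional<String>> latestSchemaLookup;

    /** Stand-in for the wrapped sink writer operator that receives the events. */
    private final Consumer<Event> downstream;

    SchemaReplayingDispatcher(
            Function<String, Optional<String>> latestSchemaLookup, Consumer<Event> downstream) {
        this.latestSchemaLookup = latestSchemaLookup;
        this.downstream = downstream;
    }

    void process(Event event) {
        if (event instanceof CreateTableEvent create) {
            // The schema travels with the event itself; just remember the table is known.
            processedTables.add(create.tableId());
            downstream.accept(event);
            return;
        }
        if (event instanceof DataChangeEvent change
                && !processedTables.contains(change.tableId())) {
            // First event for this table since the restart: fetch the latest schema and
            // replay it as a CreateTableEvent so the writer can deserialize what follows.
            String schema =
                    latestSchemaLookup
                            .apply(change.tableId())
                            .orElseThrow(
                                    () ->
                                            new IllegalStateException(
                                                    "No schema registered for "
                                                            + change.tableId()));
            downstream.accept(new CreateTableEvent(change.tableId(), schema));
            processedTables.add(change.tableId());
        }
        downstream.accept(event);
    }

    public static void main(String[] args) {
        SchemaReplayingDispatcher dispatcher =
                new SchemaReplayingDispatcher(
                        table -> Optional.of("id BIGINT, name STRING"),
                        e -> System.out.println("-> " + e));
        // The first change for an unseen table is preceded by a replayed CreateTableEvent.
        dispatcher.process(new DataChangeEvent("db.orders", "INSERT id=1"));
        dispatcher.process(new DataChangeEvent("db.orders", "INSERT id=2"));
    }
}

Running the sketch prints a CreateTableEvent once for db.orders, followed by the two data changes, which mirrors the intent of the emitLatestSchema helper introduced in the diff.
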
@@ -34,7 +34,6 @@
 import com.ververica.cdc.common.annotation.Internal;
 import com.ververica.cdc.common.event.ChangeEvent;
 import com.ververica.cdc.common.event.CreateTableEvent;
-import com.ververica.cdc.common.event.DataChangeEvent;
 import com.ververica.cdc.common.event.Event;
 import com.ververica.cdc.common.event.FlushEvent;
 import com.ververica.cdc.common.event.TableId;
@@ -125,32 +124,30 @@ public void initializeState(StateInitializationContext context) throws Exception
     @Override
     public void processElement(StreamRecord<Event> element) throws Exception {
         Event event = element.getValue();
+
+        // FlushEvent triggers flush
         if (event instanceof FlushEvent) {
-            copySinkWriter.flush(false);
-            schemaEvolutionClient.notifyFlushSuccess(
-                    getRuntimeContext().getIndexOfThisSubtask(), ((FlushEvent) event).getTableId());
-        } else {
-            TableId tableId = ((ChangeEvent) event).tableId();
-            if (event instanceof DataChangeEvent && !processedTableIds.contains(tableId)) {
-                Optional<Schema> schema = schemaEvolutionClient.getLatestSchema(tableId);
-                if (schema.isPresent()) {
-                    // request and process CreateTableEvent because SinkWriter need to retrieve
-                    // Schema to deserialize RecordData after resuming job.
-                    this
-                            .<OneInputStreamOperator<Event, CommittableMessage<CommT>>>
-                                    getFlinkWriterOperator()
-                            .processElement(
-                                    new StreamRecord<>(
-                                            new CreateTableEvent(tableId, schema.get())));
-                    processedTableIds.add(tableId);
-                } else {
-                    throw new RuntimeException(
-                            "Could not find schema message from SchemaRegistry for " + tableId);
-                }
-            }
-            this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
-                    .processElement(element);
+            handleFlushEvent(((FlushEvent) event));
+            return;
         }
+
+        // CreateTableEvent marks the table as processed directly
+        if (event instanceof CreateTableEvent) {
+            processedTableIds.add(((CreateTableEvent) event).tableId());
+            return;
+        }
+
+        // Check if the table is processed before emitting all other events, because we have to make
+        // sure that sink have a view of the full schema before processing any change events,
+        // including schema changes.
+        ChangeEvent changeEvent = (ChangeEvent) event;
+        if (!processedTableIds.contains(changeEvent.tableId())) {
+            emitLatestSchema(changeEvent.tableId());
+            processedTableIds.add(changeEvent.tableId());
+        }
+        processedTableIds.add(changeEvent.tableId());
+        this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
+                .processElement(element);
     }
 
     @Override
@@ -170,6 +167,29 @@ public void endInput() throws Exception {
         this.<BoundedOneInput>getFlinkWriterOperator().endInput();
     }
 
+    // ----------------------------- Helper functions -------------------------------
+
+    private void handleFlushEvent(FlushEvent event) throws Exception {
+        copySinkWriter.flush(false);
+        schemaEvolutionClient.notifyFlushSuccess(
+                getRuntimeContext().getIndexOfThisSubtask(), event.getTableId());
+    }
+
+    private void emitLatestSchema(TableId tableId) throws Exception {
+        Optional<Schema> schema = schemaEvolutionClient.getLatestSchema(tableId);
+        if (schema.isPresent()) {
+            // request and process CreateTableEvent because SinkWriter need to retrieve
+            // Schema to deserialize RecordData after resuming job.
+            this.<OneInputStreamOperator<Event, CommittableMessage<CommT>>>getFlinkWriterOperator()
+                    .processElement(
+                            new StreamRecord<>(new CreateTableEvent(tableId, schema.get())));
+            processedTableIds.add(tableId);
+        } else {
+            throw new RuntimeException(
+                    "Could not find schema message from SchemaRegistry for " + tableId);
+        }
+    }
+
     // -------------------------- Reflection helper functions --------------------------
 
     private Object createFlinkWriterOperator() {
