diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 6ada032d56f..90effc4d747 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -31,7 +31,6 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.event.FlushEvent; import org.apache.flink.cdc.common.event.SchemaChangeEvent; -import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.TableId; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.route.RouteRule; @@ -433,15 +432,6 @@ private TableId resolveReplacement( private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent) throws InterruptedException, TimeoutException { - if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION - && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) { - // CreateTableEvent should be applied even in EXCEPTION mode - throw new RuntimeException( - String.format( - "Refused to apply schema change event %s in EXCEPTION mode.", - schemaChangeEvent)); - } - // The request will block if another schema change event is being handled SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); if (response.isAccepted()) { diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index b2a8a822df2..b2a90cfd73b 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -240,6 +240,13 @@ private void applySchemaChange( TableId tableId, List derivedSchemaChangeEvents) { for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) { if (changeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) { + if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION) { + // throw exception after all sink flush success + throw new RuntimeException( + String.format( + "Refused to apply schema change event %s in EXCEPTION mode.", + changeEvent)); + } if (schemaChangeBehavior == SchemaChangeBehavior.IGNORE) { currentIgnoredSchemaChanges.add(changeEvent); continue;