diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java index 566dffa93e6..149049177d9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java @@ -45,6 +45,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.time.Duration; import java.util.List; import java.util.Map; @@ -130,6 +131,12 @@ public void start() throws Exception { public void close() throws Exception { LOG.info("Closing SchemaRegistry - {}.", operatorName); coordinatorExecutor.shutdown(); + try { + metadataApplier.close(); + } catch (Exception e) { + LOG.error("Failed to close metadata applier.", e); + throw new IOException("Failed to close metadata applier.", e); + } } // ------------------------------