From ad7c2b1e3686aed4121fe337affee9eac1fc4029 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Thu, 16 Jan 2025 18:18:49 +0800 Subject: [PATCH] [FLINK-36406][cdc-runtime] Close MetadataApplier when the job stops --- .../runtime/operators/schema/common/SchemaRegistry.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java index 566dffa93e6..149049177d9 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/common/SchemaRegistry.java @@ -45,6 +45,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.time.Duration; import java.util.List; import java.util.Map; @@ -130,6 +131,12 @@ public void start() throws Exception { public void close() throws Exception { LOG.info("Closing SchemaRegistry - {}.", operatorName); coordinatorExecutor.shutdown(); + try { + metadataApplier.close(); + } catch (Exception e) { + LOG.error("Failed to close metadata applier.", e); + throw new IOException("Failed to close metadata applier.", e); + } } // ------------------------------