From f800a4775a51c9540d27398e8d06fe4671f13003 Mon Sep 17 00:00:00 2001 From: Sergei Morozov Date: Sun, 29 Sep 2024 21:30:36 -0700 Subject: [PATCH] [FLINK-36406]: Log exceptions in SchemaRegistryRequestHandler#close() --- .../coordinator/SchemaRegistryRequestHandler.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 3badd063a2..951dbfa225 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; -import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -343,8 +342,13 @@ public void getSchemaChangeResult(CompletableFuture respon } } + /** + * As at Flink 1.20, the runtime ( + * DefaultOperatorCoordinatorHandler#disposeAllOperatorCoordinators) will ignore the + * exception thrown by this method. Thus, it should report errors by logging them. + */ @Override - public void close() throws IOException { + public void close() { if (schemaChangeThreadPool != null) { schemaChangeThreadPool.shutdown(); } @@ -352,7 +356,7 @@ public void close() throws IOException { try { metadataApplier.close(); } catch (Exception e) { - throw new IOException("Failed to close metadata applier.", e); + LOG.error("Failed to close metadata applier.", e); } }