From 480a9a3a43496a588c8b62bea0efece131498171 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 19 Aug 2024 16:51:07 +0800 Subject: [PATCH 1/3] [FLINK-36076] Set isSchemaChangeApplying as isSchemaChangeApplying for thread safe. --- .../flink/cdc/composer/flink/FlinkPipelineComposer.java | 3 ++- .../composer/flink/translator/DataSourceTranslator.java | 7 ++++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java index bddd5fc00fe..b3f17b2e52d 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java @@ -97,7 +97,8 @@ public PipelineExecution compose(PipelineDef pipelineDef) { // Build Source Operator DataSourceTranslator sourceTranslator = new DataSourceTranslator(); DataStream stream = - sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig()); + sourceTranslator.translate( + pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism); // Build PreTransformOperator for processing Schema Event TransformTranslator transformTranslator = new TransformTranslator(); diff --git a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java index 7b631c6f1ad..90bbea73ee3 100644 --- a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java +++ b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/DataSourceTranslator.java @@ -23,7 +23,6 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.common.factories.DataSourceFactory; import org.apache.flink.cdc.common.factories.FactoryHelper; -import org.apache.flink.cdc.common.pipeline.PipelineOptions; import org.apache.flink.cdc.common.source.DataSource; import org.apache.flink.cdc.common.source.EventSourceProvider; import org.apache.flink.cdc.common.source.FlinkSourceFunctionProvider; @@ -41,12 +40,14 @@ public class DataSourceTranslator { public DataStreamSource translate( - SourceDef sourceDef, StreamExecutionEnvironment env, Configuration pipelineConfig) { + SourceDef sourceDef, + StreamExecutionEnvironment env, + Configuration pipelineConfig, + int sourceParallelism) { // Create data source DataSource dataSource = createDataSource(sourceDef, env, pipelineConfig); // Get source provider - final int sourceParallelism = pipelineConfig.get(PipelineOptions.PIPELINE_PARALLELISM); EventSourceProvider eventSourceProvider = dataSource.getEventSourceProvider(); if (eventSourceProvider instanceof FlinkSourceProvider) { // Source From 0ff08b7e3ea97eb6933b076715ba0f16b7258f7e Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 19 Aug 2024 21:22:53 +0800 Subject: [PATCH 2/3] [FLINK-36076] Set isSchemaChangeApplying as isSchemaChangeApplying for thread safe. --- .../schema/coordinator/SchemaRegistryRequestHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 77360169e24..e80ab8c955c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable { private final Set flushedSinkWriters; /** Status of the execution of current schema change request. */ - private boolean isSchemaChangeApplying; + private transient boolean isSchemaChangeApplying; /** Executor service to execute schema change. */ private final ExecutorService schemaChangeThreadPool; From 178d9fea725fc6e884e2507e8ec8426a45b0feb2 Mon Sep 17 00:00:00 2001 From: Hongshun Wang Date: Mon, 19 Aug 2024 21:25:17 +0800 Subject: [PATCH 3/3] [FLINK-36076] Set isSchemaChangeApplying as volatile for thread safe. --- .../schema/coordinator/SchemaRegistryRequestHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index e80ab8c955c..da88753e581 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -94,7 +94,7 @@ public class SchemaRegistryRequestHandler implements Closeable { private final Set flushedSinkWriters; /** Status of the execution of current schema change request. */ - private transient boolean isSchemaChangeApplying; + private volatile boolean isSchemaChangeApplying; /** Executor service to execute schema change. */ private final ExecutorService schemaChangeThreadPool;