From 059c6edbe6a08c7f5d79356aaddce8381e13311b Mon Sep 17 00:00:00 2001 From: dujie Date: Sun, 28 Apr 2024 14:20:11 +0800 Subject: [PATCH] [FLINK-35255][flink-cdc-runtime]DataSinkWriterOperator adds overrides for the snapshotState and processWatermark methods --- .../sink/DataSinkWriterOperator.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java index c8056cc7a5d..bdb384cbb79 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/sink/DataSinkWriterOperator.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.common.schema.Schema; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.connector.sink2.CommittableMessage; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -36,9 +37,11 @@ import org.apache.flink.streaming.api.operators.ChainingStrategy; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.Output; +import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus; import java.lang.reflect.Constructor; import java.lang.reflect.Field; @@ -123,6 +126,26 @@ public void initializeState(StateInitializationContext context) throws Exception .initializeState(context); } + @Override + public void snapshotState(StateSnapshotContext context) throws Exception { + this.>>getFlinkWriterOperator() + .snapshotState(context); + } + + @Override + public void processWatermark(Watermark mark) throws Exception { + super.processWatermark(mark); + this.>>getFlinkWriterOperator() + .processWatermark(mark); + } + + @Override + public void processWatermarkStatus(WatermarkStatus watermarkStatus) throws Exception { + super.processWatermarkStatus(watermarkStatus); + this.>>getFlinkWriterOperator() + .processWatermarkStatus(watermarkStatus); + } + @Override public void processElement(StreamRecord element) throws Exception { Event event = element.getValue();