Skip to content

Commit

Permalink
[FLINK-25920] Fix AbstractStreamingWriter sending after EOI
Browse files Browse the repository at this point in the history
AbstractStreamingWriter send partition info twice on EOI. This commit ensures that we are not resending partition information even after restarting from a final checkpoint.

(cherry picked from commit 6d60f41)
  • Loading branch information
AHeise committed Nov 13, 2024
1 parent 8068376 commit 1a5e4dc
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -560,29 +560,30 @@ Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.open
Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (BatchFileWriter.java:116)
Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<T>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (BatchFileWriter.java:0)
Method <org.apache.flink.connector.file.table.batch.compact.BatchFileWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (BatchFileWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:106)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:125)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has generic parameter type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket<IN, java.lang.String>> with type argument depending on <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketCreated(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:111)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket.getBucketId()> in (AbstractStreamingWriter.java:130)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has generic parameter type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket<IN, java.lang.String>> with type argument depending on <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter$1.bucketInactive(org.apache.flink.streaming.api.functions.sink.filesystem.Bucket)> has parameter of type <org.apache.flink.streaming.api.functions.sink.filesystem.Bucket> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.close()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.close()> in (AbstractStreamingWriter.java:167)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.commitUpToCheckpoint(long)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.commitUpToCheckpoint(long)> in (AbstractStreamingWriter.java:90)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(long)> in (AbstractStreamingWriter.java:157)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:158)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:122)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setBucketLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener)> in (AbstractStreamingWriter.java:101)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setFileLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener)> in (AbstractStreamingWriter.java:115)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getProcessingTimeService()> in (AbstractStreamingWriter.java:122)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (AbstractStreamingWriter.java:98)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(java.lang.Object, long, java.lang.Long, long)> in (AbstractStreamingWriter.java:142)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getTimestamp()> in (AbstractStreamingWriter.java:145)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (AbstractStreamingWriter.java:143)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.hasTimestamp()> in (AbstractStreamingWriter.java:145)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (AbstractStreamingWriter.java:144)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.close()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.close()> in (AbstractStreamingWriter.java:213)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.commitUpToCheckpoint(long)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.commitUpToCheckpoint(long)> in (AbstractStreamingWriter.java:109)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.onProcessingTime(long)> in (AbstractStreamingWriter.java:202)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.endInput()> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:203)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls constructor <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.<init>(org.apache.flink.streaming.api.functions.sink.filesystem.Buckets, boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService, long)> in (AbstractStreamingWriter.java:141)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.shaded.guava31.com.google.common.collect.Lists.newArrayList(java.lang.Iterable)> in (AbstractStreamingWriter.java:164)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setBucketLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.BucketLifeCycleListener)> in (AbstractStreamingWriter.java:120)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.Buckets.setFileLifeCycleListener(org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener)> in (AbstractStreamingWriter.java:134)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getProcessingTimeService()> in (AbstractStreamingWriter.java:141)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.initializeState(org.apache.flink.runtime.state.StateInitializationContext)> calls method <org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getTaskInfo()> in (AbstractStreamingWriter.java:117)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.onElement(java.lang.Object, long, java.lang.Long, long)> in (AbstractStreamingWriter.java:183)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getTimestamp()> in (AbstractStreamingWriter.java:186)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.getValue()> in (AbstractStreamingWriter.java:184)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.streamrecord.StreamRecord.hasTimestamp()> in (AbstractStreamingWriter.java:186)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> calls method <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService.getCurrentProcessingTime()> in (AbstractStreamingWriter.java:185)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has generic parameter type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord<IN>> with type argument depending on <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)> has parameter of type <org.apache.flink.streaming.runtime.streamrecord.StreamRecord> in (AbstractStreamingWriter.java:0)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:131)
Method <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.snapshotState(org.apache.flink.runtime.state.StateSnapshotContext)> calls method <org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper.snapshotState(long)> in (AbstractStreamingWriter.java:171)
Method <org.apache.flink.connector.file.table.stream.PartitionCommitTrigger.create(boolean, org.apache.flink.api.common.state.OperatorStateStore, org.apache.flink.configuration.Configuration, java.lang.ClassLoader, java.util.List, org.apache.flink.streaming.runtime.tasks.ProcessingTimeService)> has parameter of type <org.apache.flink.streaming.runtime.tasks.ProcessingTimeService> in (PartitionCommitTrigger.java:0)
Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(long)> calls method <org.apache.flink.table.utils.PartitionPathUtils.extractPartitionSpecFromPath(org.apache.flink.core.fs.Path)> in (PartitionCommitter.java:167)
Method <org.apache.flink.connector.file.table.stream.PartitionCommitter.commitPartitions(long)> calls method <org.apache.flink.table.utils.PartitionPathUtils.generatePartitionPath(java.util.LinkedHashMap)> in (PartitionCommitter.java:172)
Expand Down Expand Up @@ -647,6 +648,7 @@ Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCo
Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:43)
Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory.getInstance()> in (StandardDeCompressors.java:44)
Static Initializer <org.apache.flink.connector.file.src.compression.StandardDeCompressors.<clinit>()> calls method <org.apache.flink.api.common.io.compression.XZInputStreamFactory.getInstance()> in (StandardDeCompressors.java:46)
Static Initializer <org.apache.flink.connector.file.table.stream.AbstractStreamingWriter.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.BooleanSerializer.INSTANCE> in (AbstractStreamingWriter.java:74)
Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.ListSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:52)
Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> calls constructor <org.apache.flink.api.common.typeutils.base.MapSerializer.<init>(org.apache.flink.api.common.typeutils.TypeSerializer, org.apache.flink.api.common.typeutils.TypeSerializer)> in (PartitionTimeCommitTrigger.java:56)
Static Initializer <org.apache.flink.connector.file.table.stream.PartitionTimeCommitTrigger.<clinit>()> gets field <org.apache.flink.api.common.typeutils.base.LongSerializer.INSTANCE> in (PartitionTimeCommitTrigger.java:56)
Expand Down
Loading

0 comments on commit 1a5e4dc

Please sign in to comment.