 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
   private final SerializableRecordEmitter<T> emitter;
   private final String tableName;
 
+  // Cache the splits discovered by planSplitsForBatch, which can be called twice from two
+  // different threads: (1) source/stream construction by the main thread and (2) enumerator
+  // creation. Hence the field needs to be volatile.
+  private volatile List<IcebergSourceSplit> batchSplits;
+
   IcebergSource(
       TableLoader tableLoader,
       ScanContext scanContext,
@@ -132,16 +143,26 @@ private String planningThreadName() {
     return tableName + "-" + UUID.randomUUID();
   }
 
+  /**
+   * Cache the enumerated splits for batch execution to avoid double planning as there are two
+   * code paths obtaining splits: (1) infer parallelism (2) enumerator creation.
+   */
   private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+    if (batchSplits != null) {
+      return batchSplits;
+    }
+
     ExecutorService workerPool =
         ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
     try (TableLoader loader = tableLoader.clone()) {
       loader.open();
-      List<IcebergSourceSplit> splits =
+      this.batchSplits =
           FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
       LOG.info(
-          "Discovered {} splits from table {} during job initialization", splits.size(), tableName);
-      return splits;
+          "Discovered {} splits from table {} during job initialization",
+          batchSplits.size(),
+          tableName);
+      return batchSplits;
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to close table loader", e);
     } finally {
@@ -207,12 +228,35 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
         // Only do scan planning if nothing is restored from checkpoint state
         List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
         assigner.onDiscoveredSplits(splits);
+        // clear the cached splits after enumerator creation as they won't be needed anymore
+        this.batchSplits = null;
       }
 
       return new StaticIcebergEnumerator(enumContext, assigner);
     }
   }
 
+  private boolean shouldInferParallelism() {
+    return !scanContext.isStreaming();
+  }
+
+  private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
+    int parallelism =
+        SourceUtil.inferParallelism(
+            flinkConf,
+            scanContext.limit(),
+            () -> {
+              List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
+              return splits.size();
+            });
+
+    if (env.getMaxParallelism() > 0) {
+      parallelism = Math.min(parallelism, env.getMaxParallelism());
+    }
+
+    return parallelism;
+  }
+
   /**
    * Create a source builder.
    *
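The inferred value is additionally capped by the job's max parallelism when one is configured; an unset max parallelism does not pass the "> 0" check, so the cap only applies when explicitly set. A minimal sketch of that interaction, with made-up numbers for illustration:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(128);
// Suppose the batch planner discovers 200 splits and SourceUtil.inferParallelism(...) returns 200
// based on that count; the source parallelism then becomes Math.min(200, 128) = 128.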
@@ -571,6 +615,41 @@ public IcebergSource<T> build() {
           emitter);
     }
 
+    /**
+     * Build the {@link IcebergSource} and create a {@link DataStream} from the source. Watermark
+     * strategy is set to {@link WatermarkStrategy#noWatermarks()}.
+     *
+     * @return data stream from the Iceberg source
+     */
+    public DataStream<T> buildStream(StreamExecutionEnvironment env) {
+      // buildStream should only be called with RowData or Converter paths.
+      Preconditions.checkState(
+          readerFunction == null,
+          "Cannot set reader function when building a data stream from the source");
+      IcebergSource<T> source = build();
+      TypeInformation<T> outputTypeInfo =
+          outputTypeInfo(converter, table.schema(), source.scanContext.project());
+      DataStreamSource<T> stream =
+          env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo);
+      if (source.shouldInferParallelism()) {
+        stream = stream.setParallelism(source.inferParallelism(flinkConfig, env));
+      }
+
+      return stream;
+    }
+
+    private static <T> TypeInformation<T> outputTypeInfo(
+        RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
+      if (converter != null) {
+        return converter.getProducedType();
+      } else {
+        // output type is RowData
+        Schema readSchema = projected != null ? projected : tableSchema;
+        return (TypeInformation<T>)
+            FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
+      }
+    }
+
     private ReaderFunction<T> readerFunction(ScanContext context) {
       if (table instanceof BaseMetadataTable) {
         MetaDataReaderFunction rowDataReaderFunction =
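For reference, a minimal usage sketch of the new buildStream entry point; the table location, job name, and builder options below are illustrative, and catalog/config setup is omitted:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// hypothetical table location
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/table");

DataStream<RowData> stream =
    IcebergSource.forRowData()
        .tableLoader(tableLoader)
        .streaming(false) // batch mode, so parallelism can be inferred from the planned splits
        .buildStream(env); // replaces manual env.fromSource(...) wiring; uses noWatermarks()

stream.print();
env.execute("IcebergSource buildStream example");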