Commit bf00d51

Flink: backport PR #10832 of inferring parallelism in FLIP-27 source (#11009)
1 parent 64b3699 commit bf00d51

14 files changed: +586 −73 lines

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java (+82 −3)

@@ -28,6 +28,8 @@
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
@@ -74,6 +79,7 @@
 import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
 import org.apache.iceberg.flink.source.split.SerializableComparator;
 import org.apache.iceberg.flink.source.split.SplitComparators;
+import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.util.ThreadPools;
@@ -97,6 +103,11 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
   private final SerializableRecordEmitter<T> emitter;
   private final String tableName;
 
+  // cache the discovered splits by planSplitsForBatch, which can be called twice. And they come
+  // from two different threads: (1) source/stream construction by main thread (2) enumerator
+  // creation. Hence need volatile here.
+  private volatile List<IcebergSourceSplit> batchSplits;
+
   IcebergSource(
       TableLoader tableLoader,
       ScanContext scanContext,
@@ -132,16 +143,26 @@ private String planningThreadName() {
     return tableName + "-" + UUID.randomUUID();
   }
 
+  /**
+   * Cache the enumerated splits for batch execution to avoid double planning as there are two code
+   * paths obtaining splits: (1) infer parallelism (2) enumerator creation.
+   */
   private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+    if (batchSplits != null) {
+      return batchSplits;
+    }
+
     ExecutorService workerPool =
         ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
     try (TableLoader loader = tableLoader.clone()) {
       loader.open();
-      List<IcebergSourceSplit> splits =
+      this.batchSplits =
          FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
       LOG.info(
-          "Discovered {} splits from table {} during job initialization", splits.size(), tableName);
-      return splits;
+          "Discovered {} splits from table {} during job initialization",
+          batchSplits.size(),
+          tableName);
+      return batchSplits;
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to close table loader", e);
     } finally {
@@ -207,12 +228,35 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
         // Only do scan planning if nothing is restored from checkpoint state
         List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
         assigner.onDiscoveredSplits(splits);
+        // clear the cached splits after enumerator creation as they won't be needed anymore
+        this.batchSplits = null;
       }
 
       return new StaticIcebergEnumerator(enumContext, assigner);
     }
   }
 
+  private boolean shouldInferParallelism() {
+    return !scanContext.isStreaming();
+  }
+
+  private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
+    int parallelism =
+        SourceUtil.inferParallelism(
+            flinkConf,
+            scanContext.limit(),
+            () -> {
+              List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
+              return splits.size();
+            });
+
+    if (env.getMaxParallelism() > 0) {
+      parallelism = Math.min(parallelism, env.getMaxParallelism());
+    }
+
+    return parallelism;
+  }
+
   /**
    * Create a source builder.
    *
@@ -571,6 +615,41 @@ public IcebergSource<T> build() {
           emitter);
     }
 
+    /**
+     * Build the {@link IcebergSource} and create a {@link DataStream} from the source. Watermark
+     * strategy is set to {@link WatermarkStrategy#noWatermarks()}.
+     *
+     * @return data stream from the Iceberg source
+     */
+    public DataStream<T> buildStream(StreamExecutionEnvironment env) {
+      // buildStream should only be called with RowData or Converter paths.
+      Preconditions.checkState(
+          readerFunction == null,
+          "Cannot set reader function when building a data stream from the source");
+      IcebergSource<T> source = build();
+      TypeInformation<T> outputTypeInfo =
+          outputTypeInfo(converter, table.schema(), source.scanContext.project());
+      DataStreamSource<T> stream =
+          env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo);
+      if (source.shouldInferParallelism()) {
+        stream = stream.setParallelism(source.inferParallelism(flinkConfig, env));
+      }
+
+      return stream;
+    }
+
+    private static <T> TypeInformation<T> outputTypeInfo(
+        RowDataConverter<T> converter, Schema tableSchema, Schema projected) {
+      if (converter != null) {
+        return converter.getProducedType();
+      } else {
+        // output type is RowData
+        Schema readSchema = projected != null ? projected : tableSchema;
+        return (TypeInformation<T>)
+            FlinkCompatibilityUtil.toTypeInfo(FlinkSchemaUtil.convert(readSchema));
+      }
+    }
+
     private ReaderFunction<T> readerFunction(ScanContext context) {
       if (table instanceof BaseMetadataTable) {
         MetaDataReaderFunction rowDataReaderFunction =
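
The new buildStream(env) entry point wraps env.fromSource with a no-watermark strategy and, for non-streaming scans, sets the parallelism inferred from the planned split count (capped by env.getMaxParallelism()). A minimal usage sketch follows, assuming a Hadoop-catalog table at a placeholder warehouse path; the class name, path, and job setup are illustrative and not part of this commit.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class BoundedReadSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // inferParallelism() caps the inferred value by the environment's max parallelism
    env.setMaxParallelism(128);

    // assumption: an existing Iceberg table at this placeholder location
    TableLoader loader = TableLoader.fromHadoopTable("file:///tmp/warehouse/db/sample");

    DataStream<RowData> stream =
        IcebergSource.forRowData()
            .tableLoader(loader)
            .flinkConfig(new Configuration())
            // batch (non-streaming) scan: parallelism is inferred from the split count
            .buildStream(env);

    stream.print();
    env.execute("iceberg-flip27-bounded-read");
  }
}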

flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java (+10 −21)

@@ -23,11 +23,8 @@
 import java.util.Map;
 import java.util.Optional;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
         .build();
   }
 
-  private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
+  private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
     SplitAssignerType assignerType =
         readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
-    IcebergSource<RowData> source =
-        IcebergSource.forRowData()
-            .tableLoader(loader)
-            .assignerFactory(assignerType.factory())
-            .properties(properties)
-            .project(getProjectedSchema())
-            .limit(limit)
-            .filters(filters)
-            .flinkConfig(readableConfig)
-            .build();
-    DataStreamSource stream =
-        env.fromSource(
-            source,
-            WatermarkStrategy.noWatermarks(),
-            source.name(),
-            TypeInformation.of(RowData.class));
-    return stream;
+    return IcebergSource.forRowData()
+        .tableLoader(loader)
+        .assignerFactory(assignerType.factory())
+        .properties(properties)
+        .project(getProjectedSchema())
+        .limit(limit)
+        .filters(filters)
+        .flinkConfig(readableConfig)
+        .buildStream(env);
   }
 
   private TableSchema getProjectedSchema() {

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java (+2 −7)

@@ -24,8 +24,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ protected List<Row> run(
     sourceBuilder.properties(options);
 
     DataStream<Row> stream =
-        env.fromSource(
-                sourceBuilder.build(),
-                WatermarkStrategy.noWatermarks(),
-                "testBasicRead",
-                TypeInformation.of(RowData.class))
+        sourceBuilder
+            .buildStream(env)
             .map(
                 new RowDataToRowMapper(
                     FlinkSchemaUtil.convert(

flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java (+1 −1)

@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
   @BeforeEach
   public void before() throws IOException {
     Configuration tableConf = getTableEnv().getConfig().getConfiguration();
-    tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+    tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
     SqlHelpers.sql(
         getTableEnv(),
         "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
