
Commit 32310d4

Flink: infer source parallelism for FLIP-27 source in batch execution mode

1 parent be29eaf
File tree: 7 files changed, +282 −36 lines


flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java (+71 −3)

@@ -28,6 +28,8 @@
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 import org.apache.flink.api.connector.source.SourceReader;
@@ -37,6 +39,9 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.util.Preconditions;
@@ -97,6 +102,11 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
   private final SerializableRecordEmitter<T> emitter;
   private final String tableName;
 
+  // cache the discovered splits by planSplitsForBatch, which can be called twice. And they come
+  // from two different threads: (1) source/stream construction by main thread (2) enumerator
+  // creation. Hence need volatile here.
+  private volatile List<IcebergSourceSplit> batchSplits;
+
   IcebergSource(
       TableLoader tableLoader,
       ScanContext scanContext,
@@ -132,16 +142,26 @@ private String planningThreadName() {
     return tableName + "-" + UUID.randomUUID();
   }
 
+  /**
+   * Cache the enumerated splits for batch execution to avoid double planning as there are two code
+   * paths obtaining splits: (1) infer parallelism (2) enumerator creation.
+   */
   private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
+    if (batchSplits != null) {
+      return batchSplits;
+    }
+
     ExecutorService workerPool =
         ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
    try (TableLoader loader = tableLoader.clone()) {
       loader.open();
-      List<IcebergSourceSplit> splits =
+      this.batchSplits =
           FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
       LOG.info(
-          "Discovered {} splits from table {} during job initialization", splits.size(), tableName);
-      return splits;
+          "Discovered {} splits from table {} during job initialization",
+          batchSplits.size(),
+          tableName);
+      return batchSplits;
     } catch (IOException e) {
       throw new UncheckedIOException("Failed to close table loader", e);
     } finally {
@@ -207,12 +227,35 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
         // Only do scan planning if nothing is restored from checkpoint state
         List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
         assigner.onDiscoveredSplits(splits);
+        // clear the cached splits after enumerator creation as they won't be needed anymore
+        this.batchSplits = null;
       }
 
       return new StaticIcebergEnumerator(enumContext, assigner);
     }
   }
 
+  private boolean shouldInferParallelism() {
+    return !scanContext.isStreaming();
+  }
+
+  private int inferParallelism(ReadableConfig flinkConf, StreamExecutionEnvironment env) {
+    int parallelism =
+        SourceUtil.inferParallelism(
+            flinkConf,
+            scanContext.limit(),
+            () -> {
+              List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
+              return splits.size();
+            });
+
+    if (env.getMaxParallelism() > 0) {
+      parallelism = Math.min(parallelism, env.getMaxParallelism());
+    }
+
+    return parallelism;
+  }
+
   /**
    * Create a source builder.
    *
@@ -567,6 +610,31 @@ public IcebergSource<T> build() {
           emitter);
     }
 
+    /**
+     * Build the {@link IcebergSource} and create a {@link DataStream} from the source. Watermark
+     * strategy is set to {@link WatermarkStrategy#noWatermarks()}.
+     *
+     * @return data stream from the Iceberg source
+     */
+    public DataStream<T> buildStream(StreamExecutionEnvironment env) {
+      // buildStream should only be called with RowData or Converter paths.
+      Preconditions.checkState(
+          readerFunction == null,
+          "Cannot set reader function when building a data stream from the source");
+      TypeInformation<T> outputTypeInfo =
+          converter != null
+              ? converter.getProducedType()
+              : (TypeInformation<T>) TypeInformation.of(RowData.class);
+      IcebergSource<T> source = build();
+      DataStreamSource<T> stream =
+          env.fromSource(source, WatermarkStrategy.noWatermarks(), source.name(), outputTypeInfo);
+      if (source.shouldInferParallelism()) {
+        stream = stream.setParallelism(source.inferParallelism(flinkConfig, env));
+      }
+
+      return stream;
+    }
+
     private ReaderFunction<T> readerFunction(ScanContext context) {
       if (table instanceof BaseMetadataTable) {
         MetaDataReaderFunction rowDataReaderFunction =
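Taken together, the changes above let a job obtain a DataStream directly from the builder; for a bounded (non-streaming) scan, buildStream plans splits once, caches them, and sets the source parallelism from the split count. A minimal usage sketch follows; the class name, the Hadoop table location, the batch runtime mode, and the cap of 4 are assumptions for illustration, not part of this commit (the two config option constants are the ones exercised by the commit's tests):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.FlinkConfigOptions;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

// Hypothetical example class; not part of this commit.
public class InferParallelismExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Parallelism inference only applies to bounded (non-streaming) scans.
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);

    Configuration flinkConf = new Configuration();
    // Option constants referenced by this commit's tests; the cap of 4 is illustrative.
    flinkConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true);
    flinkConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 4);

    // Assumed table location; any TableLoader works here.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/tbl");

    DataStream<RowData> stream =
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            .flinkConfig(flinkConf)
            // buildStream() sets the source parallelism from the planned split count,
            // capped by the configured max and env.getMaxParallelism().
            .buildStream(env);

    stream.print();
    env.execute("iceberg-infer-parallelism-example");
  }
}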

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/source/IcebergTableSource.java (+10 −21)

@@ -23,11 +23,8 @@
 import java.util.Map;
 import java.util.Optional;
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -128,26 +125,18 @@ private DataStream<RowData> createDataStream(StreamExecutionEnvironment execEnv)
         .build();
   }
 
-  private DataStreamSource<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
+  private DataStream<RowData> createFLIP27Stream(StreamExecutionEnvironment env) {
     SplitAssignerType assignerType =
         readableConfig.get(FlinkConfigOptions.TABLE_EXEC_SPLIT_ASSIGNER_TYPE);
-    IcebergSource<RowData> source =
-        IcebergSource.forRowData()
-            .tableLoader(loader)
-            .assignerFactory(assignerType.factory())
-            .properties(properties)
-            .project(getProjectedSchema())
-            .limit(limit)
-            .filters(filters)
-            .flinkConfig(readableConfig)
-            .build();
-    DataStreamSource stream =
-        env.fromSource(
-            source,
-            WatermarkStrategy.noWatermarks(),
-            source.name(),
-            TypeInformation.of(RowData.class));
-    return stream;
+    return IcebergSource.forRowData()
+        .tableLoader(loader)
+        .assignerFactory(assignerType.factory())
+        .properties(properties)
+        .project(getProjectedSchema())
+        .limit(limit)
+        .filters(filters)
+        .flinkConfig(readableConfig)
+        .buildStream(env);
   }
 
   private TableSchema getProjectedSchema() {
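Since this SQL/Table API path now delegates to buildStream, parallelism inference is controlled through Flink configuration on the table environment rather than the builder. A short sketch under assumed setup (the example class, the TableEnvironment creation, and the cap of 4 are assumptions; the option constants are the ones used in this commit's tests):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.iceberg.flink.FlinkConfigOptions;

// Hypothetical example class; not part of this commit.
public class SqlInferParallelismExample {
  public static void main(String[] args) {
    // Assumed: a batch TableEnvironment; an existing one can be configured the same way.
    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    Configuration conf = tableEnv.getConfig().getConfiguration();
    // Route SQL reads through the FLIP-27 IcebergSource (same option used in
    // TestIcebergSourceBoundedSql) and cap the inferred source parallelism.
    conf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
    conf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true);
    conf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX, 4);
  }
}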

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBounded.java (+2 −7)

@@ -24,8 +24,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -130,11 +128,8 @@ protected List<Row> run(
     sourceBuilder.properties(options);
 
     DataStream<Row> stream =
-        env.fromSource(
-                sourceBuilder.build(),
-                WatermarkStrategy.noWatermarks(),
-                "testBasicRead",
-                TypeInformation.of(RowData.class))
+        sourceBuilder
+            .buildStream(env)
             .map(
                 new RowDataToRowMapper(
                     FlinkSchemaUtil.convert(

flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceBoundedSql.java (+1 −1)

@@ -37,7 +37,7 @@ public class TestIcebergSourceBoundedSql extends TestIcebergSourceBounded {
   @BeforeEach
   public void before() throws IOException {
     Configuration tableConf = getTableEnv().getConfig().getConfiguration();
-    tableConf.setBoolean(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE.key(), true);
+    tableConf.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_USE_FLIP27_SOURCE, true);
     SqlHelpers.sql(
         getTableEnv(),
         "create catalog iceberg_catalog with ('type'='iceberg', 'catalog-type'='hadoop', 'warehouse'='%s')",
flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceInferParallelism.java (new file, +181)

@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.source;
+
+import static org.apache.iceberg.flink.MiniFlinkClusterExtension.DISABLE_CLASSLOADER_CHECK_CONFIG;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Path;
+import java.util.List;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.GenericAppenderHelper;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkConfigOptions;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.HadoopCatalogExtension;
+import org.apache.iceberg.flink.TestFixtures;
+import org.apache.iceberg.flink.data.RowDataToRowMapper;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+public class TestIcebergSourceInferParallelism {
+  private static final int NUM_TMS = 2;
+  private static final int SLOTS_PER_TM = 2;
+  private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;
+  private static final int MAX_INFERRED_PARALLELISM = 3;
+
+  @RegisterExtension
+  private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
+      new MiniClusterExtension(
+          new MiniClusterResourceConfiguration.Builder()
+              .setNumberTaskManagers(NUM_TMS)
+              .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
+              .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
+              .build());
+
+  @RegisterExtension
+  protected static final HadoopCatalogExtension CATALOG_EXTENSION =
+      new HadoopCatalogExtension(TestFixtures.DATABASE, TestFixtures.TABLE);
+
+  @TempDir private Path tmpDir;
+
+  private Table table;
+  private GenericAppenderHelper dataAppender;
+
+  @BeforeEach
+  public void before() throws IOException {
+    this.table =
+        CATALOG_EXTENSION.catalog().createTable(TestFixtures.TABLE_IDENTIFIER, TestFixtures.SCHEMA);
+    this.dataAppender = new GenericAppenderHelper(table, FileFormat.PARQUET, tmpDir);
+  }
+
+  @AfterEach
+  public void after() {
+    CATALOG_EXTENSION.catalog().dropTable(TestFixtures.TABLE_IDENTIFIER);
+  }
+
+  @Test
+  public void testEmptyTable() throws Exception {
+    // Inferred parallelism should be at least 1 even if table is empty
+    test(1, 0);
+  }
+
+  @Test
+  public void testTableWithFilesLessThanMaxInferredParallelism() throws Exception {
+    // Append files to the table
+    for (int i = 0; i < 2; ++i) {
+      List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+      dataAppender.appendToTable(batch);
+    }
+
+    // Inferred parallelism should equal to 2 splits
+    test(2, 2);
+  }
+
+  @Test
+  public void testTableWithFilesMoreThanMaxInferredParallelism() throws Exception {
+    // Append files to the table
+    for (int i = 0; i < MAX_INFERRED_PARALLELISM + 1; ++i) {
+      List<Record> batch = RandomGenericData.generate(table.schema(), 1, 0);
+      dataAppender.appendToTable(batch);
+    }
+
+    // Inferred parallelism should be capped by the MAX_INFERRED_PARALLELISM
+    test(MAX_INFERRED_PARALLELISM, MAX_INFERRED_PARALLELISM + 1);
+  }
+
+  private void test(int expectedParallelism, int expectedRecords) throws Exception {
+    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setParallelism(PARALLELISM);
+
+    Configuration config = new Configuration();
+    config.set(FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM, true);
+    config.set(
+        FlinkConfigOptions.TABLE_EXEC_ICEBERG_INFER_SOURCE_PARALLELISM_MAX,
+        MAX_INFERRED_PARALLELISM);
+
+    DataStream<Row> dataStream =
+        IcebergSource.forRowData()
+            .tableLoader(CATALOG_EXTENSION.tableLoader())
+            .table(table)
+            .flinkConfig(config)
+            // force one file per split
+            .splitSize(1L)
+            .buildStream(env)
+            .map(new RowDataToRowMapper(FlinkSchemaUtil.convert(table.schema())));
+
+    DataStream.Collector<Row> collector = new DataStream.Collector<>();
+    dataStream.collectAsync(collector);
+    JobClient jobClient = env.executeAsync();
+    try (CloseableIterator<Row> iterator = collector.getOutput()) {
+      List<Row> result = Lists.newArrayList();
+      while (iterator.hasNext()) {
+        result.add(iterator.next());
+      }
+
+      assertThat(result).hasSize(expectedRecords);
+      verifySourceParallelism(
+          expectedParallelism, miniCluster().getExecutionGraph(jobClient.getJobID()).get());
+    }
+  }
+
+  /**
+   * Borrowed this approach from Flink {@code FileSourceTextLinesITCase} to get source parallelism
+   * from execution graph.
+   */
+  private static void verifySourceParallelism(
+      int expectedParallelism, AccessExecutionGraph executionGraph) {
+    AccessExecutionJobVertex sourceVertex =
+        executionGraph.getVerticesTopologically().iterator().next();
+    assertThat(sourceVertex.getParallelism()).isEqualTo(expectedParallelism);
+  }
+
+  /**
+   * Use reflection to get {@code InternalMiniClusterExtension} and {@code MiniCluster} to get
+   * execution graph and source parallelism. Haven't find other way via public APIS.
+   */
+  private static MiniCluster miniCluster() throws Exception {
+    Field privateField =
+        MiniClusterExtension.class.getDeclaredField("internalMiniClusterExtension");
+    privateField.setAccessible(true);
+    InternalMiniClusterExtension internalExtension =
+        (InternalMiniClusterExtension) privateField.get(MINI_CLUSTER_EXTENSION);
+    return internalExtension.getMiniCluster();
+  }
+}
