
Commit

[AMORO-1035][Flink] Support customizing mixed-format table source parallelism (#1973)

* [AMORO-1035][Flink] Support customizing source parallelism for mixed-format table

YesOrNo828 authored Jan 22, 2024
1 parent 6ce5960 commit 962bc2f
Showing 5 changed files with 45 additions and 17 deletions.
7 changes: 7 additions & 0 deletions docs/engines/flink/flink-dml.md
@@ -19,6 +19,13 @@ SET execution.runtime-mode = streaming;
-- Run Flink tasks in batch mode in the current session
SET execution.runtime-mode = batch;
```

The following Hint Options are supported:

| Key | Default Value | Type | Required | Description |
|--------------------|---------------|---------|----------|-------------|
| source.parallelism | (none) | Integer | No | Defines a custom parallelism for the source. By default, if this option is not defined, the planner will derive the parallelism for each statement individually by also considering the global configuration. |
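
For example, the option can be set per query via a hint; the table name below is illustrative:

```sql
-- Read a mixed-format table with a fixed source parallelism of 2
SELECT * FROM sample_table /*+ OPTIONS('source.parallelism'='2') */;
```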

### Batch mode
Use batch mode to read full and incremental data from FileStore.
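
A minimal sketch, assuming a table named `sample_table`, combining batch mode with the options exercised in the connector tests later in this diff:

```sql
SET execution.runtime-mode = batch;
-- Bounded read from the FileStore; 'streaming'='false' disables incremental reading
SELECT * FROM sample_table /*+ OPTIONS('arctic.read.mode'='file', 'streaming'='false', 'source.parallelism'='2') */;
```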

@@ -21,6 +21,7 @@
import static com.netease.arctic.flink.FlinkSchemaUtil.filterWatermark;
import static com.netease.arctic.flink.FlinkSchemaUtil.toRowType;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.DIM_TABLE_ENABLE;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_PARALLELISM;

import com.netease.arctic.flink.interceptor.ProxyFactory;
import com.netease.arctic.flink.read.ArcticSource;
@@ -184,17 +185,19 @@ public DataStream<RowData> build() {
scanContext.caseSensitive(),
arcticTable.io());

int scanParallelism = flinkConf.getOptional(SCAN_PARALLELISM).orElse(env.getParallelism());
DataStreamSource<RowData> sourceStream =
env.fromSource(
new ArcticSource<>(
tableLoader,
scanContext,
rowDataReaderFunction,
InternalTypeInfo.of(rowType),
arcticTable.name(),
dimTable),
watermarkStrategy,
ArcticSource.class.getName());
new ArcticSource<>(
tableLoader,
scanContext,
rowDataReaderFunction,
InternalTypeInfo.of(rowType),
arcticTable.name(),
dimTable),
watermarkStrategy,
ArcticSource.class.getName())
.setParallelism(scanParallelism);
context.generateUid(ARCTIC_FILE_TRANSFORMATION).ifPresent(sourceStream::uid);
return sourceStream;
}
@@ -225,6 +228,7 @@ public DataStream<RowData> buildUnkeyedTableSource() {
private DataStream<RowData> wrapKrb(DataStream<RowData> ds) {
IcebergClassUtil.clean(env);
Transformation origin = ds.getTransformation();
int scanParallelism = flinkConf.getOptional(SCAN_PARALLELISM).orElse(origin.getParallelism());

if (origin instanceof OneInputTransformation) {
OneInputTransformation<RowData, RowData> tf =
@@ -236,7 +240,7 @@ private DataStream<RowData> wrapKrb(DataStream<RowData> ds) {
if (tf.getInputs().isEmpty()) {
return env.addSource(
new UnkeyedInputFormatSourceFunction(inputFormatProxyFactory, tf.getOutputType()))
.setParallelism(tf.getParallelism());
.setParallelism(scanParallelism);
}

LegacySourceTransformation tfSource = (LegacySourceTransformation) tf.getInputs().get(0);
@@ -248,10 +252,12 @@ private DataStream<RowData> wrapKrb(DataStream<RowData> ds) {
DataStreamSource sourceStream =
env.addSource(functionProxy, tfSource.getName(), tfSource.getOutputType());
context.generateUid(ARCTIC_FILE_TRANSFORMATION).ifPresent(sourceStream::uid);
return sourceStream.transform(
tf.getName(),
tf.getOutputType(),
new UnkeyedInputFormatOperatorFactory(inputFormatProxyFactory));
return sourceStream
.setParallelism(scanParallelism)
.transform(
tf.getName(),
tf.getOutputType(),
new UnkeyedInputFormatOperatorFactory(inputFormatProxyFactory));
}

LegacySourceTransformation tfSource = (LegacySourceTransformation) origin;
@@ -263,7 +269,7 @@ private DataStream<RowData> wrapKrb(DataStream<RowData> ds) {
(InputFormat) ProxyUtil.getProxy(function.getFormat(), arcticTable.io());
DataStreamSource sourceStream =
env.createInput(inputFormatProxy, tfSource.getOutputType())
.setParallelism(origin.getParallelism());
.setParallelism(scanParallelism);
context.generateUid(ARCTIC_FILE_TRANSFORMATION).ifPresent(sourceStream::uid);
return sourceStream;
}
@@ -22,6 +22,7 @@
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.ARCTIC_LOG_CONSUMER_CHANGELOG_MODE;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.LOG_CONSUMER_CHANGELOG_MODE_ALL_KINDS;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.LOG_CONSUMER_CHANGELOG_MODE_APPEND_ONLY;
import static com.netease.arctic.flink.table.descriptors.ArcticValidator.SCAN_PARALLELISM;
import static org.apache.flink.table.connector.ChangelogMode.insertOnly;

import com.netease.arctic.flink.read.source.log.kafka.LogKafkaSource;
@@ -165,8 +166,11 @@ public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv)
if (watermarkStrategy == null) {
watermarkStrategy = WatermarkStrategy.noWatermarks();
}
return execEnv.fromSource(
kafkaSource, watermarkStrategy, "LogStoreSource-" + arcticTable.name());
int scanParallelism =
tableOptions.getOptional(SCAN_PARALLELISM).orElse(execEnv.getParallelism());
return execEnv
.fromSource(kafkaSource, watermarkStrategy, "LogStoreSource-" + arcticTable.name())
.setParallelism(scanParallelism);
}

@Override
@@ -289,6 +289,15 @@ public class ArcticValidator extends ConnectorDescriptorValidator {
TableFormat.PAIMON,
TableFormat.MIXED_ICEBERG));

public static final ConfigOption<Integer> SCAN_PARALLELISM =
ConfigOptions.key("source.parallelism")
.intType()
.noDefaultValue()
.withDescription(
"Defines a custom parallelism for the source. "
+ "By default, if this option is not defined, the planner will derive the parallelism "
+ "for each statement individually by also considering the global configuration.");

@Override
public void validate(DescriptorProperties properties) {
String emitMode = properties.getString(ARCTIC_EMIT_MODE.key());
@@ -267,6 +267,7 @@ public void testSinkSourceFile() throws IOException {
+ "/*+ OPTIONS("
+ "'arctic.read.mode'='file'"
+ ", 'streaming'='false'"
+ ", 'source.parallelism'='2'"
+ ")*/");

List<Object[]> expected = new LinkedList<>();
@@ -347,6 +348,7 @@ public void testUnpartitionLogSinkSource() throws Exception {
+ "/*+ OPTIONS("
+ "'arctic.read.mode'='log'"
+ ", 'scan.startup.mode'='earliest'"
+ ", 'source.parallelism'='2'"
+ ")*/");

Set<Row> actual = new HashSet<>();
