[FLINK-25015][Table SQL] Use SQL string as jobName for DQL jobs submitted by sql-gateway
xiangyuf authored and KarmaGYZ committed Feb 19, 2024
1 parent 050503c commit dcb30f9
Showing 2 changed files with 57 additions and 6 deletions.
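In short: when a DQL job is submitted (for example through the sql-gateway), its job name is now derived from the serialized SQL text of the QueryOperation, falling back to the old generic name "collect" only when the operation cannot be serialized. A minimal sketch of that naming rule, for illustration only (deriveJobName is a hypothetical helper, not part of this patch):

    import org.apache.flink.table.operations.QueryOperation;

    class JobNameSketch {
        // Prefer the query's serialized SQL as the job name; if the operation
        // cannot be serialized back to SQL, keep the previous default "collect".
        static String deriveJobName(QueryOperation operation) {
            try {
                return operation.asSerializableString();
            } catch (Throwable t) {
                return "collect";
            }
        }
    }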
@@ -1065,14 +1065,9 @@ private TableResultInternal executeQueryOperation(
             QueryOperation operation,
             CollectModifyOperation sinkOperation,
             List<Transformation<?>> transformations) {
-        final String defaultJobName = "collect";
-
         resourceManager.addJarConfiguration(tableConfig);
 
-        // We pass only the configuration to avoid reconfiguration with the rootConfiguration
-        Pipeline pipeline =
-                execEnv.createPipeline(
-                        transformations, tableConfig.getConfiguration(), defaultJobName);
+        Pipeline pipeline = generatePipelineFromQueryOperation(operation, transformations);
         try {
             JobClient jobClient = execEnv.executeAsync(pipeline);
             ResultProvider resultProvider = sinkOperation.getSelectResultProvider();
@@ -1185,6 +1180,23 @@ public TableResultInternal executeInternal(Operation operation) {
         }
     }
 
+    /** generate execution {@link Pipeline} from {@link QueryOperation}. */
+    @VisibleForTesting
+    public Pipeline generatePipelineFromQueryOperation(
+            QueryOperation operation, List<Transformation<?>> transformations) {
+        String defaultJobName = "collect";
+
+        try {
+            defaultJobName = operation.asSerializableString();
+        } catch (Throwable e) {
+            // ignore error for unsupported operations and use 'collect' as default job name
+        }
+
+        // We pass only the configuration to avoid reconfiguration with the rootConfiguration
+        return execEnv.createPipeline(
+                transformations, tableConfig.getConfiguration(), defaultJobName);
+    }
+
     /**
      * extract sink identifier names from {@link ModifyOperation}s and deduplicate them with {@link
      * #deduplicateSinkIdentifierNames(List)}.
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.api;
 
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.operations.CollectModifyOperation;
+import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.test.program.SqlTestStep;
 import org.apache.flink.table.test.program.TableApiTestStep;
 import org.apache.flink.table.test.program.TableTestProgram;
@@ -29,6 +34,7 @@
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -78,6 +84,39 @@ void testSqlSerialization(TableTestProgram program) {
         assertThat(table.getQueryOperation().asSerializableString()).isEqualTo(sqlStep.sql);
     }
 
+    @ParameterizedTest
+    @MethodSource("supportedPrograms")
+    void testSqlAsJobNameForQueryOperation(TableTestProgram program) {
+        final TableEnvironmentImpl env = (TableEnvironmentImpl) setupEnv(program);
+
+        final TableApiTestStep tableApiStep =
+                (TableApiTestStep)
+                        program.runSteps.stream()
+                                .filter(s -> s instanceof TableApiTestStep)
+                                .findFirst()
+                                .get();
+
+        final SqlTestStep sqlStep =
+                (SqlTestStep)
+                        program.runSteps.stream()
+                                .filter(s -> s instanceof SqlTestStep)
+                                .findFirst()
+                                .get();
+
+        final Table table = tableApiStep.toTable(env);
+
+        QueryOperation queryOperation = table.getQueryOperation();
+        CollectModifyOperation sinkOperation = new CollectModifyOperation(queryOperation);
+        List<Transformation<?>> transformations =
+                env.getPlanner().translate(Collections.singletonList(sinkOperation));
+
+        StreamGraph streamGraph =
+                (StreamGraph)
+                        env.generatePipelineFromQueryOperation(queryOperation, transformations);
+
+        assertThat(sqlStep.sql).isEqualTo(streamGraph.getJobName());
+    }
+
     private static TableEnvironment setupEnv(TableTestProgram program) {
         final TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
         final Map<String, String> connectorOptions = new HashMap<>();
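As a rough usage sketch of the resulting behavior (the table name T and the datagen connector below are placeholders, not taken from this commit): a query executed via the TableEnvironment now yields a job named after its serialized SQL instead of "collect".

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;

    class JobNameUsageSketch {
        public static void main(String[] args) {
            TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            env.executeSql("CREATE TABLE T (x INT) WITH ('connector' = 'datagen')");
            // The job submitted for this SELECT now carries the query's serialized SQL
            // as its name (visible e.g. in the Flink web UI) rather than "collect".
            TableResult result = env.executeSql("SELECT x FROM T");
            result.getJobClient().ifPresent(client -> System.out.println(client.getJobID()));
        }
    }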
