diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 127faadb246..cce82bd9837 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -26,8 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI, GenericCLI}
-import org.apache.flink.configuration.DeploymentOptions
-import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.configuration.{Configuration, DeploymentOptions, GlobalConfiguration}
 import org.apache.flink.table.client.SqlClientException
 import org.apache.flink.table.client.gateway.context.DefaultContext
 import org.apache.flink.util.JarUtils
@@ -77,6 +76,8 @@ object FlinkSQLEngine extends Logging {
     try {
       val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv
       val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir)
+      val flinkConfFromKyuubi = kyuubiConf.getAllWithPrefix("flink", "")
+      flinkConf.addAll(Configuration.fromMap(flinkConfFromKyuubi.asJava))
       val executionTarget = flinkConf.getString(DeploymentOptions.TARGET)
 
       // set cluster name for per-job and application mode
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala
new file mode 100644
index 00000000000..814a6d1ddc6
--- /dev/null
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/FlinkSQLEngineSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.flink
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE, FRONTEND_THRIFT_BINARY_BIND_PORT}
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class FlinkSQLEngineSuite extends WithKyuubiServerAndFlinkMiniCluster with HiveJDBCTestHelper {
+
+  override val conf: KyuubiConf = KyuubiConf()
+    .set(ENGINE_TYPE, "FLINK_SQL")
+    .set(FRONTEND_THRIFT_BINARY_BIND_PORT, 10029)
+    .set("flink.parallelism.default", "6")
+
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  test("set kyuubi conf into flink conf") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery("SET")
+      // Flink does not support set key without value currently,
+      // thus read all rows to find the desired one
+      var success = false
+      while (resultSet.next() && success == false) {
+        if (resultSet.getString(1) == "parallelism.default" &&
+          resultSet.getString(2) == "6") {
+          success = true
+        }
+      }
+      assert(success)
+    }
+  }
+}
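
For context, the engine change above takes every Kyuubi property carrying the `flink.` prefix, strips that prefix, and merges the result into the Flink configuration before the engine starts; the new integration test asserts this through a JDBC `SET` statement. Below is a minimal, self-contained sketch of that prefix-stripping semantics. The object and helper names are hypothetical, and only the behaviour of `KyuubiConf.getAllWithPrefix("flink", "")` as used in the diff is assumed.

```scala
// Illustration only (not Kyuubi code): models how "flink."-prefixed Kyuubi properties
// are expected to be turned into Flink configuration overrides.
object FlinkConfPropagationSketch {

  // Keep entries whose key starts with "<prefix>." and drop that prefix,
  // mirroring the assumed behaviour of KyuubiConf.getAllWithPrefix("flink", "").
  def stripPrefix(props: Map[String, String], prefix: String): Map[String, String] =
    props.collect {
      case (k, v) if k.startsWith(prefix + ".") => k.stripPrefix(prefix + ".") -> v
    }

  def main(args: Array[String]): Unit = {
    val kyuubiProps = Map(
      "flink.parallelism.default" -> "6", // becomes parallelism.default=6 in Flink
      "kyuubi.engine.type" -> "FLINK_SQL") // no "flink." prefix, so not propagated

    val flinkOverrides = stripPrefix(kyuubiProps, "flink")
    assert(flinkOverrides == Map("parallelism.default" -> "6"))
    println(flinkOverrides) // Map(parallelism.default -> 6)
  }
}
```

In practice this means a user can set, for example, `flink.parallelism.default=6` in the Kyuubi configuration and the Flink SQL engine should come up with `parallelism.default=6`, which is exactly the path the test suite above exercises.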