[KYUUBI #2203][FLINK] Support flink conf set by kyuubi conf file
### _Why are the changes needed?_

Support setting Flink configuration through the Kyuubi conf file (`kyuubi-defaults.conf`): properties prefixed with `flink.` are forwarded to the Flink engine with the prefix stripped.
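
For illustration, a hedged example of what such an entry could look like in the Kyuubi conf file (the key and value below mirror the integration test added in this commit; the `flink.` prefix is dropped before the key is handed to Flink):

```properties
# Illustrative entry for the Kyuubi conf file: forwarded to the Flink engine
# as parallelism.default=6 (the "flink." prefix is stripped).
flink.parallelism.default=6
```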

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #2206 from deadwind4/KYUUBI-2203.

Closes #2203

eeeb91c [Ada] fix IT case
5eefba5 [Ada] add IT case
86fc57e [Ada] [KYUUBI #2203][engine/flink] Support flink conf set by kyuubi conf file

Authored-by: Ada <wang4luning@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
a49a authored and pan3793 committed Mar 24, 2022
1 parent 88e0bcd commit c09cd65
Showing 2 changed files with 51 additions and 2 deletions.
@@ -26,8 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
 import org.apache.flink.client.cli.{CliFrontend, CustomCommandLine, DefaultCLI, GenericCLI}
-import org.apache.flink.configuration.DeploymentOptions
-import org.apache.flink.configuration.GlobalConfiguration
+import org.apache.flink.configuration.{Configuration, DeploymentOptions, GlobalConfiguration}
 import org.apache.flink.table.client.SqlClientException
 import org.apache.flink.table.client.gateway.context.DefaultContext
 import org.apache.flink.util.JarUtils
@@ -77,6 +76,8 @@ object FlinkSQLEngine extends Logging {
     try {
       val flinkConfDir = CliFrontend.getConfigurationDirectoryFromEnv
       val flinkConf = GlobalConfiguration.loadConfiguration(flinkConfDir)
+      val flinkConfFromKyuubi = kyuubiConf.getAllWithPrefix("flink", "")
+      flinkConf.addAll(Configuration.fromMap(flinkConfFromKyuubi.asJava))
 
       val executionTarget = flinkConf.getString(DeploymentOptions.TARGET)
       // set cluster name for per-job and application mode
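
For context, here is a minimal, self-contained sketch of the forwarding behavior in the hunk above (object and variable names are illustrative only; the real code path goes through `KyuubiConf.getAllWithPrefix` and Flink's `Configuration.fromMap`):

```scala
// Illustrative sketch: copy "flink."-prefixed Kyuubi settings into a plain map
// with the prefix removed, which is what the engine then hands to Flink.
object FlinkConfForwardingSketch extends App {
  val kyuubiProps = Map(
    "kyuubi.engine.type" -> "FLINK_SQL",
    "flink.parallelism.default" -> "6")

  val flinkConfFromKyuubi: Map[String, String] = kyuubiProps.collect {
    case (key, value) if key.startsWith("flink.") =>
      key.stripPrefix("flink.") -> value
  }

  println(flinkConfFromKyuubi) // Map(parallelism.default -> 6)
}
```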
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.it.flink
+
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_TYPE, FRONTEND_THRIFT_BINARY_BIND_PORT}
+import org.apache.kyuubi.operation.HiveJDBCTestHelper
+
+class FlinkSQLEngineSuite extends WithKyuubiServerAndFlinkMiniCluster with HiveJDBCTestHelper {
+
+  override val conf: KyuubiConf = KyuubiConf()
+    .set(ENGINE_TYPE, "FLINK_SQL")
+    .set(FRONTEND_THRIFT_BINARY_BIND_PORT, 10029)
+    .set("flink.parallelism.default", "6")
+
+  override protected def jdbcUrl: String = getJdbcUrl
+
+  test("set kyuubi conf into flink conf") {
+    withJdbcStatement() { statement =>
+      val resultSet = statement.executeQuery("SET")
+      // Flink does not support set key without value currently,
+      // thus read all rows to find the desired one
+      var success = false
+      while (resultSet.next() && success == false) {
+        if (resultSet.getString(1) == "parallelism.default" &&
+          resultSet.getString(2) == "6") {
+          success = true
+        }
+      }
+      assert(success)
+    }
+  }
+}
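
Outside the test harness, a user could verify the same behavior manually over Hive JDBC. A hedged sketch follows (it assumes the Hive JDBC driver is on the classpath; the URL is a placeholder based on the test's bind port 10029, and `anonymous` is an assumed user):

```scala
// Hypothetical manual check: connect to Kyuubi over Hive JDBC, run SET,
// and look for the forwarded parallelism.default value.
import java.sql.DriverManager

object CheckForwardedConf extends App {
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10029/", "anonymous", "")
  try {
    val rs = conn.createStatement().executeQuery("SET")
    while (rs.next()) {
      if (rs.getString(1) == "parallelism.default") {
        println(s"parallelism.default = ${rs.getString(2)}") // expect 6
      }
    }
  } finally {
    conn.close()
  }
}
```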
