Skip to content

Commit

Permalink
[KYUUBI #307]GetCatalogs supports DSv2 and keeps its backward compati…
Browse files Browse the repository at this point in the history
…bility (#307)

![yaooqinn](https://badgen.net/badge/Hello/yaooqinn/green) [![PR 307](https://badgen.net/badge/Preview/PR%20307/blue)](https://github.com/yaooqinn/kyuubi/pull/307) ![Feature](https://badgen.net/badge/Label/Feature/) [&#10088;?&#10089;](https://pullrequestbadge.com/?utm_medium=github&utm_source=yaooqinn&utm_campaign=badge_info)<!-- PR-BADGE: PLEASE DO NOT REMOVE THIS COMMENT -->

<!--
Thanks for sending a pull request!  Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
-->

### Please add issue ID here?
<!-- replace ${issue ID} with the actual issue id -->
Fixes #307

### Why are the changes needed?
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the user case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->

GetCatalogs supports DSv2 and keeps its backward compatibility

### Test Plan:
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
![image](https://user-images.githubusercontent.com/8326978/106161043-7259c400-61c1-11eb-9beb-3326f6093284.png)

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

* GetCatalogs supports DSv2

* pr template

* nit

* nit

* shim

* add iceberg tests

* nit
  • Loading branch information
yaooqinn authored Jan 29, 2021
1 parent 05c64fe commit efdd9fa
Show file tree
Hide file tree
Showing 15 changed files with 282 additions and 24 deletions.
31 changes: 21 additions & 10 deletions .github/PULL_REQUEST_TEMPLATE
Original file line number Diff line number Diff line change
@@ -1,20 +1,31 @@
<!--
Thanks for sending a pull request! Here are some tips for you:
1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
Thanks for sending a pull request!

Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
-->

### _Which issue are you going to fix?_
<!--
Replace ${ID} below with the actual issue id from
https://github.com/yaooqinn/kyuubi/issues,
so that the issue will be linked and automatically closed after merging
-->

### Please add issue ID here?
<!-- replace ${issue ID} with the actual issue id -->
Fixes #${issue ID}
Fixes #${ID}

### Why are the changes needed?
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the user case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->

### Test Plan:
- Add some test cases that check the changes thoroughly including negative and positive cases if possible
- Add screenshots for manual tests if appropriate
- [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/tools/testing.html#running-tests) locally before make a pull request
16 changes: 14 additions & 2 deletions .github/pr-badge.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,22 @@
color: "green"

- label: "Preview"
message: "PR $prNumber"
message: "Closes%20#$prNumber"
color: "blue"
url: "https://github.com/yaooqinn/kyuubi/pull/$prNumber"

- label: "+"
message: "$additions"
color: "red"

- label: "-"
message: "$deletions"
color: "green"

- label: "commits"
message: "$commits"
color: "yellow"

- label: "Missing"
message: "Target Issue"
color: "#ff0000"
Expand All @@ -15,7 +27,7 @@
- label: "Missing"
message: "Test Plan"
color: "#ff0000"
when: "$payload.pull_request.body.includes('## Test Plan') === false"
when: "$payload.pull_request.body.includes('- [x]') === false"

- label: "Label"
message: "Feature"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

package org.apache.kyuubi.engine.spark.operation

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.engine.spark.shim.SparkShim
import org.apache.kyuubi.operation.OperationType
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
Expand All @@ -33,8 +34,8 @@ class GetCatalogs(spark: SparkSession, session: Session)
}

override protected def runInternal(): Unit = {
iter = Seq(
Row(spark.sessionState.catalogManager.currentCatalog.name())
).toList.iterator
try {
iter = SparkShim().getCatalogs(spark).toIterator
} catch onError()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.kyuubi.engine.spark.operation

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.types.StructType

import org.apache.kyuubi.operation.OperationType
Expand All @@ -33,6 +32,6 @@ class GetTableTypes(spark: SparkSession, session: Session)
}

override protected def runInternal(): Unit = {
iter = CatalogTableType.tableTypes.map(t => Row(t.name)).toList.iterator
iter = Seq("EXTERNAL", "MANAGED", "VIEW").map(Row(_)).toList.iterator
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.shim

import org.apache.spark.sql.{Row, SparkSession}

class Shim_v2_4 extends SparkShim {
override def getCatalogs(ss: SparkSession): Seq[Row] = {
Seq(Row("spark_catalog"))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.shim

import org.apache.spark.sql.{Row, SparkSession}

class Shim_v3_0 extends Shim_v2_4 {

override def getCatalogs(ss: SparkSession): Seq[Row] = {
val sessionState = getSessionState(ss)

// A [[CatalogManager]] is session unique
val catalogMgr = invoke(sessionState, "catalogManager")
// get the custom v2 session catalog or default spark_catalog
val sessionCatalog = invoke(catalogMgr, "v2SessionCatalog")
val defaultCatalog = invoke(catalogMgr, "currentCatalog")

val defaults = Seq(sessionCatalog, defaultCatalog).distinct
.map(invoke(_, "name").asInstanceOf[String])
val catalogs = getField(catalogMgr, "catalogs")
.asInstanceOf[scala.collection.Map[String, _]]
(catalogs.keys ++: defaults).distinct.map(Row(_))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kyuubi.engine.spark.shim

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.kyuubi.{Logging, Utils}

/**
* A shim that defines the interface interact with Spark's catalogs
*/
trait SparkShim extends Logging {

/**
* Get all register catalogs in Spark's `CatalogManager`
*/
def getCatalogs(ss: SparkSession): Seq[Row]

protected def getSessionState(ss: SparkSession): Any = {
invoke(classOf[SparkSession], ss, "sessionState")
}

protected def invoke(
obj: Any,
methodName: String,
args: (Class[_], AnyRef)*): Any = {
val (types, values) = args.unzip
val method = obj.getClass.getDeclaredMethod(methodName, types: _*)
method.setAccessible(true)
method.invoke(obj, values.toSeq: _*)
}

protected def invoke(
clazz: Class[_],
obj: AnyRef,
methodName: String,
args: (Class[_], AnyRef)*): AnyRef = {
val (types, values) = args.unzip
val method = clazz.getDeclaredMethod(methodName, types: _*)
method.setAccessible(true)
method.invoke(obj, values.toSeq: _*)
}

protected def getField(o: Any, fieldName: String): Any = {
val field = o.getClass.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(o)
}
}

object SparkShim {
def apply(): SparkShim = {
val runtimeSparkVer = org.apache.spark.SPARK_VERSION
val (major, minor) = Utils.majorMinorVersion(runtimeSparkVer)
(major, minor) match {
case (3, _) => new Shim_v3_0
case (2, _) => new Shim_v2_4
case _ => throw new IllegalArgumentException(s"Not Support spark version $runtimeSparkVer")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SparkSQLEngineListenerSuite extends KyuubiFunSuite {
.builder().master("local").config("spark.ui.port", "0").getOrCreate()

val engine = new SparkSQLEngine(spark)
engine.initialize(KyuubiConf())
engine.initialize(KyuubiConf().set(KyuubiConf.FRONTEND_BIND_PORT, 0))
engine.start()
assert(engine.getServiceState === ServiceState.STARTED)
spark.stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,10 @@ package object session {
val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r
val HIVE_CONF_PREFIX: Regex = """set:hiveconf:([^=]+)""".r

val ENV_PREFIX = "env:"
val SYSTEM_PREFIX = "system:"
val HIVECONF_PREFIX = "hiveconf:"
val HIVEVAR_PREFIX = "hivevar:"
val METACONF_PREFIX = "metaconf:"

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ServiceDiscoverySuite extends KerberizedTestHelper {
.unset(KyuubiConf.SERVER_PRINCIPAL)
.set(HA_ZK_QUORUM, zkServer.getConnectString)
.set(HA_ZK_NAMESPACE, namespace)
.set(KyuubiConf.FRONTEND_BIND_PORT, 0)

val server: Serverable = new NoopServer()
server.initialize(conf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,14 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite {
assert(zkServer.getName === zkServer.getClass.getSimpleName)
assert(zkServer.getServiceState === LATENT)
val conf = KyuubiConf()
conf.set(KyuubiConf.EMBEDDED_ZK_PORT, 0)
zkServer.stop() // only for test coverage
zkServer.initialize(conf)
assert(zkServer.getConf === conf)
assert(zkServer.getServiceState === INITIALIZED)
assert(zkServer.getConnectString.endsWith("2181"))
assert(zkServer.getStartTime === 0)
zkServer.start()
assert(zkServer.getServiceState === STARTED)
assert(zkServer.getConnectString.endsWith("2181"))
assert(zkServer.getStartTime !== 0)
zkServer.stop()
assert(zkServer.getServiceState === STOPPED)
Expand All @@ -50,7 +49,7 @@ class EmbeddedZkServerSuite extends KyuubiFunSuite {
test("connect test with embedded zookeeper") {
val zkServer = new EmbeddedZkServer()
assert(zkServer.getConnectString === null)
zkServer.initialize(KyuubiConf())
zkServer.initialize(KyuubiConf().set(KyuubiConf.EMBEDDED_ZK_PORT, 0))
zkServer.start()

val zkClient = CuratorFrameworkFactory.builder()
Expand Down
6 changes: 6 additions & 0 deletions kyuubi-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@
<artifactId>netty</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>${iceberg.name}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,21 @@ class KyuubiSessionImpl(

private def mergeConf(): Unit = {
conf.foreach {
case (HIVE_VAR_PREFIX(key), value) => sessionConf.set(key, value)
case (HIVE_CONF_PREFIX(key), value) => sessionConf.set(key, value)
case (k, v) if k.startsWith("set:") =>
val newKey = k.substring(4)
if (newKey.startsWith(SYSTEM_PREFIX)) {
sessionConf.set(newKey.substring(SYSTEM_PREFIX.length), v)
} else if (newKey.startsWith(HIVECONF_PREFIX)) {
sessionConf.set(newKey.substring(HIVECONF_PREFIX.length), v)
} else if (newKey.startsWith(HIVEVAR_PREFIX)) {
sessionConf.set(newKey.substring(HIVEVAR_PREFIX.length), v)
} else if (newKey.startsWith(METACONF_PREFIX)) {
sessionConf.set(newKey.substring(METACONF_PREFIX.length), v)
} else if (newKey.startsWith(SYSTEM_PREFIX)) {
// do nothing
} else {
sessionConf.set(k, v)
}
case ("use:database", _) =>
case (key, value) => sessionConf.set(key, value)
}
Expand Down
Loading

0 comments on commit efdd9fa

Please sign in to comment.