diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala index 01266eb2c85..1ccc9ef700c 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala @@ -284,9 +284,7 @@ object PrivilegesBuilder { val spec = getTableCommandSpec(command) val functionPrivAndOpType = spec.queries(plan) .map(plan => buildFunctions(plan, spark)) - functionPrivAndOpType.map(_._1) - .reduce(_ ++ _) - .foreach(functionPriv => inputObjs += functionPriv) + inputObjs ++= functionPrivAndOpType.flatMap(_._1) case plan => plan transformAllExpressions { case hiveFunction: Expression if isKnownFunction(hiveFunction) => diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala index 7772c86b788..70943187229 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala @@ -32,6 +32,7 @@ class AccessResource private (val objectType: ObjectType, val catalog: Option[St extends RangerAccessResourceImpl { implicit def asString(obj: Object): String = if (obj != null) obj.asInstanceOf[String] else null def getDatabase: String = getValue("database") + def getUdf: String = getValue("udf") def getTable: String = getValue("table") def getColumn: String = getValue("column") def getColumns: Seq[String] = { diff --git 
a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala index 288719f07bf..2caeca11304 100644 --- a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala @@ -45,6 +45,7 @@ class RangerSparkExtension extends (SparkSessionExtensions => Unit) { override def apply(v1: SparkSessionExtensions): Unit = { v1.injectCheckRule(AuthzConfigurationChecker) + v1.injectCheckRule(RuleFunctionAuthorization) v1.injectResolutionRule(_ => RuleReplaceShowObjectCommands) v1.injectResolutionRule(_ => RuleApplyPermanentViewMarker) v1.injectResolutionRule(_ => RuleApplyTypeOfMarker) diff --git a/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala new file mode 100644 index 00000000000..d4b16558d54 --- /dev/null +++ b/extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleFunctionAuthorization.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.plugin.spark.authz.ranger + +import scala.collection.mutable + +import org.apache.ranger.plugin.policyengine.RangerAccessRequest +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan + +import org.apache.kyuubi.plugin.spark.authz._ +import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType +import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._ +import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ + +case class RuleFunctionAuthorization(spark: SparkSession) extends (LogicalPlan => Unit) { + override def apply(plan: LogicalPlan): Unit = { + val auditHandler = new SparkRangerAuditHandler + val ugi = getAuthzUgi(spark.sparkContext) + val (inputs, _, opType) = PrivilegesBuilder.buildFunctions(plan, spark) + + // Use a HashSet to deduplicate the same AccessResource and AccessType, the requests will be all + // the non-duplicate requests and in the same order as the input requests. 
+ val requests = new mutable.ArrayBuffer[AccessRequest]() + val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]() + + def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = { + objects.foreach { obj => + val resource = AccessResource(obj, opType) + val accessType = ranger.AccessType(obj, opType, isInput) + if (accessType != AccessType.NONE && !requestsSet.contains((resource, accessType))) { + requests += AccessRequest(resource, ugi, opType, accessType) + requestsSet.add((resource, accessType)) + } + } + } + + addAccessRequest(inputs, isInput = true) + + val requestSeq: Seq[RangerAccessRequest] = + requests.map(_.asInstanceOf[RangerAccessRequest]).toSeq + + if (authorizeInSingleCall) { + verify(requestSeq, auditHandler) + } else { + requestSeq.foreach { req => + verify(Seq(req), auditHandler) + } + } + } +} diff --git a/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala b/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala index edff7d8a079..6ec44aa8d43 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala @@ -110,6 +110,7 @@ class PolicyJsonFileGenerator extends AnyFunSuite { policyAccessForPermViewAccessOnly, policyAccessForTable2AccessOnly, policyAccessForPaimonNsTable1SelectOnly, + policyAccessForDefaultDbUDF, // row filter policyFilterForSrcTableKeyLessThan20, policyFilterForPermViewKeyLessThan20, @@ -371,4 +372,20 @@ class PolicyJsonFileGenerator extends AnyFunSuite { users = List(table1OnlyUserForNs), accesses = allowTypes(select), delegateAdmin = true))) + + private val policyAccessForDefaultDbUDF = KRangerPolicy( + name = "defaultdb_udf", + description =
"Policy for default db udf", + resources = Map( + databaseRes(defaultDb), + "udf" -> KRangerPolicyResource(values = List("kyuubi_func*"))), + policyItems = List( + KRangerPolicyItem( + users = List(bob), + accesses = allowTypes(select, update, create, drop, alter, index, lock, all, read, write), + delegateAdmin = true), + KRangerPolicyItem( + users = List(kent), + accesses = allowTypes(select), + delegateAdmin = true))) } diff --git a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json index 16bdd5087a2..c5bd2f28dec 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json +++ b/extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json @@ -510,6 +510,72 @@ "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", + "name" : "defaultdb_udf", + "policyType" : 0, + "policyPriority" : 0, + "description" : "Policy for default db udf", + "isAuditEnabled" : true, + "resources" : { + "database" : { + "values" : [ "default" ], + "isExcludes" : false, + "isRecursive" : false + }, + "udf" : { + "values" : [ "kyuubi_func*" ], + "isExcludes" : false, + "isRecursive" : false + } + }, + "policyItems" : [ { + "accesses" : [ { + "type" : "select", + "isAllowed" : true + }, { + "type" : "update", + "isAllowed" : true + }, { + "type" : "create", + "isAllowed" : true + }, { + "type" : "drop", + "isAllowed" : true + }, { + "type" : "alter", + "isAllowed" : true + }, { + "type" : "index", + "isAllowed" : true + }, { + "type" : "lock", + "isAllowed" : true + }, { + "type" : "all", + "isAllowed" : true + }, { + "type" : "read", + "isAllowed" : true + }, { + "type" : "write", + "isAllowed" : true + } ], + "users" : [ "bob" ], + "delegateAdmin" : true + }, { + "accesses" : [ { + "type" : "select", + "isAllowed" : true + } ], + "users" : [ "kent" ], + "delegateAdmin" : true + } ], + "isDenyAllElse" : false + }, { + 
"id" : 11, + "guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca", + "isEnabled" : true, + "version" : 1, + "service" : "hive_jenkins", "name" : "src_key_less_than_20", "policyType" : 2, "policyPriority" : 0, @@ -539,8 +605,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 11, - "guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca", + "id" : 12, + "guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -573,8 +639,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 12, - "guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710", + "id" : 13, + "guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -612,8 +678,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 13, - "guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39", + "id" : 14, + "guid" : "aab32389-22bc-325a-af60-6eb525ffdc56", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -651,8 +717,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 14, - "guid" : "aab32389-22bc-325a-af60-6eb525ffdc56", + "id" : 15, + "guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -690,8 +756,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 15, - "guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3", + "id" : 16, + "guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -729,8 +795,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 16, - "guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf", + "id" : 17, + "guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", @@ -768,8 +834,8 @@ } ], "isDenyAllElse" : false }, { - "id" : 17, - "guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb", + "id" : 18, + "guid" : "6f4922f4-5568-361a-8cdf-4ad2299f6d23", "isEnabled" : true, "version" : 1, "service" : "hive_jenkins", diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala index ad4b57faa93..5d8a824b4fc 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala @@ -193,4 +193,89 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite } } + test("Built in and UDF Function Call Query") { + val plan = sql( + s""" + |SELECT + | kyuubi_fun_0('TESTSTRING') AS col1, + | kyuubi_fun_0(value) AS col2, + | abs(key) AS col3, abs(-100) AS col4, + | lower(value) AS col5,lower('TESTSTRING') AS col6 + |FROM $reusedTable + |""".stripMargin).queryExecution.analyzed + val (inputs, _, _) = PrivilegesBuilder.buildFunctions(plan, spark) + assert(inputs.size === 2) + inputs.foreach { po => + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) + assert(po.dbname startsWith reusedDb.toLowerCase) + assert(po.objectName startsWith functionNamePrefix.toLowerCase) + val accessType = ranger.AccessType(po, QUERY, isInput = true) + assert(accessType === AccessType.SELECT) + } + } + + test("Function Call in Create Table/View") { + val plan1 = sql( + s""" + |CREATE TABLE table1 AS + |SELECT + | kyuubi_fun_0('KYUUBI_TESTSTRING'), + | kyuubi_fun_0(value) + |FROM $reusedTable + |""".stripMargin).queryExecution.analyzed + val (inputs1, _, _) = PrivilegesBuilder.buildFunctions(plan1, spark) + assert(inputs1.size === 2) + inputs1.foreach { po => + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) + assert(po.dbname startsWith reusedDb.toLowerCase) 
+ assert(po.objectName startsWith functionNamePrefix.toLowerCase) + val accessType = ranger.AccessType(po, QUERY, isInput = true) + assert(accessType === AccessType.SELECT) + } + val plan2 = sql("DROP TABLE IF EXISTS table1").queryExecution.analyzed + val (inputs2, _, _) = PrivilegesBuilder.buildFunctions(plan2, spark) + assert(inputs2.size === 0) + + val plan3 = sql( + s""" + |CREATE VIEW view1 AS SELECT + | kyuubi_fun_0('KYUUBI_TESTSTRING') AS fun1, + | kyuubi_fun_0(value) AS fun2 + |FROM $reusedTable + |""".stripMargin).queryExecution.analyzed + val (inputs3, _, _) = PrivilegesBuilder.buildFunctions(plan3, spark) + assert(inputs3.size === 2) + inputs3.foreach { po => + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) + assert(po.dbname startsWith reusedDb.toLowerCase) + assert(po.objectName startsWith functionNamePrefix.toLowerCase) + val accessType = ranger.AccessType(po, QUERY, isInput = true) + assert(accessType === AccessType.SELECT) + } + val plan4 = sql("DROP VIEW IF EXISTS view1").queryExecution.analyzed + val (inputs4, _, _) = PrivilegesBuilder.buildFunctions(plan4, spark) + assert(inputs4.size === 0) + } + + test("Function Call in INSERT OVERWRITE") { + val plan = sql( + s""" + |INSERT OVERWRITE TABLE $reusedTable + |SELECT key, kyuubi_fun_0(value) + |FROM $reusedPartTable + |""".stripMargin).queryExecution.analyzed + val (inputsUpdate, _, _) = PrivilegesBuilder.buildFunctions(plan, spark) + assert(inputsUpdate.size === 1) + inputsUpdate.foreach { po => + assert(po.actionType === PrivilegeObjectActionType.OTHER) + assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION) + assert(po.dbname startsWith reusedDb.toLowerCase) + assert(po.objectName startsWith functionNamePrefix.toLowerCase) + val accessType = ranger.AccessType(po, QUERY, isInput = true) + assert(accessType === AccessType.SELECT) + } + } } diff --git 
a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala index 1fdea0ed969..1859450fe08 100644 --- a/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala +++ b/extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtensionSuite.scala @@ -47,6 +47,7 @@ import org.apache.kyuubi.plugin.spark.authz.rule.Authorization.KYUUBI_AUTHZ_TAG import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._ import org.apache.kyuubi.util.AssertionUtils._ import org.apache.kyuubi.util.reflect.ReflectUtils._ + abstract class RangerSparkExtensionSuite extends AnyFunSuite with SparkSessionProvider with BeforeAndAfterAll with MysqlContainerEnv { // scalastyle:on @@ -218,8 +219,16 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite val e = intercept[AccessControlException](sql(create)) assert(e.getMessage === errorMessage("create", "mydb")) withCleanTmpResources(Seq((testDb, "database"))) { - doAs(admin, assert(Try { sql(create) }.isSuccess)) - doAs(admin, assert(Try { sql(alter) }.isSuccess)) + doAs( + admin, + assert(Try { + sql(create) + }.isSuccess)) + doAs( + admin, + assert(Try { + sql(alter) + }.isSuccess)) val e1 = intercept[AccessControlException](sql(alter)) assert(e1.getMessage === errorMessage("alter", "mydb")) val e2 = intercept[AccessControlException](sql(drop)) @@ -241,14 +250,34 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite assert(e.getMessage === errorMessage("create")) withCleanTmpResources(Seq((s"$db.$table", "table"))) { - doAs(bob, assert(Try { sql(create0) }.isSuccess)) - doAs(bob, assert(Try { sql(alter0) }.isSuccess)) + doAs( + bob, + assert(Try { + sql(create0) + }.isSuccess)) + doAs( + bob, + assert(Try { + 
sql(alter0) + }.isSuccess)) val e1 = intercept[AccessControlException](sql(drop0)) assert(e1.getMessage === errorMessage("drop")) - doAs(bob, assert(Try { sql(alter0) }.isSuccess)) - doAs(bob, assert(Try { sql(select).collect() }.isSuccess)) - doAs(kent, assert(Try { sql(s"SELECT key FROM $db.$table").collect() }.isSuccess)) + doAs( + bob, + assert(Try { + sql(alter0) + }.isSuccess)) + doAs( + bob, + assert(Try { + sql(select).collect() + }.isSuccess)) + doAs( + kent, + assert(Try { + sql(s"SELECT key FROM $db.$table").collect() + }.isSuccess)) Seq( select, @@ -272,13 +301,21 @@ abstract class RangerSparkExtensionSuite extends AnyFunSuite test("auth: functions") { val db = defaultDb val func = "func" - val create0 = s"CREATE FUNCTION IF NOT EXISTS $db.$func AS 'abc.mnl.xyz'" - doAs( - kent, { - val e = intercept[AccessControlException](sql(create0)) - assert(e.getMessage === errorMessage("create", "default/func")) - }) - doAs(admin, assert(Try(sql(create0)).isSuccess)) + withCleanTmpResources(Seq( + (func, "function"))) { + val create0 = s"CREATE FUNCTION IF NOT EXISTS $db.$func AS 'abc.mnl.xyz'" + doAs( + bob, { + val e = intercept[AccessControlException](sql(create0)) + assert(e.getMessage === errorMessage("create", s"$db/$func")) + }) + doAs( + kent, { + val e = intercept[AccessControlException](sql(create0)) + assert(e.getMessage === errorMessage("create", s"$db/$func")) + }) + doAs(admin, assert(Try(sql(create0)).isSuccess)) + } } test("show tables") { @@ -628,12 +665,18 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { // query all columns of the permanent view // with access privileges to the permanent view but no privilege to the source table val sql1 = s"SELECT * FROM $db1.$permView" - doAs(userPermViewOnly, { sql(sql1).collect() }) + doAs( + userPermViewOnly, { + sql(sql1).collect() + }) // query the second column of permanent view with multiple columns // with access privileges to the permanent view but no privilege to the 
source table val sql2 = s"SELECT name FROM $db1.$permView" - doAs(userPermViewOnly, { sql(sql2).collect() }) + doAs( + userPermViewOnly, { + sql(sql2).collect() + }) } } @@ -1542,4 +1585,37 @@ class HiveCatalogRangerSparkExtensionSuite extends RangerSparkExtensionSuite { } } } + + test("[KYUUBI #7186] Introduce RuleFunctionAuthorization") { + val db = defaultDb + val kyuubiFunc = "kyuubi_func1" + withCleanTmpResources(Seq( + (kyuubiFunc, "function"))) { + val createKyuubiFunc = + s""" + |CREATE FUNCTION IF NOT EXISTS + | $db.$kyuubiFunc + | AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDFMaskHash' + |""".stripMargin + doAs( + kent, { + val e = intercept[AccessControlException](sql(createKyuubiFunc)) + assert(e.getMessage === errorMessage("create", s"$db/$kyuubiFunc")) + }) + doAs(bob, assert(Try(sql(createKyuubiFunc)).isSuccess)) + doAs(admin, assert(Try(sql(createKyuubiFunc)).isSuccess)) + + val selectKyuubiFunc = + s""" + |SELECT $db.$kyuubiFunc("KYUUBUI_TEST_STRING")""".stripMargin + doAs( + alice, { + val e = intercept[AccessControlException](sql(selectKyuubiFunc)) + assert(e.getMessage === errorMessage("select", s"$db/$kyuubiFunc")) + }) + doAs(kent, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + doAs(bob, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + doAs(admin, assert(Try(sql(selectKyuubiFunc)).isSuccess)) + } + } }