[SPARK-37018][SQL] Spark SQL should support create function with Aggregator #34303
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #144330 has finished for PR 34303 at commit
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveSQLViewSuite.scala
…veSQLViewSuite.scala Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
ping @cloud-fan
Test build #144346 has finished for PR 34303 at commit
// ScalaAggregator or Hive UDF/UDAF/UDTF with function definition. Otherwise,
// we just throw it earlier.
// Unfortunately we need to use reflection here because Aggregator
// and ScalaAggregator are defined in sql/core module.
Classes in sql/core are available in sql/hive. What problem did you hit?
Or do you want to move the code to SessionCatalog? Then reflection makes sense.
Oh, thank you for the reminder.
Test build #144365 has finished for PR 34303 at commit
retest this please
Test build #144366 has finished for PR 34303 at commit
deserializer,
ClassTag(cls))

val e = classOf[ScalaAggregator[_, _, _]].getConstructor(classOf[Seq[Expression]],
Can't we just use `new ScalaAggregator`?
Thanks for the reminder.
val baseClassType = typeOf[Aggregator[_, _, _]].typeSymbol.asClass
val baseType = internal.thisType(classType).baseType(baseClassType)
val tpe = baseType.typeArgs.head
val cls = mirror.runtimeClass(tpe)
Did you copy the code above from somewhere?
The code references spark/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala, line 55 in 4072a22:
val serializer = ScalaReflection.serializerForType(tpe)
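For readers following the reflection snippet in this thread, here is a minimal, self-contained sketch of the same technique: recovering the first type argument of a generic base class from a concrete subclass at runtime. It assumes Scala 2 with `scala-reflect` on the classpath. `SimpleAggregator`, `BoxedLongAvg`, and `InputTypeLookup` are hypothetical stand-ins invented for illustration; they are not Spark classes and this is not the PR's actual code.

```scala
import scala.reflect.runtime.{universe => ru}

// Hypothetical stand-in for Spark's Aggregator[IN, BUF, OUT]; not a Spark class.
abstract class SimpleAggregator[IN, BUF, OUT]

// Hypothetical implementation whose input type we want to recover.
class BoxedLongAvg extends SimpleAggregator[java.lang.Long, (Long, Long), Double]

object InputTypeLookup {
  // Returns the runtime class of the IN type argument of SimpleAggregator
  // as seen from `clazz`. Assumes `clazz` really extends SimpleAggregator;
  // otherwise the base type has no type arguments and `.head` throws.
  def inputClassOf(clazz: Class[_]): Class[_] = {
    val mirror = ru.runtimeMirror(clazz.getClassLoader)
    val classSym = mirror.classSymbol(clazz)
    val baseSym = ru.typeOf[SimpleAggregator[_, _, _]].typeSymbol.asClass
    // View the subclass as an instance of the generic base class.
    val baseType = ru.internal.thisType(classSym).baseType(baseSym)
    val inputTpe = baseType.typeArgs.head
    mirror.runtimeClass(inputTpe)
  }
}
```

Calling `InputTypeLookup.inputClassOf(classOf[BoxedLongAvg])` should yield the class of the declared input type. The same lookup applied to `typeArgs(1)` or `typeArgs(2)` would give the buffer and output types.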
functionName -> true) {
// create a function in default database
sql("USE DEFAULT")
sql(s"CREATE FUNCTION $functionName AS '$avgFuncClass'")
Can we add some basic tests to make sure the function can be called? Please also test compatible input types to exercise the implicit cast, and incompatible input types to make sure the type check works.
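The kind of check requested here could look roughly like the sketch below. It is not the PR's test: it registers the aggregator through the existing `functions.udaf` and `spark.udf.register` APIs rather than through `CREATE FUNCTION`, and `LongSum` and `AggregatorCallCheck` are hypothetical names. It assumes Spark 3.x on the classpath and that analysis rejects an `ARRAY` argument for a `Long` input.

```scala
import scala.util.Try

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Hypothetical aggregator used only for this check: sums Long inputs.
object LongSum extends Aggregator[Long, Long, Long] {
  def zero: Long = 0L
  def reduce(b: Long, a: Long): Long = b + a
  def merge(b1: Long, b2: Long): Long = b1 + b2
  def finish(r: Long): Long = r
  def bufferEncoder: Encoder[Long] = Encoders.scalaLong
  def outputEncoder: Encoder[Long] = Encoders.scalaLong
}

object AggregatorCallCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("aggregator-call-check")
      .getOrCreate()
    try {
      spark.udf.register("long_sum", udaf(LongSum))

      // Compatible input: INT literals should be implicitly cast to BIGINT.
      val total = spark
        .sql("SELECT long_sum(v) FROM VALUES (1), (2), (3) AS t(v)")
        .head()
        .getLong(0)
      assert(total == 6L)

      // Incompatible input: an ARRAY argument is expected to fail the type check.
      val bad = Try(
        spark.sql("SELECT long_sum(v) FROM VALUES (array(1)) AS t(v)").collect())
      assert(bad.isFailure)
    } finally {
      spark.stop()
    }
  }
}
```

A suite for this PR would presumably run the same two queries against a function created with `CREATE FUNCTION ... AS '<class name>'` instead of `spark.udf.register`.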
Test build #144393 has finished for PR 34303 at commit
… sql/core

### What changes were proposed in this pull request?
This PR adds a new internal interface `FunctionExpressionBuilder`, to replace `SessionCatalog.makeFunctionExpression`. Then we can put the interface implementation in sql/core, to avoid using reflection in `SessionCatalog.makeFunctionExpression`, because the class `UserDefinedAggregateFunction` is not available in sql/catalyst.

### Why are the changes needed?
Code cleanup, and to make it easier to support using `Aggregator` as UDAF later (#34303).

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests.

Closes #34340 from cloud-fan/function.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
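The idea behind that commit, replacing reflection with an interface that a higher-level module implements, can be sketched without Spark as follows. Every name here (`Expr`, `Call`, `FunctionBuilder`, `Catalog`, `Udaf`, `MyUdaf`, `CoreBuilder`) is a hypothetical stand-in chosen for illustration; the real `FunctionExpressionBuilder` signature is not reproduced.

```scala
// Stand-in for an expression tree node; hypothetical, not Catalyst's Expression.
trait Expr
final case class Call(name: String, implClass: String, args: Seq[Expr]) extends Expr

// Lower-level module: knows only this interface, not the concrete UDAF classes.
trait FunctionBuilder {
  def makeExpression(name: String, clazz: Class[_], input: Seq[Expr]): Expr
}

// Lower-level catalog delegates to whatever builder it was constructed with.
final class Catalog(builder: FunctionBuilder) {
  def lookup(name: String, clazz: Class[_], input: Seq[Expr]): Expr =
    builder.makeExpression(name, clazz, input)
}

// Higher-level module: can reference the concrete base class directly,
// so no reflective constructor lookup is needed.
abstract class Udaf
class MyUdaf extends Udaf

object CoreBuilder extends FunctionBuilder {
  def makeExpression(name: String, clazz: Class[_], input: Seq[Expr]): Expr =
    if (classOf[Udaf].isAssignableFrom(clazz)) Call(name, clazz.getName, input)
    else throw new IllegalArgumentException(s"No handler for class ${clazz.getName}")
}
```

With this layout, `new Catalog(CoreBuilder).lookup("f", classOf[MyUdaf], Nil)` builds the expression through the injected builder, and supporting another function kind means adding a branch in the higher-level module only.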
What changes were proposed in this pull request?
Spark SQL does not yet support creating a function from an `Aggregator`, and `UserDefinedAggregateFunction` is deprecated. If we want to remove `UserDefinedAggregateFunction`, Spark SQL should provide a new option.

Why are the changes needed?
We need to provide a new way to create user-defined aggregate functions so that `UserDefinedAggregateFunction` can be removed in the future.

Does this PR introduce any user-facing change?
Yes. Users will create user-defined aggregate functions by implementing `Aggregator`.

How was this patch tested?
New tests.
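As an illustration of what implementing `Aggregator` involves, here is a minimal sketch of an averaging aggregator. `AvgBuffer` and `LongAverage` are hypothetical names, and the public no-argument constructor is an assumption about what creating the function by class name would require; the PR's own test class may differ.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Intermediate state: running sum and row count.
case class AvgBuffer(sum: Long, count: Long)

// Hypothetical example class. A public no-arg constructor is assumed so that
// the class could be instantiated from its name.
class LongAverage extends Aggregator[Long, AvgBuffer, Double] {
  // Empty buffer for a new group.
  def zero: AvgBuffer = AvgBuffer(0L, 0L)

  // Fold one input value into the buffer.
  def reduce(b: AvgBuffer, a: Long): AvgBuffer = AvgBuffer(b.sum + a, b.count + 1)

  // Combine two partial buffers, for example from different partitions.
  def merge(b1: AvgBuffer, b2: AvgBuffer): AvgBuffer =
    AvgBuffer(b1.sum + b2.sum, b1.count + b2.count)

  // Produce the final result; NaN for an empty group is a choice made here.
  def finish(r: AvgBuffer): Double =
    if (r.count == 0) Double.NaN else r.sum.toDouble / r.count

  def bufferEncoder: Encoder[AvgBuffer] = Encoders.product[AvgBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```

Under the change proposed in this PR, such a class would be registered with a statement of the form `CREATE FUNCTION long_avg AS '<fully qualified class name>'`, following the `CREATE FUNCTION $functionName AS '$avgFuncClass'` call in the test above. In existing releases the same class can be registered with `spark.udf.register("long_avg", functions.udaf(new LongAverage))`.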