Conversation

Contributor

@ulysses-you ulysses-you commented Sep 14, 2020

What changes were proposed in this pull request?

This PR aims to support Hive UDFs when the input types are not the expected ones. We make HiveSimpleUDF extend ImplicitCastInputTypes, so that the Analyzer implicitly casts the input data types.

Why are the changes needed?

It's a compatibility issue. Normally, data types are converted by the Hive ObjectInspector at runtime. However, Hive does not accept a decimal input when the method requires a double. Unfortunately, the default type of a literal such as 1.1 differs between Spark and Hive: decimal in Spark, double in Hive. This is what causes the issue.

Before this PR, the following code failed:

import org.apache.hadoop.hive.ql.exec.UDF

class ArraySumUDF extends UDF {
  import scala.collection.JavaConverters._
  def evaluate(values: java.util.List[java.lang.Double]): java.lang.Double = {
    var r = 0d
    for (v <- values.asScala) {
      r += v
    }
    r
  }
}

sql(s"CREATE FUNCTION testArraySum AS '${classOf[ArraySumUDF].getName}'")
sql("SELECT testArraySum(array(1, 1.1, 1.2))")

-- error msg
Error in query: No handler for UDF/UDAF/UDTF 'ArraySumUDF': org.apache.hadoop.hive.ql.exec.NoMatchingMethodException: No matching method for class ArraySumUDF with (array<decimal(11,1)>). Possible choices: _FUNC_(array<double>)  ; line 1 pos 7
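As a side note on the mismatch itself, here is a minimal plain-Scala sketch (no Spark required; the object and value names are illustrative) of why a literal like 1.1 naturally carries an exact precision and scale as a decimal, which Spark preserves while Hive widens such literals to double:

```scala
object DecimalLiteralDemo {
  def main(args: Array[String]): Unit = {
    // Spark types the literal 1.1 as decimal(2,1): 2 significant digits, scale 1.
    val d = BigDecimal("1.1")
    assert(d.precision == 2 && d.scale == 1)
    // Hive instead treats 1.1 as a double, dropping the exact decimal typing.
    assert(d.toDouble == 1.1)
    println(s"decimal(${d.precision},${d.scale})")
  }
}
```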

Does this PR introduce any user-facing change?

Yes, users now get the correct result.

How was this patch tested?

Added a test.

children.map(_.dataType.toTypeInfo).asJava, methods.toSeq.asJava)
}
// if there exist multiple methods, we choose the first
methods.head
Contributor Author

Generally, a UDF has just one evaluate method.
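For illustration, here is a minimal plain-Scala sketch (the class and helper names are made up, not Spark's actual code) of that "choose the first" behavior via Java reflection:

```scala
// Hypothetical sketch: a UDF-style class with a single evaluate method,
// resolved by reflection and invoked. `firstEvaluate` is an illustrative helper.
class PlusOneUDF {
  def evaluate(v: java.lang.Double): java.lang.Double = v + 1.0
}

object ReflectFirstMethod {
  def firstEvaluate(cls: Class[_]): java.lang.reflect.Method =
    // If multiple overloads existed, this would simply take the first one.
    cls.getMethods.filter(_.getName == "evaluate").head

  def main(args: Array[String]): Unit = {
    val m = firstEvaluate(classOf[PlusOneUDF])
    val out = m.invoke(new PlusOneUDF, java.lang.Double.valueOf(1.5))
    assert(out == java.lang.Double.valueOf(2.5))
    println(out)
  }
}
```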

@SparkQA

SparkQA commented Sep 14, 2020

Test build #128642 has finished for PR 29749 at commit e692b9f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 15, 2020

Test build #128674 has finished for PR 29749 at commit e73ccbf.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 15, 2020

Test build #128696 has finished for PR 29749 at commit f9e3d57.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.


override def inputTypes: Seq[AbstractDataType] = {
val inTypes = children.map(_.dataType)
if (!inTypes.exists(_.existsRecursively(_.isInstanceOf[DecimalType]))) {
Member

Why do we need special handling for decimal types?

Contributor Author

It's a compatibility issue. Normally, data types are converted by the Hive ObjectInspector at runtime. However, Hive does not accept a decimal input when the method requires a double. Unfortunately, the default type of a literal such as 1.1 differs between Spark and Hive: decimal in Spark, double in Hive. This is what causes the issue.

Member

Ah, I see. Could you describe it in the PR description?

Contributor Author

Yeah.

Contributor

Sorry, I didn't get it. ImplicitCastInputTypes can implicitly cast decimal to double, can't it?

Contributor Author

In the first commit I did it for all types, but the tests did not pass. The reason is that some UDFs require an Object type.

We convert the Java Object type to NullType in HiveInspectors.javaTypeToDataType (it seems this could be changed?):

// Hive seems to return this for struct types?
case c: Class[_] if c == classOf[java.lang.Object] => NullType

So we can't resolve the UDF method by reflection using a NullType; the error message is:

Error in query: cannot resolve 'example_format('%o', 93)' due to data type mismatch: argument 2 requires array<null> type, however, '93' is of int type.; line 1 pos 7;

Contributor

If a Hive UDF requires Object, I think it means AnyDataType. We should special-case only that.

Contributor Author

How about this?

    val expectTypes = method.getGenericParameterTypes.map(javaTypeToDataType)
    if (expectTypes.exists(_.existsRecursively(_.isInstanceOf[NullType]))) {
      children.map(_.dataType)
    } else {
      expectTypes
    }

Contributor

@cloud-fan cloud-fan Sep 18, 2020

method.getGenericParameterTypes.map(javaTypeToDataType).map { dt =>
  if (dt.existsRecursively(_.isInstanceOf[NullType])) AnyDataType else dt
}

Contributor Author

It seems we should check the data types and replace NullType with AnyDataType one by one, to handle cases such as a Map<Double, Object> input.

I will also check whether we can change HiveInspectors.javaTypeToDataType to use AnyDataType directly.
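A minimal stand-in sketch of that idea (the types below are simplified mock-ups, not Spark's real org.apache.spark.sql.types classes): replacing NullType with AnyDataType recursively, so a nested signature like Map<Double, Object> keeps its structure instead of the whole type collapsing to AnyDataType:

```scala
// Simplified mock type hierarchy, for illustration only.
sealed trait AbstractType
case object DoubleType extends AbstractType
case object NullType extends AbstractType
case object AnyDataType extends AbstractType
case class ArrayType(element: AbstractType) extends AbstractType
case class MapType(key: AbstractType, value: AbstractType) extends AbstractType

object ReplaceNullType {
  // Replace NullType with AnyDataType one by one, recursing into nested types.
  def replace(dt: AbstractType): AbstractType = dt match {
    case NullType      => AnyDataType
    case ArrayType(e)  => ArrayType(replace(e))
    case MapType(k, v) => MapType(replace(k), replace(v))
    case other         => other
  }

  def main(args: Array[String]): Unit = {
    // A Map<Double, Object> parameter would be reflected as
    // MapType(DoubleType, NullType); only the value slot should change.
    val fixed = replace(MapType(DoubleType, NullType))
    assert(fixed == MapType(DoubleType, AnyDataType))
    println(fixed)
  }
}
```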

@ulysses-you
Contributor Author

Also cc @cloud-fan @dongjoon-hyun; this is similar to the issue in #13930.

with Logging
with UserDefinedExpression {
with UserDefinedExpression
with ImplicitCastInputTypes {
Contributor

ScalaUDF doesn't extend ImplicitCastInputTypes either. Does it have the same problem?

Contributor Author

@ulysses-you ulysses-you Sep 17, 2020

No, ImplicitTypeCasts already handles ScalaUDF via case udf: ScalaUDF if udf.inputTypes.nonEmpty =>

}

override def inputTypes: Seq[AbstractDataType] = {
val inTypes = children.map(_.dataType)
Contributor

I'm confused. The expected types should be defined by the function signature, not by the actual function inputs. What are we doing here?

As an example, ScalaUDF.inputTypes is derived from the function signature (captured by encoders).

Contributor Author

ScalaUDF also implicitly converts input types to the expected types, in ImplicitTypeCasts.

Let's say we have a UDF Array[Double] => Double = { data => data.sum } and we run spark.sql("select udf(array(1.0, 1.1, 1.2))"). Then ImplicitTypeCasts will cast array<decimal> to array<double>.

But Hive UDFs can't benefit from this; currently we only use the Hive ObjectInspector to convert data types at runtime. That's the difference.

Contributor

Ah, I misread the code. We get the children's data types only to skip decimals.

@SparkQA

SparkQA commented Sep 18, 2020

Test build #128879 has finished for PR 29749 at commit e13f2b9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

My last concern is to check ScalaUDF and see whether its type coercion rules are the same as HiveUDF's. ScalaUDF doesn't extend ImplicitCastInputTypes.

@ulysses-you
Contributor Author

ScalaUDF is handled in ImplicitTypeCasts just as if it extended ImplicitCastInputTypes. Here is the relevant code from ImplicitTypeCasts:

      case e: ImplicitCastInputTypes if e.inputTypes.nonEmpty =>
        val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) =>
          // If we cannot do the implicit cast, just use the original input.
          implicitCast(in, expected).getOrElse(in)
        }
        e.withNewChildren(children)

      case udf: ScalaUDF if udf.inputTypes.nonEmpty =>
        val children = udf.children.zip(udf.inputTypes).map { case (in, expected) =>
          // Currently Scala UDF will only expect `AnyDataType` at top level, so this trick works.
          // In the future we should create types like `AbstractArrayType`, so that Scala UDF can
          // accept inputs of array type of arbitrary element type.
          if (expected == AnyDataType) {
            in
          } else {
            implicitCast(
              in,
              udfInputToCastType(in.dataType, expected.asInstanceOf[DataType])
            ).getOrElse(in)
          }

        }

@ulysses-you
Contributor Author

It seems something is wrong here: HiveSimpleUDF.method is called before ImplicitCastInputTypes runs, so an exception is thrown before we get to cast the input types.

@ulysses-you
Contributor Author

After checking the Hive code, this turns out to be a Hive version issue. With Hive 2.3.x this problem does not happen, since HIVE-13380 was merged. What I said above was based on Hive 1.2.x.

Another thing: we cannot implicitly cast the input types, because we first need the children's data types to reflectively resolve the UDF method, and that throws an exception if it fails. So we never reach ImplicitTypeCasts. We would have to skip the method input data type check, as in my first commit 89a2966.

Sorry for the mistake; it seems this issue does not need to be fixed.
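The ordering problem described above can be sketched with a toy model (plain Scala; the names are illustrative stand-ins, not Spark code): method resolution runs on the raw child type first, so it throws before any implicit cast rule gets a chance to run:

```scala
// Toy model of the "resolution before cast" ordering issue.
sealed trait T
case object DecimalT extends T
case object DoubleT extends T

class NoMatchingMethodException(msg: String) extends Exception(msg)

object ResolutionOrderDemo {
  // Resolution succeeds only for the exact signature the UDF declares.
  def resolveMethod(argType: T): String = argType match {
    case DoubleT => "evaluate(double)"
    case other   => throw new NoMatchingMethodException(s"no method for $other")
  }

  def main(args: Array[String]): Unit = {
    // Resolution runs on the raw child type (decimal) first, so it fails
    // before an implicit cast to double could have happened.
    val failed =
      try { resolveMethod(DecimalT); false }
      catch { case _: NoMatchingMethodException => true }
    assert(failed)
    // Had the cast run first, resolution would succeed.
    assert(resolveMethod(DoubleT) == "evaluate(double)")
    println("ok")
  }
}
```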

@cloud-fan
Contributor

Thanks for the investigation! You can probably open a new PR to add comments to HiveSimpleUDF, explaining why it can't extend ImplicitCastInputTypes.

@ulysses-you
Contributor Author

thanks @cloud-fan @maropu

@dongjoon-hyun
Member

dongjoon-hyun commented Sep 22, 2020

Hi, @ulysses-you, @cloud-fan, and @maropu,
can we have this test case in our Apache Spark repository?

}
}

test("SPARK-32877: Fix Hive UDF not support decimal type in complex type") {
Member

If this is not covered by any other test case, it looks worth having.

@dongjoon-hyun
Member

I'm wondering if this test still fails with '-Phive1.2'.

@ulysses-you
Contributor Author

@dongjoon-hyun I will open a new PR that only adds a test and some comments.
