@mazeboard mazeboard commented Apr 13, 2019

What changes were proposed in this pull request?

This PR modifies JavaTypeInference so that encoders can be created for Avro objects. There are now three candidate solutions: PR #24299, PR #22878, and this PR (which needs fewer code changes). Which one is better?

How was this patch tested?

We added one test to ExpressionEncoderSuite and used the following program to test it locally:

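  // Imports for the Spark and Scala names used below. The Avro-generated classes
  // (AvroExample1, Magic, Money, Currency) must also be imported from their package.
  import scala.collection.JavaConverters._
  import org.apache.spark.sql.{Dataset, Encoders}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
  import spark.implicits._ // assumes a SparkSession named `spark` is in scope

  implicit val avroExampleEncoder = Encoders.bean[AvroExample1](classOf[AvroExample1]).asInstanceOf[ExpressionEncoder[AvroExample1]]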
  val input: AvroExample1 = AvroExample1.newBuilder()
    .setMyarray(List("Foo", "Bar").asJava)
    .setMyboolean(true)
    .setMybytes(java.nio.ByteBuffer.wrap("MyBytes".getBytes()))
    .setMydouble(2.5)
    .setMyfixed(new Magic("magic".getBytes))
    .setMyfloat(25.0F)
    .setMyint(100)
    .setMylong(10L)
    .setMystring("hello")
    .setMymap(Map(
      "foo" -> java.lang.Integer.valueOf(1),
      "bar" -> java.lang.Integer.valueOf(2)).asJava)
    .setMymoney(Money.newBuilder().setAmount(100.0F).setCurrency(Currency.EUR).build())
    .build()

  val row: InternalRow = avroExampleEncoder.toRow(input)

  val output: AvroExample1 = avroExampleEncoder.resolveAndBind().fromRow(row)

  val ds: Dataset[AvroExample1] = List(input).toDS()

  println(ds.schema)
  println(ds.collect().toList)

  ds.write.format("avro").save("example1")

  val fooDF = spark.read.format("avro").load("example1")

  val fooDS = fooDF.as[AvroExample1]

  println(fooDS.collect().toList)

method.getParameterCount == 1)
if (a.getName == b.getName ||
(a.getName.indexOf("get") == 0 && b.getName.indexOf("set") == 0 &&
a.getName.substring(3) == b.getName.substring(3))) &&

Member

It is a bit hacky here, compared with #24299

Author

Yes, I will try to simplify it.

We need to check that the getter and setter are either prefixed with get and set respectively, or that neither is prefixed (in which case both methods have the same name).

I made the same modification in #24299.

I will try to refactor the code if possible.
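
The pairing rule described in this comment can be sketched as a small name-based predicate. This is only an illustration under assumptions, not the code in the PR: `AccessorPairing` and `isAccessorPair` are hypothetical names, and the real check works on `java.lang.reflect.Method` objects and also looks at parameter counts.

```scala
object AccessorPairing {
  // Hypothetical helper (not part of the PR): decides from the method names
  // alone whether `getter` and `setter` belong to the same property.
  def isAccessorPair(getter: String, setter: String): Boolean = {
    // Unprefixed style: both accessors share one name, e.g. amount() / amount(x).
    val sameName = getter == setter
    // Prefixed style: getAmount / setAmount, compared on the part after the prefix.
    val prefixedPair =
      getter.startsWith("get") && setter.startsWith("set") &&
        getter.substring(3) == setter.substring(3)
    sameName || prefixedPair
  }
}
```

Under this sketch, `isAccessorPair("getAmount", "setAmount")` and `isAccessorPair("amount", "amount")` hold, while `isAccessorPair("getAmount", "setCurrency")` does not.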

encodeDecodeTest(Option("abc"), "option of string")
encodeDecodeTest(Option.empty[String], "empty option of string")

encodeDecodeTest(

Member

I tried and the test case failed:

Can't compare maps!
org.apache.avro.AvroRuntimeException: Can't compare maps!
	at org.apache.avro.generic.GenericData.compare(GenericData.java:984)
	at org.apache.avro.specific.SpecificData.compare(SpecificData.java:333)
	at org.apache.avro.generic.GenericData.compare(GenericData.java:961)
	at org.apache.avro.specific.SpecificData.compare(SpecificData.java:333)
	at org.apache.avro.generic.GenericData.compare(GenericData.java:946)
	at org.apache.avro.specific.SpecificRecordBase.compareTo(SpecificRecordBase.java:81)
	at org.apache.avro.specific.SpecificRecordBase.compareTo(SpecificRecordBase.java:30)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite.$anonfun$encodeDecodeTest$1(ExpressionEncoderSuite.scala:442)

Author

@mazeboard mazeboard Apr 19, 2019

The issue here is how the input and convertedBack objects are compared.

If we replace the check at ExpressionEncoderSuite.scala:442,

left.asInstanceOf[Comparable[Any]].compareTo(right) == 0

with

left.asInstanceOf[Comparable[Any]].equals(right)

the test for the Avro encoder passes, but unfortunately other tests fail.

Equality of objects is tricky.

The GenericData.compare method

  /** Comparison implementation.  When equals is true, only checks for equality,
   * not for order. */
  @SuppressWarnings(value="unchecked")
  protected int compare(Object o1, Object o2, Schema s, boolean equals) {

fails to compare maps when the equals parameter is false.

I propose the following: replace lines 434-444 of ExpressionEncoderSuite.scala

      val isCorrect = (input, convertedBack) match {
        case (b1: Array[Byte], b2: Array[Byte]) => Arrays.equals(b1, b2)
        case (b1: Array[Int], b2: Array[Int]) => Arrays.equals(b1, b2)
        case (b1: Array[Array[_]], b2: Array[Array[_]]) =>
          Arrays.deepEquals(b1.asInstanceOf[Array[AnyRef]], b2.asInstanceOf[Array[AnyRef]])
        case (b1: Array[_], b2: Array[_]) =>
          Arrays.equals(b1.asInstanceOf[Array[AnyRef]], b2.asInstanceOf[Array[AnyRef]])
        case (left: Comparable[_], right: Comparable[_]) =>
          left.asInstanceOf[Comparable[Any]].compareTo(right) == 0
        case _ => input == convertedBack
      }

with

      val convertedBackRow = encoder.toRow(convertedBack)
      val isCorrect = row == convertedBackRow

With the proposed modification, all the tests in ExpressionEncoderSuite pass.

I think this makes sense: the check should not depend on how foreign objects implement equality (but see the next comment).

Author

@mazeboard mazeboard Apr 19, 2019

On reflection, we must compare input and convertedBack: the equality row == convertedBackRow does not guarantee that the encoder is correct. A buggy encoder may create a row with missing fields, so that row == convertedBackRow is true while input == convertedBack is false.
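
A toy illustration of this point, as a minimal sketch with hypothetical stand-ins rather than Spark's ExpressionEncoder: `Person`, `toRow`, and `fromRow` are made up for the example, and a "row" is just a `Seq[Any]`. The encoder deliberately drops a field.

```scala
object RoundTripCheck {
  final case class Person(name: String, age: Int)

  // Deliberately buggy toy encoder: the `age` field never reaches the row.
  def toRow(p: Person): Seq[Any] = Seq(p.name)

  // The decoder cannot recover `age`, so it falls back to a default.
  def fromRow(row: Seq[Any]): Person =
    Person(row.head.asInstanceOf[String], 0)

  val input: Person = Person("Ann", 42)
  val row: Seq[Any] = toRow(input)
  val convertedBack: Person = fromRow(row)
  val convertedBackRow: Seq[Any] = toRow(convertedBack)

  // Comparing rows hides the bug; comparing objects exposes it.
  val rowsAgree: Boolean = row == convertedBackRow
  val objectsAgree: Boolean = input == convertedBack
}
```

Here `rowsAgree` is true even though the age was lost, while `objectsAgree` is false, which is why the row-level comparison alone is not a sufficient test.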

For the Avro encoder test, input == convertedBack is true when the encoder is correct, so we propose the following modification:

      val isCorrect = input == convertedBack || ((input, convertedBack) match {
        case (b1: Array[Byte], b2: Array[Byte]) => Arrays.equals(b1, b2)
        case (b1: Array[Int], b2: Array[Int]) => Arrays.equals(b1, b2)
        case (b1: Array[Array[_]], b2: Array[Array[_]]) =>
          Arrays.deepEquals(b1.asInstanceOf[Array[AnyRef]], b2.asInstanceOf[Array[AnyRef]])
        case (b1: Array[_], b2: Array[_]) =>
          Arrays.equals(b1.asInstanceOf[Array[AnyRef]], b2.asInstanceOf[Array[AnyRef]])
        case (left: Comparable[_], right: Comparable[_]) =>
          left.asInstanceOf[Comparable[Any]].compareTo(right) == 0
        case _ => false
      })

With this modification all the tests pass
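
The effect of trying equality first can be reproduced outside Spark and Avro. The sketch below is an assumption-laden stand-in: `MapRecord` is a hypothetical class whose `compareTo` always throws (mimicking the "Can't compare maps!" failure) while `equals` works, and `isCorrect` is a trimmed version of the check above with fewer array cases.

```scala
import java.util.Arrays

object EqualityFirst {
  // Hypothetical stand-in for an Avro record holding a map:
  // equals is well defined, ordering is not.
  final class MapRecord(val fields: Map[String, Int]) extends Comparable[MapRecord] {
    override def compareTo(other: MapRecord): Int =
      throw new UnsupportedOperationException("Can't compare maps!")
    override def equals(other: Any): Boolean = other match {
      case that: MapRecord => fields == that.fields
      case _               => false
    }
    override def hashCode(): Int = fields.hashCode()
  }

  // Equality is tried first; compareTo is only reached when == is false.
  def isCorrect(input: Any, convertedBack: Any): Boolean =
    input == convertedBack || ((input, convertedBack) match {
      case (b1: Array[Byte], b2: Array[Byte]) => Arrays.equals(b1, b2)
      case (b1: Array[Int], b2: Array[Int])   => Arrays.equals(b1, b2)
      case (left: Comparable[_], right: Comparable[_]) =>
        left.asInstanceOf[Comparable[Any]].compareTo(right) == 0
      case _ => false
    })
}
```

In this sketch two equal `MapRecord` values pass without `compareTo` ever being called. Note that two unequal `MapRecord` values would still reach `compareTo` and throw, so a failing round trip would surface as an exception rather than as a plain false result.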

@ravetdavid

👍 We have wanted to use Dataset[T <: SpecificRecord] for a long time.

@mazeboard
Author

mazeboard commented May 18, 2019

The following example shows why we prefer the solution that modifies ScalaReflection (#24299).

In the example below, if we use the bean encoder we must also declare a tuple encoder, as shown:

    implicit val barcodeEncoder = Encoders.bean[Barcode](classOf[Barcode]).asInstanceOf[ExpressionEncoder[Barcode]]
    implicit val tupleEncoder = Encoders.tuple[String, Money, CrpAddDesc](
      Encoders.STRING,
      Encoders.bean[Money](classOf[Money]).asInstanceOf[ExpressionEncoder[Money]],
      Encoders.bean[CrpAddDesc](classOf[CrpAddDesc]).asInstanceOf[ExpressionEncoder[CrpAddDesc]])
...
...
    val ds: Dataset[Barcode] = List(barcode).toDS()
    val x: Dataset[(String, Money, CrpAddDesc)] = ds.map(a => {
      (
        a.getBarcode,
        a.getPrdTaxVal,
        a.getCrpAttributes.get(0).getCrpAddDesc.get(0))
    })

If we do not declare the tuple encoder, we get the error "No Encoder found for common.lib.v1.Money".

With the ScalaReflection modification (#24299), we only need to declare an encoder for Barcode:

    implicit val barcodeEncoder = ExpressionEncoder[Barcode]()
      ...
      ...
    val ds: Dataset[Barcode] = List(barcode).toDS()
    val x: Dataset[(String, Money, CrpAddDesc)] = ds.map(a => {
      (
        a.getBarcode,
        a.getPrdTaxVal,
        a.getCrpAttributes.get(0).getCrpAddDesc.get(0))
    })

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 31, 2019
@github-actions github-actions bot closed this Jan 1, 2020

5 participants