[SPARK-20384][SQL] Support value class in schema of Dataset (without breaking existing current projection) #27153
Conversation
jenkins this is ok to test

Test build #116417 has finished for PR 27153 at commit
```scala
 * @return unwrapped param
 */
private def unwrapValueClassParam(param: (String, `Type`)): (String, `Type`) = {
  val (name, typ) = param
```
nit: maybe use `tpe` instead of `typ` to be consistent with the naming in this file
```scala
case class StrWrapper(s: String) extends AnyVal

case class ValueClassData(
    intField: Int,
```
please fix indentation here
@mickjermsurawong-stripe thanks for continuing with this feature, I hope it will be merged this time
```scala
val df = spark.sparkContext.parallelize(Seq(a, b)).toDF
// flat value class, `s` field is not in schema
val filtered = df.where("wrapper = \"a\"")
checkAnswer(filtered, spark.sparkContext.parallelize(Seq(a)).toDF)
```
Before this change we never supported nested value classes (a repro sketch follows below):

- Filter with `wrapper` would break with:

  ```
  org.apache.spark.sql.AnalysisException: cannot resolve '(`wrapper` = 'a')' due to data type mismatch: differing types in '(`wrapper` = 'a')' (struct<s:string> and string).; line 1 pos 0;
  ```

- Filter with `wrapper.s` would break with:

  ```
  java.lang.ClassCastException: java.lang.String cannot be cast to org.apache.spark.sql.test.SQLTestData$StringWrapper
  ```
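A minimal sketch of the failing scenario described above, assuming a `StringWrapper` value class nested in a hypothetical `Container` case class (names are illustrative, not the exact test fixtures):

```scala
// Assumes a SparkSession in scope as `spark`, as in the Spark test suites.
import spark.implicits._

case class StringWrapper(s: String) extends AnyVal
case class Container(wrapper: StringWrapper)

val df = Seq(Container(StringWrapper("a")), Container(StringWrapper("b"))).toDF()

// The schema kept `wrapper` as struct<s: string>, so comparing it to a
// string literal failed analysis:
df.where("wrapper = 'a'")             // AnalysisException: differing types

// And resolving the nested field decoded a String where a boxed
// StringWrapper was expected:
df.where("wrapper.s = 'a'").collect() // ClassCastException at runtime
```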
Thanks @mt40!

Test build #116688 has finished for PR 27153 at commit
retest this please

Test build #117086 has finished for PR 27153 at commit
^ bumping this please. I suspect the patch failure here is flaky; the only change from the last build is adding more tests.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
I am interested in the support for value classes that is added/fixed in this branch. To me the changes look like they are still a valid approach to adding the support. Rebasing this branch on master will cause some of the added test cases to fail. This is due to PR #31766, which fixed the interpreted-path tests so they are run properly, and a bug. If/after the bug fix gets merged, what would be the next step towards getting this merged? Should I open a new PR with the changes from @mickjermsurawong-stripe's branch? Since to me it looks like the original author is no longer around and/or interested in these changes.
@eejbyfeldt Nice to see someone having the same interest in this. I think you should continue working on this branch to avoid duplication, but it's up to you. In my opinion, the hardest part will be to convince the Spark team that this change is necessary, so this can be merged once the failing tests are fixed.
hi @eejbyfeldt! We've had this equivalent change in our fork running in prod for a while now. One more follow-up we had internally is to handle a value class wrapped in a tuple, like `Tuple2[AnyVal, AnyVal]` (see the sketch after this comment).
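A minimal sketch of the tuple case being discussed, reusing the illustrative `StringWrapper` from above; because the value class appears as a generic type argument here, it is boxed at runtime, so it should keep its struct schema rather than being flattened:

```scala
// Assumes a SparkSession in scope as `spark`.
import spark.implicits._

case class StringWrapper(s: String) extends AnyVal

// Each tuple element really is a boxed StringWrapper instance at runtime,
// not a bare String, because the value class is used as a type argument.
val ds = Seq((StringWrapper("a"), StringWrapper("b"))).toDS()

// Expected schema keeps the struct form on each side:
// _1: struct<s: string>, _2: struct<s: string>
ds.printSchema()
```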
Hi @mickjermsurawong-stripe! Great to hear, looking forward to the updated changes. Also, my PR #32783 got merged, so rebasing this branch on master should no longer fail tests.
@mickjermsurawong-stripe Have you had any time to look at updating the branch? If you don't have time to prepare a full patch, I could help out with unit tests and such, if you can provide some information about approximately which change is needed for the value classes in tuples.
I looked some more at this and realised what the problem is. Based on this, it seems something like the approach of PR #22309 by @mt40 would be needed to handle all cases correctly. To me it also makes sense that an AnyVal would give the same schema regardless of where it is used, like in that PR (the sketch below illustrates the current inconsistency). So unless there is some other good suggestion on this, that is the approach I am going to pursue.
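A hedged illustration of the schema inconsistency mentioned above, using the same illustrative `StringWrapper`; the expected shapes are inferred from the PR description below, not copied from actual output:

```scala
// Assumes a SparkSession in scope as `spark`.
import spark.implicits._

case class StringWrapper(s: String) extends AnyVal

// Top level: the schema exposes the underlying type directly (s: string).
Seq(StringWrapper("a")).toDS().printSchema()

// As a type argument the value class is boxed, so the element schema is
// the struct form instead (struct<s: string>).
Seq(Seq(StringWrapper("a"))).toDS().printSchema()
```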
Hey @eejbyfeldt, sorry, I realized that the internal fork where we addressed this issue is in a different class, our custom encoder, so I didn't get to integrate it with the standard encoder here. The underlying issue we addressed is exactly the allocation you mentioned. From the doc on value classes: https://docs.scala-lang.org/overviews/core/value-classes.html

Given `class Wrapper(val underlying: Int) extends AnyVal`:

1. "The type at compile time is `Wrapper`, but at runtime, the representation is an `Int`."
2. `Wrapper` "must be instantiated... when a value class is used as a type argument."

Hope this could help. It's a long weekend here, so I will make sure I get to work on this. But please feel free to pursue it independently as well.
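A minimal sketch of the two quoted rules; this is plain Scala value-class semantics, not anything from this PR:

```scala
class Wrapper(val underlying: Int) extends AnyVal

// Rule 1: used directly, no Wrapper object is allocated at runtime;
// the compiled method works on a plain Int.
def plusOne(w: Wrapper): Int = w.underlying + 1

// Rule 2: used as a type argument, the value class must be instantiated,
// so every element of this Seq is a real boxed Wrapper on the heap.
val boxed: Seq[Wrapper] = Seq(new Wrapper(1), new Wrapper(2))
```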
hi @eejbyfeldt, I made another PR to address the tuple issue raised: #33205.
### What changes were proposed in this pull request?

- This PR revisits #22309 and [SPARK-20384](https://issues.apache.org/jira/browse/SPARK-20384), solving the original problem while additionally preventing a backward-compat break in the schema of a top-level `AnyVal` value class.
  - Why did the previous attempt break? We currently support top-level value classes just like any other case class; a field of the underlying type is present in the schema. This means any dataframe SQL filtering on it expects the field name to be present. The previous PR changed this schema and would have broken current usage. See test `"schema for case class that is a value class"`. This PR keeps the schema.
  - We actually already support collections of value classes prior to this change, but not a case class with a nested value class. This means the schema of these collection classes shouldn't change either, to avoid breaking.
  - However, what we can change without breaking is the schema of a nested value class, which previously failed due to the compile problem, so its schema wasn't actually valid. After the change, the schema of a nested value class is flattened.
  - With this PR, there is flattening only for nested value classes (new), but not for top-level and collection classes (existing behavior).
- This PR revisits #27153 by handling tuples such as `Tuple2[AnyVal, AnyVal]`: a tuple is a constructor ("nested class") but also a generic type, so it should not be flattened, behaving similarly to `Seq[AnyVal]`.

### Why are the changes needed?

Currently, nested value classes aren't supported. This is because the generated code treats the `AnyVal` class in its unwrapped form, while we encode the type as the wrapped case class. This results in a compile error in the generated code.

For example, given an `AnyVal` wrapper and its root-level container class:

```
case class IntWrapper(i: Int) extends AnyVal
case class ComplexValueClassContainer(c: IntWrapper)
```

the problematic part of the generated code is:

```
private InternalRow If_1(InternalRow i) {
    boolean isNull_42 = i.isNullAt(0);
    // 1) ******** The root-level case class we care about
    org.apache.spark.sql.catalyst.encoders.ComplexValueClassContainer value_46 = isNull_42 ?
        null : ((org.apache.spark.sql.catalyst.encoders.ComplexValueClassContainer) i.get(0, null));
    if (isNull_42) {
        throw new NullPointerException(((java.lang.String) references[5] /* errMsg */ ));
    }
    boolean isNull_39 = true;
    // 2) ******** We declare its member as the unwrapped case class extending `AnyVal`
    org.apache.spark.sql.catalyst.encoders.IntWrapper value_43 = null;
    if (!false) {
        isNull_39 = false;
        if (!isNull_39) {
            // 3) ******** ERROR: `c()` as compiled is of type `int`, hence the error below
            value_43 = value_46.c();
        }
    }
```

We get this error: Assignment conversion not possible from type "int" to type "org.apache.spark.sql.catalyst.encoders.IntWrapper"

```
java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 159, Column 1: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 159, Column 1: Assignment conversion not possible from type "int" to type "org.apache.spark.sql.catalyst.encoders.IntWrapper"
```

From the [doc](https://docs.scala-lang.org/overviews/core/value-classes.html) on value classes, given `class Wrapper(val underlying: Int) extends AnyVal`:

1. "The type at compile time is `Wrapper`, but at runtime, the representation is an `Int`." This implies that when our struct has a field of a value class, the generated code should handle the underlying type during runtime execution.
2. `Wrapper` "must be instantiated... when a value class is used as a type argument." This implies that `scala.Tuple[Wrapper, ...]`, `Seq[Wrapper]`, `Map[String, Wrapper]`, and `Option[Wrapper]` will still contain `Wrapper` as-is at runtime instead of `Int`.

### Does this PR introduce _any_ user-facing change?

- Yes, this adds support for nested value classes.

### How was this patch tested?

- Added unit tests to illustrate:
  - raw schema
  - projection
  - round-trip encode/decode

Closes #33205 from mickjermsurawong-stripe/SPARK-20384-2.

Lead-authored-by: Mick Jermsurawong <mickjermsurawong@stripe.com>
Co-authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
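A hedged sketch of the user-facing behavior after this change (the printed schema is my expectation from the description above, not output copied from the PR):

```scala
// Assumes a SparkSession in scope as `spark`.
import spark.implicits._

case class IntWrapper(i: Int) extends AnyVal
case class ComplexValueClassContainer(c: IntWrapper)

val ds = Seq(ComplexValueClassContainer(IntWrapper(1))).toDS()

// The nested value class field is flattened to its underlying type:
// root
//  |-- c: integer (nullable = false)
ds.printSchema()

// Round-trip encode/decode now works instead of failing codegen compilation.
assert(ds.collect().head == ComplexValueClassContainer(IntWrapper(1)))
```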
cc @mt40 please let me know if i'm missing something from your previous PR
cc @joshrosen-stripe