[SPARK-43333][SQL] Allow Avro to convert union type to SQL with field name stable with type #41263
Conversation
Thanks for working on this. I left a few comments, would appreciate it if you could take a look.
@@ -144,11 +147,28 @@ object SchemaConverters {
case _ =>
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior when converting between Avro and Parquet.
val use_stable_id = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)
nit: Could you use useStableId?
// could be "a" and "A" and we need to distinguish them.
var temp_name = s"member_${s.getName.toLowerCase(Locale.ROOT)}"
while (fieldNameSet.contains(temp_name)) {
  temp_name = s"${temp_name}_$i"
I was wondering whether we could simply throw an error when this case happens; the reason is that the identifiers might not be stable anymore. We could explain that stable identifiers can only be used if the types are unique; if you have more than one type with the same name, please use sequential numeric ids instead.
Alternatively, we could just store them as is, without converting to upper or lower case - that could be an option.
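The first suggestion could be sketched roughly like this (a hypothetical illustration, not the PR's actual code; the function name and signature are assumed):

```scala
import java.util.Locale
import scala.collection.mutable

// Hypothetical sketch: derive stable field names from union member type
// names, failing fast when lowercasing causes a collision instead of
// appending numeric suffixes.
def stableFieldNames(memberTypeNames: Seq[String]): Seq[String] = {
  val seen = mutable.Set.empty[String]
  memberTypeNames.map { name =>
    val fieldName = s"member_${name.toLowerCase(Locale.ROOT)}"
    if (!seen.add(fieldName)) {
      throw new IllegalArgumentException(
        s"Cannot generate stable identifiers: duplicate field name $fieldName. " +
          "Use sequential numeric ids (member0, member1, ...) instead.")
    }
    fieldName
  }
}
```

Under this sketch, a union containing both `a` and `A` raises the exception rather than silently producing `member_a_1`.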
@@ -144,11 +147,28 @@ object SchemaConverters {
case _ =>
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior when converting between Avro and Parquet.
val use_stable_id = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)

var fieldNameSet : Set[String] = Set()
This would copy the set every time you add an element. We can change it to a mutable set (https://www.scala-lang.org/api/2.13.6/scala/collection/mutable/Set$.html) or just java.util.HashSet.
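A small sketch of the reviewer's point (illustrative names, not from the PR): a `var` holding an immutable `Set` allocates a new set on every addition, while a mutable `Set` updates in place.

```scala
import scala.collection.mutable

// A `var` bound to an immutable Set: `+=` builds a fresh Set and rebinds the var.
var immutableNames: Set[String] = Set()
immutableNames += "member_int"

// A mutable Set: `+=` modifies the existing collection, no copying.
val mutableNames = mutable.Set.empty[String]
mutableNames += "member_int"
```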
@@ -144,11 +147,28 @@ object SchemaConverters {
case _ =>
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior when converting between Avro and Parquet.
val use_stable_id = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)

var fieldNameSet : Set[String] = Set()
val fields = avroSchema.getTypes.asScala.zipWithIndex.map {
  case (s, i) =>
    val schemaType = toSqlTypeHelper(s, existingRecordNames)
    // All fields are nullable because only one of them is set at a time
Could you move this comment to L171? It was referring to the nullable flag.
@@ -98,6 +98,44 @@ abstract class AvroSuite
}, new GenericDatumReader[Any]()).getSchema.toString(false)
}

def checkUnionStableId(
types: List[Schema],
nit: 4 space indentation for method parameters. Could you also make it private?
"member_myrecord2: struct<field: float>>",
Seq())

// Two array or map is not allowed in union.
nit: Could we change the comment to this: Two array types or two map types are not allowed in union.
def checkUnionStableId(
    types: List[Schema],
    expectedSchema: String,
    fieldsAndRow : Seq[(Any, Row)]): Unit = {
nit: no space before `:`. Also, could you explain the type? I think you can just pass the expected DataFrame that would contain the schema and the data.
I copied the way the SQL schema is generated from the test "Complex Union Type". I feel it is easier to write unit tests this way, and if possible I will keep it. I will add comments to explain the parameters.
dataFileWriter.append(avroRec2)
dataFileWriter.flush()
dataFileWriter.close()
test("union stable id") {
Almost forgot to mention, could you change this to SPARK-43333: union type with stable ids? Thanks.
Thank you for pinging me, @sadikovi.
"spark.sql.avro.enableStableIdentifiersForUnionType")
.doc("When Avro is desrialized to SQL schema, the union type is converted to structure in a " +
"way that field names of the structure are stable with the type, in most cases.")
.version("3.4.0")
3.5.0?
val AVRO_STABLE_ID_FOR_UNION_TYPE = buildConf(
"spark.sql.avro.enableStableIdentifiersForUnionType")
.doc("When Avro is desrialized to SQL schema, the union type is converted to structure in a " +
"way that field names of the structure are stable with the type, in most cases.")
The description seems to need revisions to clarify the difference between true and false.
Especially, please mention the case-sensitivity cases.
val fieldName = if (useSchemaId) {
  // Avro's field name may be case sensitive, so field names for two named type
  // could be "a" and "A" and we need to distinguish them. In this case, we throw
  // an option.
nit: ... we throw an exception.
@@ -3413,6 +3413,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val AVRO_STABLE_ID_FOR_UNION_TYPE = buildConf(
"spark.sql.avro.enableStableIdentifiersForUnionType")
.doc("If it is set to true, then Avro is desrialized to SQL schema, the union type is " +
Let's rephrase the doc like this:
If it is set to true, Avro schema is deserialized into Spark SQL schema, and the Avro Union type is transformed into a structure where the field names remain consistent with their respective types. The resulting field names are converted to lowercase, e.g. member_int or member_string. If two user-defined type names are identical regardless of case, an exception will be raised. However, in other cases, the field names can be uniquely identified.
@@ -98,6 +98,52 @@ abstract class AvroSuite
}, new GenericDatumReader[Any]()).getSchema.toString(false)
}

/* Check whether an Avro schema of union type is converted to SQL in an expected way, when the
Is this supposed to be Javadoc? If so, then it should look like this:
/**
* <your text>
*/
If not, you can just use the inline comments.
Thanks for making the changes. LGTM.
@dongjoon-hyun Could you take another look when you have time? Thank you.
@@ -144,11 +148,31 @@ object SchemaConverters {
case _ =>
// Convert complex unions to struct types where field names are member0, member1, etc.
// This is consistent with the behavior when converting between Avro and Parquet.
val useSchemaId = SQLConf.get.getConf(SQLConf.AVRO_STABLE_ID_FOR_UNION_TYPE)
Normally these configs are provided as options for functions (e.g. for from_avro()). For the file source, it should be an option for the source. Let's not use a Spark conf.
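For illustration, the option-based usage the reviewer has in mind might look like this (a sketch only; `spark` is an existing SparkSession and the path is a placeholder, and the option name follows the PR):

```scala
// Illustrative only: passing the flag as a data source option rather than a
// Spark conf, so it is scoped to this read instead of the whole session.
val df = spark.read
  .format("avro")
  .option("enableStableIdentifiersForUnionType", "true")
  .load("/path/to/avro/files")
```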
dataFileWriter.append(avroRec2)
dataFileWriter.flush()
dataFileWriter.close()
test("SPARK-43333: union stable id") {
We can remove the SPARK Jira id here.
Can we also include a user-defined Avro struct in addition to primitive types? Say 'CustomerInfo'.
Tests need to include Spark Jira ids unless the test suite is new.
Does it mean we need to read the Spark Jira to understand the test? I would be surprised if there were such a policy. Do you have a link?
It is a test for a new feature. Ideally it should be understandable by itself, without having to go to the Jira ticket. I have added many new tests without adding a Jira id.
I am ok with including it here; I just don't see much use in doing so.
Yes, Jira number needs to be included, however, the test name should be descriptive enough to understand what the test does. Jira number is added for the reference, if the test breaks, it is much easier to track down the original change and understand the motivation behind it.
You can find a note on this in https://spark.apache.org/contributing.html (Pull request section).
Thanks for the link. Sure.
@@ -3413,6 +3413,18 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val AVRO_STABLE_ID_FOR_UNION_TYPE = buildConf(
Commented above. I think it should be an option for Avro functions and the Avro source, not a Spark conf.
dataFileWriter.append(avroRec2)
dataFileWriter.flush()
dataFileWriter.close()
test("SPARK-43333: union stable id") {
Could you update the test name, e.g. Stable field names when converting Union type or Union type: stable field ids/names? So other contributors could understand what is being tested here.
…roOptions, as SchemaConverters is public and AvroOptions is a private class.
@dongjoon-hyun I addressed the comments and the CI appears to pass now. Can you help take a look?
Overall LGTM. Made a few suggestions.
@@ -154,4 +157,5 @@ private[sql] object AvroOptions extends DataSourceOptions {
// datasource similarly to the SQL config `spark.sql.avro.datetimeRebaseModeInRead`,
// and can be set to the same values: `EXCEPTION`, `LEGACY` or `CORRECTED`.
val DATETIME_REBASE_MODE = newOption("datetimeRebaseMode")
val STABLE_ID_FOR_UNION_TYPE = newOption("enableStableIdentifiersForUnionType")
Can we add documentation for this? I think the Spark conf version had a long doc comment. We can reuse that here.
// types where field names are member0, member1, etc. This is consistent with the
// behavior when converting between Avro and Parquet.
What is the Parquet connection here? Should this say "consistent with the default behavior before adding support for stable names"?
This is the existing comment. It just ended up on different lines after I added "When avroOptions.useStableIdForUnionType is false" at the beginning. I don't know what the Parquet connection is, and I have no reason to believe the comment is wrong.
toSqlType(avroSchema, options).
dataType.
asInstanceOf[StructType]
Code style: the `.` should move to the start of the line, e.g.:
val sparkSchema = SchemaConverters
.toSqlType(avroSchema, options)
.dataType
.asInstanceOf[StructType]
Thanks for catching it. I don't know why it became like that.
}
}

test("SPARK-27858 Union type: More than one non-null type") {
Could you add a short description of the test in a comment at the top? This helps in understanding the test.
This is not a new test. It's an existing test. I just added the scenario of stable ID.
<tr>
<td><code>enableStableIdentifiersForUnionType</code></td>
<td>false</td>
<td>If it is set to true, Avro schema is deserialized into Spark SQL schema, and the Avro Union type is transformed into a structure where the field names remain consistent with their respective types. The resulting field names are converted to lowercase, e.g. member_int or member_string. If two user-defined type names or a user-defined type name and a built-in type name are identical regardless of case, an exception will be raised. However, in other cases, the field names can be uniquely identified.</td>
Please copy this description to AvroOptions.scala as well.
+CC @shardulm94
@dongjoon-hyun the tests all pass now. Can you help take a look?
@dongjoon-hyun do you plan to take a look?
LGTM, thanks for the work
Closes apache#41263 from siying/avro_stable_union. Authored-by: Siying Dong <siying.dong@databricks.com>. Signed-off-by: Gengliang Wang <gengliang@apache.org>
What changes were proposed in this pull request?
Introduce the Avro option "enableStableIdentifiersForUnionType". If it is set to true (the default remains false), an Avro union is converted to a SQL schema by naming each field "member_" + type name. This keeps field names stable with respect to the type name.
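As an assumed illustration of the naming scheme (consistent with the PR description, but not code from the PR):

```scala
import java.util.Locale

// Field names for a union of [int, string, MyRecord] under each naming mode.
val memberTypes = Seq("int", "string", "MyRecord")

// Default positional naming: member0, member1, member2
val positional = memberTypes.indices.map(i => s"member$i")

// Stable naming with the option enabled: member_<lowercased type name>
val stable = memberTypes.map(t => s"member_${t.toLowerCase(Locale.ROOT)}")

println(positional.mkString(", ")) // member0, member1, member2
println(stable.mkString(", "))     // member_int, member_string, member_myrecord
```

With positional naming, inserting a new type into the middle of the union shifts every later field name; with stable naming, the other fields are unaffected.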
Why are the changes needed?
The purpose of this is twofold:
To allow adding or removing types to the union without affecting the record names of other member types. If the new or removed type is not ordered last, then existing queries referencing "member2" may need to be rewritten to reference "member1" or "member3".
Referencing the type name in the query is more readable than referencing "member0".
For example, our system produces an Avro schema from a Java type structure where subtyping maps to union types whose members are ordered lexicographically. Adding a subtype can therefore easily result in all references to "member2" needing to be updated to "member3".
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Add a unit test that covers all types supported in union, as well as some potential name conflict cases.