Skip to content

Commit

Permalink
[SPARK-48175][SQL][PYTHON] Store collation information in metadata and not in type for SER/DE
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?
Changing serialization and deserialization of collated strings so that the collation information is put in the metadata of the enclosing struct field - and then read back from there during parsing.

Format of serialization will look something like this:
```json
{
  "type": "struct",
  "fields": [
    {
      "name": "colName",
      "type": "string",
      "nullable": true,
      "metadata": {
        "__COLLATIONS": {
          "colName": "UNICODE"
        }
      }
    }
  ]
}
```

If we have a map we will add suffixes `.key` and `.value` in the metadata:
```json
{
  "type": "struct",
  "fields": [
    {
      "name": "mapField",
      "type": {
        "type": "map",
        "keyType": "string",
        "valueType": "string",
        "valueContainsNull": true
      },
      "nullable": true,
      "metadata": {
        "__COLLATIONS": {
          "mapField.key": "UNICODE",
          "mapField.value": "UNICODE"
        }
      }
    }
  ]
}
```
It will be a similar story for arrays (we will add the `.element` suffix). We could have multiple suffixes when working with deeply nested data types (`Map[String, Array[Array[String]]]` — see the tests for this example).

### Why are the changes needed?
Putting collation info in field metadata is the only way to not break old clients reading new tables with collations. `CharVarcharUtils` does a similar thing but this is much less hacky, and more friendly for all 3p clients - which is especially important since delta also uses spark for schema ser/de.

It will also remove the need for additional logic introduced in apache#46083 to remove collations before writing to HMS as this way the tables will be fully HMS compatible.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
With unit tests

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46280 from stefankandic/newDeltaSchema.

Lead-authored-by: Stefan Kandic <stefan.kandic@databricks.com>
Co-authored-by: Stefan Kandic <154237371+stefankandic@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
2 people authored and cloud-fan committed May 18, 2024
1 parent 5162378 commit 6f6b486
Show file tree
Hide file tree
Showing 13 changed files with 1,004 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,62 @@
* Provides functionality to the UTF8String object which respects defined collation settings.
*/
public final class CollationFactory {

/**
* Identifier for single a collation.
*/
public static class CollationIdentifier {
private final String provider;
private final String name;
private final String version;

public CollationIdentifier(String provider, String collationName, String version) {
this.provider = provider;
this.name = collationName;
this.version = version;
}

public static CollationIdentifier fromString(String identifier) {
long numDots = identifier.chars().filter(ch -> ch == '.').count();
assert(numDots > 0);

if (numDots == 1) {
String[] parts = identifier.split("\\.", 2);
return new CollationIdentifier(parts[0], parts[1], null);
}

String[] parts = identifier.split("\\.", 3);
return new CollationIdentifier(parts[0], parts[1], parts[2]);
}

/**
* Returns the identifier's string value without the version.
* This is used for the table schema as the schema doesn't care about the version,
* only the statistics do.
*/
public String toStringWithoutVersion() {
return String.format("%s.%s", provider, name);
}

public String getProvider() {
return provider;
}

public String getName() {
return name;
}

public Optional<String> getVersion() {
return Optional.ofNullable(version);
}
}

/**
* Entry encapsulating all information about a collation.
*/
public static class Collation {
public final String collationName;
public final String provider;
public final Collator collator;
public final Comparator<UTF8String> comparator;

Expand Down Expand Up @@ -89,6 +140,7 @@ public static class Collation {

public Collation(
String collationName,
String provider,
Collator collator,
Comparator<UTF8String> comparator,
String version,
Expand All @@ -97,6 +149,7 @@ public Collation(
boolean supportsBinaryOrdering,
boolean supportsLowercaseEquality) {
this.collationName = collationName;
this.provider = provider;
this.collator = collator;
this.comparator = comparator;
this.version = version;
Expand All @@ -110,6 +163,8 @@ public Collation(
// No Collation can simultaneously support binary equality and lowercase equality
assert(!supportsBinaryEquality || !supportsLowercaseEquality);

assert(SUPPORTED_PROVIDERS.contains(provider));

if (supportsBinaryEquality) {
this.equalsFunction = UTF8String::equals;
} else {
Expand All @@ -122,13 +177,15 @@ public Collation(
*/
public Collation(
String collationName,
String provider,
Collator collator,
String version,
boolean supportsBinaryEquality,
boolean supportsBinaryOrdering,
boolean supportsLowercaseEquality) {
this(
collationName,
provider,
collator,
(s1, s2) -> collator.compare(s1.toString(), s2.toString()),
version,
Expand All @@ -137,6 +194,11 @@ public Collation(
supportsBinaryOrdering,
supportsLowercaseEquality);
}

/** Returns the collation identifier. */
public CollationIdentifier identifier() {
return new CollationIdentifier(provider, collationName, version);
}
}

private static final Collation[] collationTable = new Collation[4];
Expand All @@ -145,12 +207,17 @@ public Collation(
public static final int UTF8_BINARY_COLLATION_ID = 0;
public static final int UTF8_BINARY_LCASE_COLLATION_ID = 1;

public static final String PROVIDER_SPARK = "spark";
public static final String PROVIDER_ICU = "icu";
public static final List<String> SUPPORTED_PROVIDERS = List.of(PROVIDER_SPARK, PROVIDER_ICU);

static {
// Binary comparison. This is the default collation.
// No custom comparators will be used for this collation.
// Instead, we rely on byte for byte comparison.
collationTable[0] = new Collation(
"UTF8_BINARY",
PROVIDER_SPARK,
null,
UTF8String::binaryCompare,
"1.0",
Expand All @@ -163,6 +230,7 @@ public Collation(
// TODO: Do in place comparisons instead of creating new strings.
collationTable[1] = new Collation(
"UTF8_BINARY_LCASE",
PROVIDER_SPARK,
null,
UTF8String::compareLowerCase,
"1.0",
Expand All @@ -173,13 +241,28 @@ public Collation(

// UNICODE case sensitive comparison (ROOT locale, in ICU).
collationTable[2] = new Collation(
"UNICODE", Collator.getInstance(ULocale.ROOT), "153.120.0.0", true, false, false);
"UNICODE",
PROVIDER_ICU,
Collator.getInstance(ULocale.ROOT),
"153.120.0.0",
true,
false,
false
);

collationTable[2].collator.setStrength(Collator.TERTIARY);
collationTable[2].collator.freeze();

// UNICODE case-insensitive comparison (ROOT locale, in ICU + Secondary strength).
collationTable[3] = new Collation(
"UNICODE_CI", Collator.getInstance(ULocale.ROOT), "153.120.0.0", false, false, false);
"UNICODE_CI",
PROVIDER_ICU,
Collator.getInstance(ULocale.ROOT),
"153.120.0.0",
false,
false,
false
);
collationTable[3].collator.setStrength(Collator.SECONDARY);
collationTable[3].collator.freeze();

Expand Down Expand Up @@ -263,6 +346,18 @@ public static int collationNameToId(String collationName) throws SparkException
}
}

/**
 * Validates that the given provider name is one of the supported collation
 * providers, ignoring case.
 *
 * @param provider the provider name to validate
 * @throws SparkException with error class {@code COLLATION_INVALID_PROVIDER}
 *         when the provider is not in {@code SUPPORTED_PROVIDERS}
 */
public static void assertValidProvider(String provider) throws SparkException {
  // Lowercase with a fixed locale so validation is independent of the JVM's
  // default locale (e.g. "ICU".toLowerCase() becomes "ıcu" under a Turkish locale
  // and would wrongly be rejected).
  if (!SUPPORTED_PROVIDERS.contains(provider.toLowerCase(java.util.Locale.ROOT))) {
    Map<String, String> params = Map.of(
      "provider", provider,
      "supportedProviders", String.join(", ", SUPPORTED_PROVIDERS)
    );

    throw new SparkException(
      "COLLATION_INVALID_PROVIDER", SparkException.constructMessageParams(params), null);
  }
}

/**
 * Returns the {@link Collation} entry registered for the given collation id.
 * NOTE(review): performs no bounds check — assumes {@code collationId} came from
 * {@code collationNameToId} (valid range 0..3 per the table size); confirm callers
 * never pass arbitrary ids.
 */
public static Collation fetchCollation(int collationId) {
  return collationTable[collationId];
}
Expand Down
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,12 @@
],
"sqlState" : "42704"
},
"COLLATION_INVALID_PROVIDER" : {
"message" : [
"The value <provider> does not represent a correct collation provider. Supported providers are: [<supportedProviders>]."
],
"sqlState" : "42704"
},
"COLLATION_MISMATCH" : {
"message" : [
"Could not determine which collation to use for string functions and operators."
Expand Down Expand Up @@ -2342,6 +2348,12 @@
],
"sqlState" : "2203G"
},
"INVALID_JSON_DATA_TYPE_FOR_COLLATIONS" : {
"message" : [
"Collations can only be applied to string types, but the JSON data type is <jsonType>."
],
"sqlState" : "2203G"
},
"INVALID_JSON_ROOT_FIELD" : {
"message" : [
"Cannot convert JSON root field to target Spark type."
Expand Down
10 changes: 10 additions & 0 deletions python/pyspark/errors/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@
"Cannot <condition1> without <condition2>."
]
},
"COLLATION_INVALID_PROVIDER" : {
"message" : [
"The value <provider> does not represent a correct collation provider. Supported providers are: [<supportedProviders>]."
]
},
"COLUMN_IN_LIST": {
"message": [
"`<func_name>` does not allow a Column in a list."
Expand Down Expand Up @@ -357,6 +362,11 @@
"All items in `<arg_name>` should be in <allowed_types>, got <item_type>."
]
},
"INVALID_JSON_DATA_TYPE_FOR_COLLATIONS" : {
"message" : [
"Collations can only be applied to string types, but the JSON data type is <jsonType>."
]
},
"INVALID_MULTIPLE_ARGUMENT_CONDITIONS": {
"message": [
"[{arg_names}] cannot be <condition>."
Expand Down
4 changes: 4 additions & 0 deletions python/pyspark/sql/tests/connect/test_parity_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ def test_rdd_with_udt(self):
def test_udt(self):
super().test_udt()

# Skipped in the Spark Connect parity suite: the collation JSON SER/DE round-trip
# exercised by the parent test runs on the JVM, which the Connect client
# cannot access directly.
@unittest.skip("Requires JVM access.")
def test_schema_with_collations_json_ser_de(self):
    super().test_schema_with_collations_json_ser_de()

@unittest.skip("Does not test anything related to Spark Connect")
def test_parse_datatype_string(self):
super().test_parse_datatype_string()
Expand Down
Loading

0 comments on commit 6f6b486

Please sign in to comment.