Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for typeWidening(-preview) reader feature in Delta Lake #22142

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,16 @@ private DeltaLakeSchemaSupport() {}
private static final String IDENTITY_COLUMNS_FEATURE_NAME = "identityColumns";
private static final String INVARIANTS_FEATURE_NAME = "invariants";
public static final String TIMESTAMP_NTZ_FEATURE_NAME = "timestampNtz";
public static final String TYPE_WIDENING_FEATURE_NAME = "typeWidening";
public static final String TYPE_WIDENING_PREVIEW_FEATURE_NAME = "typeWidening-preview";
public static final String VACUUM_PROTOCOL_CHECK_FEATURE_NAME = "vacuumProtocolCheck";
public static final String V2_CHECKPOINT_FEATURE_NAME = "v2Checkpoint";

private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add(COLUMN_MAPPING_FEATURE_NAME)
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.add(TYPE_WIDENING_FEATURE_NAME)
.add(TYPE_WIDENING_PREVIEW_FEATURE_NAME)
.add(DELETION_VECTORS_FEATURE_NAME)
.add(VACUUM_PROTOCOL_CHECK_FEATURE_NAME)
.add(V2_CHECKPOINT_FEATURE_NAME)
Expand Down Expand Up @@ -466,18 +470,22 @@ private static DeltaLakeColumnMetadata mapColumn(TypeManager typeManager, JsonNo
OptionalInt fieldId = OptionalInt.empty();
String physicalName;
Type physicalColumnType;
JsonNode metadata = node.get("metadata");
if (metadata.has("delta.typeChanges")) {
metadata.get("delta.typeChanges").elements().forEachRemaining(DeltaLakeSchemaSupport::verifyTypeChange);
}
switch (mappingMode) {
case ID:
String columnMappingId = node.get("metadata").get("delta.columnMapping.id").asText();
String columnMappingId = metadata.get("delta.columnMapping.id").asText();
verify(!isNullOrEmpty(columnMappingId), "id is null or empty");
fieldId = OptionalInt.of(Integer.parseInt(columnMappingId));
// Databricks stores column statistics with physical name
physicalName = node.get("metadata").get("delta.columnMapping.physicalName").asText();
physicalName = metadata.get("delta.columnMapping.physicalName").asText();
verify(!isNullOrEmpty(physicalName), "physicalName is null or empty");
physicalColumnType = buildType(typeManager, typeNode, true);
break;
case NAME:
physicalName = node.get("metadata").get("delta.columnMapping.physicalName").asText();
physicalName = metadata.get("delta.columnMapping.physicalName").asText();
verify(!isNullOrEmpty(physicalName), "physicalName is null or empty");
physicalColumnType = buildType(typeManager, typeNode, true);
break;
Expand All @@ -494,6 +502,19 @@ private static DeltaLakeColumnMetadata mapColumn(TypeManager typeManager, JsonNo
return new DeltaLakeColumnMetadata(columnMetadata, fieldName, fieldId, physicalName, physicalColumnType);
}

/**
 * Validates one entry of a column's {@code delta.typeChanges} metadata array.
 * <p>
 * Only the widenings byte to short, byte to integer and short to integer are accepted.
 * Any other change, as well as an entry lacking {@code fromType} or {@code toType},
 * is rejected.
 *
 * @param typeChange a single JSON object taken from the {@code delta.typeChanges} array
 * @throws TrinoException with {@code DELTA_LAKE_INVALID_SCHEMA} when the entry is malformed
 *         or describes a type change that is not accepted
 */
private static void verifyTypeChange(JsonNode typeChange)
{
    // JsonNode.get yields null for an absent field; fail with a schema error rather than a NullPointerException
    if (!typeChange.hasNonNull("fromType") || !typeChange.hasNonNull("toType")) {
        throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Type change must contain 'fromType' and 'toType': %s".formatted(typeChange));
    }

    String fromType = typeChange.get("fromType").asText();
    String toType = typeChange.get("toType").asText();

    boolean widenedFromByte = fromType.equals("byte") && (toType.equals("short") || toType.equals("integer"));
    boolean widenedFromShort = fromType.equals("short") && toType.equals("integer");
    if (widenedFromByte || widenedFromShort) {
        return;
    }
    // TODO: Skip unsupported columns instead of throwing an exception
    throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Type change from '%s' to '%s' is not supported".formatted(fromType, toType));
}

public static Map<String, Object> getColumnTypes(MetadataEntry metadataEntry)
{
return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("type"), new TypeReference<>(){}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,91 @@ private void testReadV2Checkpoint(String resourceName)
assertQuery("SELECT * FROM " + tableName, "VALUES (1, 2)");
}

/**
 * Registers the {@code deltalake/type_widening} resource table and checks that the
 * current snapshot reports the column as integer, and that each historical version
 * is read back with the column type in effect at that version.
 */
@Test
public void testTypeWidening()
        throws Exception
{
    String tableName = "test_type_widening_" + randomNameSuffix();
    Path tableLocation = Files.createTempFile(tableName, null);
    Path resourceDirectory = new File(Resources.getResource("deltalake/type_widening").toURI()).toPath();
    copyDirectoryContents(resourceDirectory, tableLocation);
    String registerTable = "CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri());
    assertUpdate(registerTable);

    // Latest snapshot: the column has been widened all the way to integer
    assertThat(query("DESCRIBE " + tableName))
            .result()
            .projected("Column", "Type")
            .skippingTypesCheck()
            .matches("VALUES ('col', 'integer')");
    String selectAll = "SELECT * FROM " + tableName;
    assertQuery(selectAll, "VALUES 127, 32767, 2147483647");

    // Version 0 is the empty table right after creation
    assertThat(query(selectAll + " FOR VERSION AS OF 0"))
            .returnsEmptyResult();
    // Version 1: first row, column still tinyint
    assertThat(query(selectAll + " FOR VERSION AS OF 1"))
            .matches("VALUES tinyint '127'");
    // Versions 2 and 3: column is smallint
    assertThat(query(selectAll + " FOR VERSION AS OF 2"))
            .matches("VALUES smallint '127'");
    assertThat(query(selectAll + " FOR VERSION AS OF 3"))
            .matches("VALUES smallint '127', smallint '32767'");
    // Versions 4 and 5: column is integer
    assertThat(query(selectAll + " FOR VERSION AS OF 4"))
            .matches("VALUES integer '127', integer '32767'");
    assertThat(query(selectAll + " FOR VERSION AS OF 5"))
            .matches("VALUES integer '127', integer '32767', integer '2147483647'");
}

/**
 * Registers the {@code deltalake/type_widening_nested} resource table and checks that
 * widened types inside a row field, map key/value and array element are reported and
 * read correctly, both for the current snapshot and for selected historical versions.
 */
@Test
public void testTypeWideningNested()
        throws Exception
{
    String tableName = "test_type_widening_nestd_" + randomNameSuffix();
    Path tableLocation = Files.createTempFile(tableName, null);
    Path resourceDirectory = new File(Resources.getResource("deltalake/type_widening_nested").toURI()).toPath();
    copyDirectoryContents(resourceDirectory, tableLocation);
    String registerTable = "CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri());
    assertUpdate(registerTable);

    // Latest snapshot: every nested type is integer
    assertThat(query("DESCRIBE " + tableName))
            .result()
            .projected("Column", "Type")
            .skippingTypesCheck()
            .matches("VALUES ('s', 'row(field integer)'), ('m', 'map(integer, integer)'), ('a', 'array(integer)')");
    String selectAll = "SELECT * FROM " + tableName;
    assertThat(query(selectAll))
            .matches("VALUES " +
                    "(CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[-128], ARRAY[127]), ARRAY[127])," +
                    "(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[-32768], ARRAY[32767]), ARRAY[32767])," +
                    "(CAST(ROW(2147483647) AS ROW(field integer)), MAP(ARRAY[-2147483648], ARRAY[2147483647]), ARRAY[2147483647])");

    // Version 0 is the empty table; version 1 holds the first row with tinyint nested types
    assertThat(query(selectAll + " FOR VERSION AS OF 0"))
            .returnsEmptyResult();
    assertThat(query(selectAll + " FOR VERSION AS OF 1"))
            .matches("VALUES (CAST(ROW(127) AS ROW(field tinyint)), MAP(ARRAY[tinyint '-128'], ARRAY[tinyint '127']), ARRAY[tinyint '127'])");

    // Versions 2, 3 and 4 are intermediate byte-to-short changes; by version 5 all nested fields are smallint
    assertThat(query(selectAll + " FOR VERSION AS OF 5"))
            .matches("VALUES (CAST(ROW(127) AS ROW(field smallint)), MAP(ARRAY[smallint '-128'], ARRAY[smallint '127']), ARRAY[smallint '127'])");

    assertThat(query(selectAll + " FOR VERSION AS OF 6"))
            .matches("VALUES " +
                    "(CAST(ROW(127) AS ROW(field smallint)), MAP(ARRAY[smallint '-128'], ARRAY[smallint '127']), ARRAY[smallint '127'])," +
                    "(CAST(ROW(32767) AS ROW(field smallint)), MAP(ARRAY[smallint '-32768'], ARRAY[smallint '32767']), ARRAY[smallint '32767'])");

    // Versions 7, 8 and 9 are intermediate short-to-int changes; by version 10 all nested fields are integer
    assertThat(query(selectAll + " FOR VERSION AS OF 10"))
            .matches("VALUES " +
                    "(CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[integer '-128'], ARRAY[integer '127']), ARRAY[integer '127'])," +
                    "(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[integer '-32768'], ARRAY[integer '32767']), ARRAY[integer '32767'])");

    // Version 11 adds the third row and must equal the latest snapshot
    assertThat(query(selectAll + " FOR VERSION AS OF 11"))
            .matches("VALUES " +
                    "(CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[-128], ARRAY[127]), ARRAY[127])," +
                    "(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[-32768], ARRAY[32767]), ARRAY[32767])," +
                    "(CAST(ROW(2147483647) AS ROW(field integer)), MAP(ARRAY[-2147483648], ARRAY[2147483647]), ARRAY[2147483647])");
}

/**
 * Registers the {@code deltalake/type_widening_unsupported} resource table and checks
 * that selecting from it fails with the "is not supported" type-change message.
 */
@Test
public void testTypeWideningUnsupported()
        throws Exception
{
    String tableName = "test_type_widening_" + randomNameSuffix();
    Path tableLocation = Files.createTempFile(tableName, null);
    Path resourceDirectory = new File(Resources.getResource("deltalake/type_widening_unsupported").toURI()).toPath();
    copyDirectoryContents(resourceDirectory, tableLocation);
    String registerTable = "CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri());
    assertUpdate(registerTable);

    // Registration succeeds; the unsupported type change is only reported when the table is queried
    String selectAll = "SELECT * FROM " + tableName;
    assertQueryFails(selectAll, "Type change from 'byte' to 'unsupported' is not supported");
}

private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation)
throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Data generated using OSS Delta Lake 3.2.0:

```sql
CREATE TABLE default.test_type_widening
(col byte)
USING DELTA
LOCATION 's3://test-bucket/databricks-compatibility-test-test-widening'
TBLPROPERTIES ('delta.enableTypeWidening'=true);
INSERT INTO default.test_type_widening VALUES 127;

ALTER TABLE default.test_type_widening CHANGE COLUMN col TYPE short;
INSERT INTO default.test_type_widening VALUES 32767;

ALTER TABLE default.test_type_widening CHANGE COLUMN col TYPE integer;
INSERT INTO default.test_type_widening VALUES 2147483647;
```

Other type widenings, including integer to long, are not supported in Delta Lake 3.2.0.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1716683236025,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.enableTypeWidening\":\"true\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"ad489be2-2283-4cbf-8959-fe1ce69d9877"}}
{"metaData":{"id":"3b653ec1-f12b-40c9-9e27-f05fa9941db3","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716683235880}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["typeWidening-preview"],"writerFeatures":["typeWidening-preview"]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683278791,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"461"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"ba5336d4-ee05-4405-8c84-1694276e49b6"}}
{"add":{"path":"part-00000-564b0b78-9796-45f9-8f3d-6f36fb01929b-c000.snappy.parquet","partitionValues":{},"size":461,"modificationTime":1716683278000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":127},\"maxValues\":{\"col\":127},\"nullCount\":{\"col\":0}}","defaultRowCommitVersion":1}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683348167,"operation":"CHANGE COLUMN","operationParameters":{"column":"{\"name\":\"col\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}}"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"9580b64d-9d56-4fef-bede-35cca56df178"}}
{"metaData":{"id":"3b653ec1-f12b-40c9-9e27-f05fa9941db3","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":\"short\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"short\",\"fromType\":\"byte\",\"tableVersion\":2}]}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716683235880}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683349676,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"538"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"f3405988-757d-4959-96d2-d7142d7162b6"}}
{"add":{"path":"part-00000-e15d8e8c-2e34-4d76-bacf-6315428daba0-c000.snappy.parquet","partitionValues":{},"size":538,"modificationTime":1716683349000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":32767},\"maxValues\":{\"col\":32767},\"nullCount\":{\"col\":0}}","defaultRowCommitVersion":3}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683377041,"operation":"CHANGE COLUMN","operationParameters":{"column":"{\"name\":\"col\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"short\",\"fromType\":\"byte\",\"tableVersion\":2}]}}"},"readVersion":3,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"8f7e22df-f174-46a5-ba63-aa97c99a33cb"}}
{"metaData":{"id":"3b653ec1-f12b-40c9-9e27-f05fa9941db3","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"short\",\"fromType\":\"byte\",\"tableVersion\":2},{\"toType\":\"integer\",\"fromType\":\"short\",\"tableVersion\":4}]}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716683235880}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683378212,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":4,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"588"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"eabc76f4-236c-4113-8d8e-df1eface2ac3"}}
{"add":{"path":"part-00000-9dc5a505-e7ae-4958-901d-f05b8f6a5209-c000.snappy.parquet","partitionValues":{},"size":588,"modificationTime":1716683378000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":2147483647},\"maxValues\":{\"col\":2147483647},\"nullCount\":{\"col\":0}}","defaultRowCommitVersion":5}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
Data generated using OSS Delta Lake 3.2.0:

```sql
CREATE TABLE default.test_type_widening_nested
(s struct<field: byte>, m map<byte, byte>, a array<byte>)
USING DELTA
LOCATION 's3://test-bucket/databricks-compatibility-test-test-widening-nested'
TBLPROPERTIES ('delta.enableTypeWidening'=true);
INSERT INTO default.test_type_widening_nested VALUES (named_struct('field',127), map(-128,127), array(127));

ALTER TABLE default.test_type_widening_nested CHANGE COLUMN s.field TYPE short;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN m.key TYPE short;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN m.value TYPE short;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN a.element TYPE short;
INSERT INTO default.test_type_widening_nested VALUES (named_struct('field',32767), map(-32768,32767), array(32767));

ALTER TABLE default.test_type_widening_nested CHANGE COLUMN s.field TYPE integer;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN m.key TYPE integer;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN m.value TYPE integer;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN a.element TYPE integer;
INSERT INTO default.test_type_widening_nested VALUES (named_struct('field',2147483647), map(-2147483648,2147483647), array(2147483647));
```

Other type widenings, including integer to long, are not supported in Delta Lake 3.2.0.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1716686845777,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.enableTypeWidening\":\"true\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"2ab59e14-0675-443b-86af-4a619fc97bab"}}
{"metaData":{"id":"628f6682-9791-4ee0-a0c4-502239349046","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"s\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"m\",\"type\":{\"type\":\"map\",\"keyType\":\"byte\",\"valueType\":\"byte\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"a\",\"type\":{\"type\":\"array\",\"elementType\":\"byte\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716686845629}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["typeWidening-preview"],"writerFeatures":["typeWidening-preview"]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716686867493,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"1413"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"c162f32f-0e31-4967-943a-66787c2623b9"}}
{"add":{"path":"part-00000-aa99b42e-b5ca-4faf-a9ff-cbcd951cc1b7-c000.snappy.parquet","partitionValues":{},"size":1413,"modificationTime":1716686867000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"s\":{\"field\":127}},\"maxValues\":{\"s\":{\"field\":127}},\"nullCount\":{\"s\":{\"field\":0},\"m\":0,\"a\":0}}","defaultRowCommitVersion":1}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716686871922,"operation":"CHANGE COLUMN","operationParameters":{"column":"{\"name\":\"s.field\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}}"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"265c814a-5bf9-41cd-9170-99b26b584b31"}}
{"metaData":{"id":"628f6682-9791-4ee0-a0c4-502239349046","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"s\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field\",\"type\":\"short\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"short\",\"fromType\":\"byte\",\"tableVersion\":2}]}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"m\",\"type\":{\"type\":\"map\",\"keyType\":\"byte\",\"valueType\":\"byte\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"a\",\"type\":{\"type\":\"array\",\"elementType\":\"byte\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716686845629}}
Loading
Loading