Skip to content

Commit

Permalink
Add support for typeWidening(-preview) reader feature in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jun 12, 2024
1 parent 36a6c8a commit e035599
Show file tree
Hide file tree
Showing 36 changed files with 258 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ private DeltaLakeSchemaSupport() {}
private static final String IDENTITY_COLUMNS_FEATURE_NAME = "identityColumns";
private static final String INVARIANTS_FEATURE_NAME = "invariants";
public static final String TIMESTAMP_NTZ_FEATURE_NAME = "timestampNtz";
public static final String TYPE_WIDENING_FEATURE_NAME = "typeWidening";
public static final String TYPE_WIDENING_PREVIEW_FEATURE_NAME = "typeWidening-preview";
public static final String VACUUM_PROTOCOL_CHECK_FEATURE_NAME = "vacuumProtocolCheck";
public static final String VARIANT_TYPE_FEATURE_NAME = "variantType";
public static final String VARIANT_TYPE_PREVIEW_FEATURE_NAME = "variantType-preview";
Expand All @@ -117,6 +119,8 @@ private DeltaLakeSchemaSupport() {}
private static final Set<String> SUPPORTED_READER_FEATURES = ImmutableSet.<String>builder()
.add(COLUMN_MAPPING_FEATURE_NAME)
.add(TIMESTAMP_NTZ_FEATURE_NAME)
.add(TYPE_WIDENING_FEATURE_NAME)
.add(TYPE_WIDENING_PREVIEW_FEATURE_NAME)
.add(DELETION_VECTORS_FEATURE_NAME)
.add(VACUUM_PROTOCOL_CHECK_FEATURE_NAME)
.add(VARIANT_TYPE_FEATURE_NAME)
Expand Down Expand Up @@ -490,18 +494,22 @@ private static DeltaLakeColumnMetadata mapColumn(TypeManager typeManager, JsonNo
OptionalInt fieldId = OptionalInt.empty();
String physicalName;
Type physicalColumnType;
JsonNode metadata = node.get("metadata");
if (metadata.has("delta.typeChanges")) {
metadata.get("delta.typeChanges").elements().forEachRemaining(DeltaLakeSchemaSupport::verifyTypeChange);
}
switch (mappingMode) {
case ID:
String columnMappingId = node.get("metadata").get("delta.columnMapping.id").asText();
String columnMappingId = metadata.get("delta.columnMapping.id").asText();
verify(!isNullOrEmpty(columnMappingId), "id is null or empty");
fieldId = OptionalInt.of(Integer.parseInt(columnMappingId));
// Databricks stores column statistics with physical name
physicalName = node.get("metadata").get("delta.columnMapping.physicalName").asText();
physicalName = metadata.get("delta.columnMapping.physicalName").asText();
verify(!isNullOrEmpty(physicalName), "physicalName is null or empty");
physicalColumnType = buildType(typeManager, typeNode, true);
break;
case NAME:
physicalName = node.get("metadata").get("delta.columnMapping.physicalName").asText();
physicalName = metadata.get("delta.columnMapping.physicalName").asText();
verify(!isNullOrEmpty(physicalName), "physicalName is null or empty");
physicalColumnType = buildType(typeManager, typeNode, true);
break;
Expand All @@ -518,6 +526,19 @@ private static DeltaLakeColumnMetadata mapColumn(TypeManager typeManager, JsonNo
return new DeltaLakeColumnMetadata(columnMetadata, fieldName, fieldId, physicalName, physicalColumnType);
}

private static void verifyTypeChange(JsonNode typeChange)
{
String fromType = typeChange.get("fromType").asText();
String toType = typeChange.get("toType").asText();

if ((fromType.equals("byte") && (toType.equals("short") || toType.equals("integer"))) ||
(fromType.equals("short") && toType.equals("integer"))) {
return;
}
// TODO: Skip unsupported columns instead of throwing an exception
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Type change from '%s' to '%s' is not supported".formatted(fromType, toType));
}

public static Map<String, Object> getColumnTypes(MetadataEntry metadataEntry)
{
return getColumnProperties(metadataEntry, node -> OBJECT_MAPPER.convertValue(node.get("type"), new TypeReference<>(){}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1487,6 +1487,91 @@ private void testReadV2Checkpoint(String resourceName)
assertQuery("SELECT * FROM " + tableName, "VALUES (1, 2)");
}

@Test
public void testTypeWidening()
throws Exception
{
String tableName = "test_type_widening_" + randomNameSuffix();
Path tableLocation = Files.createTempFile(tableName, null);
copyDirectoryContents(new File(Resources.getResource("deltalake/type_widening").toURI()).toPath(), tableLocation);
assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

assertThat(query("DESCRIBE " + tableName)).result().projected("Column", "Type")
.skippingTypesCheck()
.matches("VALUES ('col', 'integer')");
assertQuery("SELECT * FROM " + tableName, "VALUES 127, 32767, 2147483647");

assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 0"))
.returnsEmptyResult();
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 1"))
.matches("VALUES tinyint '127'");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 2"))
.matches("VALUES smallint '127'");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 3"))
.matches("VALUES smallint '127', smallint '32767'");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 4"))
.matches("VALUES integer '127', integer '32767'");
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 5"))
.matches("VALUES integer '127', integer '32767', integer '2147483647'");
}

@Test
public void testTypeWideningNested()
throws Exception
{
String tableName = "test_type_widening_nestd_" + randomNameSuffix();
Path tableLocation = Files.createTempFile(tableName, null);
copyDirectoryContents(new File(Resources.getResource("deltalake/type_widening_nested").toURI()).toPath(), tableLocation);
assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

assertThat(query("DESCRIBE " + tableName)).result().projected("Column", "Type")
.skippingTypesCheck()
.matches("VALUES ('s', 'row(field integer)'), ('m', 'map(integer, integer)'), ('a', 'array(integer)')");
assertThat(query("SELECT * FROM " + tableName))
.matches("VALUES " +
"(CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[-128], ARRAY[127]), ARRAY[127])," +
"(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[-32768], ARRAY[32767]), ARRAY[32767])," +
"(CAST(ROW(2147483647) AS ROW(field integer)), MAP(ARRAY[-2147483648], ARRAY[2147483647]), ARRAY[2147483647])");

assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 0"))
.returnsEmptyResult();
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 1"))
.matches("VALUES (CAST(ROW(127) AS ROW(field tinyint)), MAP(ARRAY[tinyint '-128'], ARRAY[tinyint '127']), ARRAY[tinyint '127'])");

// 2,3,4 versions changed nested fields from byte to short
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 5"))
.matches("VALUES (CAST(ROW(127) AS ROW(field smallint)), MAP(ARRAY[smallint '-128'], ARRAY[smallint '127']), ARRAY[smallint '127'])");

assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 6"))
.matches("VALUES " +
"(CAST(ROW(127) AS ROW(field smallint)), MAP(ARRAY[smallint '-128'], ARRAY[smallint '127']), ARRAY[smallint '127'])," +
"(CAST(ROW(32767) AS ROW(field smallint)), MAP(ARRAY[smallint '-32768'], ARRAY[smallint '32767']), ARRAY[smallint '32767'])");

// 7,8,9 versions changed nested fields from short to int
assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 10"))
.matches("VALUES " +
"(CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[integer '-128'], ARRAY[integer '127']), ARRAY[integer '127'])," +
"(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[integer '-32768'], ARRAY[integer '32767']), ARRAY[integer '32767'])");

assertThat(query("SELECT * FROM " + tableName + " FOR VERSION AS OF 11"))
.matches("VALUES " +
"(CAST(ROW(127) AS ROW(field integer)), MAP(ARRAY[-128], ARRAY[127]), ARRAY[127])," +
"(CAST(ROW(32767) AS ROW(field integer)), MAP(ARRAY[-32768], ARRAY[32767]), ARRAY[32767])," +
"(CAST(ROW(2147483647) AS ROW(field integer)), MAP(ARRAY[-2147483648], ARRAY[2147483647]), ARRAY[2147483647])");
}

@Test
public void testTypeWideningUnsupported()
throws Exception
{
String tableName = "test_type_widening_" + randomNameSuffix();
Path tableLocation = Files.createTempFile(tableName, null);
copyDirectoryContents(new File(Resources.getResource("deltalake/type_widening_unsupported").toURI()).toPath(), tableLocation);
assertUpdate("CALL system.register_table(CURRENT_SCHEMA, '%s', '%s')".formatted(tableName, tableLocation.toUri()));

assertQueryFails("SELECT * FROM " + tableName, "Type change from 'byte' to 'unsupported' is not supported");
}

private static MetadataEntry loadMetadataEntry(long entryNumber, Path tableLocation)
throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Data generated using OSS Delta Lake 3.2.0:

```sql
CREATE TABLE default.test_type_widening
(col byte)
USING DELTA
LOCATION 's3://test-bucket/databricks-compatibility-test-test-widening'
TBLPROPERTIES ('delta.enableTypeWidening'=true);
INSERT INTO default.test_type_widening VALUES 127;

ALTER TABLE default.test_type_widening CHANGE COLUMN col TYPE short;
INSERT INTO default.test_type_widening VALUES 32767;

ALTER TABLE default.test_type_widening CHANGE COLUMN col TYPE integer;
INSERT INTO default.test_type_widening VALUES 2147483647;
```

Other type widening including from integer to long is not supported in 3.2.0.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1716683236025,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.enableTypeWidening\":\"true\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"ad489be2-2283-4cbf-8959-fe1ce69d9877"}}
{"metaData":{"id":"3b653ec1-f12b-40c9-9e27-f05fa9941db3","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716683235880}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["typeWidening-preview"],"writerFeatures":["typeWidening-preview"]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683278791,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"461"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"ba5336d4-ee05-4405-8c84-1694276e49b6"}}
{"add":{"path":"part-00000-564b0b78-9796-45f9-8f3d-6f36fb01929b-c000.snappy.parquet","partitionValues":{},"size":461,"modificationTime":1716683278000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":127},\"maxValues\":{\"col\":127},\"nullCount\":{\"col\":0}}","defaultRowCommitVersion":1}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683348167,"operation":"CHANGE COLUMN","operationParameters":{"column":"{\"name\":\"col\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}}"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"9580b64d-9d56-4fef-bede-35cca56df178"}}
{"metaData":{"id":"3b653ec1-f12b-40c9-9e27-f05fa9941db3","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":\"short\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"short\",\"fromType\":\"byte\",\"tableVersion\":2}]}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716683235880}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683349676,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"538"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"f3405988-757d-4959-96d2-d7142d7162b6"}}
{"add":{"path":"part-00000-e15d8e8c-2e34-4d76-bacf-6315428daba0-c000.snappy.parquet","partitionValues":{},"size":538,"modificationTime":1716683349000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":32767},\"maxValues\":{\"col\":32767},\"nullCount\":{\"col\":0}}","defaultRowCommitVersion":3}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683377041,"operation":"CHANGE COLUMN","operationParameters":{"column":"{\"name\":\"col\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"short\",\"fromType\":\"byte\",\"tableVersion\":2}]}}"},"readVersion":3,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"8f7e22df-f174-46a5-ba63-aa97c99a33cb"}}
{"metaData":{"id":"3b653ec1-f12b-40c9-9e27-f05fa9941db3","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"short\",\"fromType\":\"byte\",\"tableVersion\":2},{\"toType\":\"integer\",\"fromType\":\"short\",\"tableVersion\":4}]}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716683235880}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716683378212,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":4,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"588"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"eabc76f4-236c-4113-8d8e-df1eface2ac3"}}
{"add":{"path":"part-00000-9dc5a505-e7ae-4958-901d-f05b8f6a5209-c000.snappy.parquet","partitionValues":{},"size":588,"modificationTime":1716683378000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":2147483647},\"maxValues\":{\"col\":2147483647},\"nullCount\":{\"col\":0}}","defaultRowCommitVersion":5}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
Data generated using OSS Delta Lake 3.2.0:

```sql
CREATE TABLE default.test_type_widening_nested
(s struct<field: byte>, m map<byte, byte>, a array<byte>)
USING DELTA
LOCATION 's3://test-bucket/databricks-compatibility-test-test-widening-nested'
TBLPROPERTIES ('delta.enableTypeWidening'=true);
INSERT INTO default.test_type_widening_nested VALUES (named_struct('field',127), map(-128,127), array(127));

ALTER TABLE default.test_type_widening_nested CHANGE COLUMN s.field TYPE short;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN m.key TYPE short;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN m.value TYPE short;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN a.element TYPE short;
INSERT INTO default.test_type_widening_nested VALUES (named_struct('field',32767), map(-32768,32767), array(32767));

ALTER TABLE default.test_type_widening_nested CHANGE COLUMN s.field TYPE integer;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN m.key TYPE integer;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN m.value TYPE integer;
ALTER TABLE default.test_type_widening_nested CHANGE COLUMN a.element TYPE integer;
INSERT INTO default.test_type_widening_nested VALUES (named_struct('field',2147483647), map(-2147483648,2147483647), array(2147483647));
```

Other type widening including from integer to long is not supported in 3.2.0.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1716686845777,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{\"delta.enableTypeWidening\":\"true\"}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"2ab59e14-0675-443b-86af-4a619fc97bab"}}
{"metaData":{"id":"628f6682-9791-4ee0-a0c4-502239349046","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"s\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"m\",\"type\":{\"type\":\"map\",\"keyType\":\"byte\",\"valueType\":\"byte\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"a\",\"type\":{\"type\":\"array\",\"elementType\":\"byte\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716686845629}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["typeWidening-preview"],"writerFeatures":["typeWidening-preview"]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716686867493,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"1413"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"c162f32f-0e31-4967-943a-66787c2623b9"}}
{"add":{"path":"part-00000-aa99b42e-b5ca-4faf-a9ff-cbcd951cc1b7-c000.snappy.parquet","partitionValues":{},"size":1413,"modificationTime":1716686867000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"s\":{\"field\":127}},\"maxValues\":{\"s\":{\"field\":127}},\"nullCount\":{\"s\":{\"field\":0},\"m\":0,\"a\":0}}","defaultRowCommitVersion":1}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1716686871922,"operation":"CHANGE COLUMN","operationParameters":{"column":"{\"name\":\"s.field\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}}"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.2.0","txnId":"265c814a-5bf9-41cd-9170-99b26b584b31"}}
{"metaData":{"id":"628f6682-9791-4ee0-a0c4-502239349046","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"s\",\"type\":{\"type\":\"struct\",\"fields\":[{\"name\":\"field\",\"type\":\"short\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"short\",\"fromType\":\"byte\",\"tableVersion\":2}]}}]},\"nullable\":true,\"metadata\":{}},{\"name\":\"m\",\"type\":{\"type\":\"map\",\"keyType\":\"byte\",\"valueType\":\"byte\",\"valueContainsNull\":true},\"nullable\":true,\"metadata\":{}},{\"name\":\"a\",\"type\":{\"type\":\"array\",\"elementType\":\"byte\",\"containsNull\":true},\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1716686845629}}
Loading

0 comments on commit e035599

Please sign in to comment.