From 075752b92f04ede70f881651230a2d1b3e3f9817 Mon Sep 17 00:00:00 2001 From: chenjian2664 Date: Fri, 7 Feb 2025 11:40:56 +0800 Subject: [PATCH] Support read 'p' type deletion vectors in Delta Lake --- .../deltalake/delete/DeletionVectors.java | 22 ++++--- .../deltalake/delete/TestDeletionVectors.java | 9 --- .../TestDeltaLakeCloneTableCompatibility.java | 61 +++++++++++++++++++ .../TestDeltaLakeDeleteCompatibility.java | 3 +- 4 files changed, 77 insertions(+), 18 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java index fb7e7a82c55f..2ade9c976859 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/delete/DeletionVectors.java @@ -62,13 +62,21 @@ private DeletionVectors() {} public static RoaringBitmapArray readDeletionVectors(TrinoFileSystem fileSystem, Location location, DeletionVectorEntry deletionVector) throws IOException { - if (deletionVector.storageType().equals(UUID_MARKER)) { - TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv()))); - ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes()); - return deserializeDeletionVectors(buffer); - } - if (deletionVector.storageType().equals(INLINE_MARKER) || deletionVector.storageType().equals(PATH_MARKER)) { - throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType()); + switch (deletionVector.storageType()) { + case UUID_MARKER -> { + TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv()))); + ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes()); + return deserializeDeletionVectors(buffer); + } + case PATH_MARKER -> { + TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(deletionVector.pathOrInlineDv())); + if (!inputFile.exists()) { + throw new IllegalArgumentException("Unable to find 'p' type deletion vector by path: " + deletionVector.pathOrInlineDv()); + } + ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes()); + return deserializeDeletionVectors(buffer); + } + case INLINE_MARKER -> throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType()); } throw new IllegalArgumentException("Unexpected storage type: " + deletionVector.storageType()); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java index c118ac743a08..b91004fefad9 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/delete/TestDeletionVectors.java @@ -47,15 +47,6 @@ public void testUuidStorageType() assertThat(bitmaps.contains(2)).isFalse(); } - @Test - public void testUnsupportedPathStorageType() - { - TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION); - DeletionVectorEntry deletionVector = new DeletionVectorEntry("p", "s3://bucket/table/deletion_vector.bin", OptionalInt.empty(), 40, 1); - assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), deletionVector)) - .hasMessageContaining("Unsupported storage type for deletion vector: p"); - } - @Test public void testUnsupportedInlineStorageType() { diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java index d773c66a6547..1b2faad6cfe7 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeCloneTableCompatibility.java @@ -380,6 +380,67 @@ private void testReadSchemaChangedCloneTable(String cloneType, boolean partition } } + @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testReadShallowCloneTableWithSourceDeletionVector() + { + testReadShallowCloneTableWithSourceDeletionVector(true); + testReadShallowCloneTableWithSourceDeletionVector(false); + } + + private void testReadShallowCloneTableWithSourceDeletionVector(boolean partitioned) + { + String baseTable = "test_dv_base_table_" + randomNameSuffix(); + String clonedTable = "test_dv_clone_table_" + randomNameSuffix(); + String directoryName = "clone-deletion-vector-compatibility-test-"; + try { + onDelta().executeQuery("CREATE TABLE default." + baseTable + + " (a_int INT, b_string STRING) USING delta " + + (partitioned ? "PARTITIONED BY (b_string) " : "") + + "LOCATION 's3://" + bucketName + "/" + directoryName + baseTable + "'" + + "TBLPROPERTIES ('delta.enableDeletionVectors'='true')"); + + onDelta().executeQuery("INSERT INTO " + baseTable + " VALUES (1, 'aaa'), (2, 'aaa'), (3, 'bbb'), (4, 'bbb')"); + // enforce the rows into one file, so that later is partial delete of the data file instead of remove all rows. + // This allows the cloned table to reference the same deletion vector but different offset + // and help us to test the read process of 'p' type deletion vector better. + onDelta().executeQuery("OPTIMIZE " + baseTable); + onDelta().executeQuery("DELETE FROM default." + baseTable + " WHERE a_int IN (2, 3)"); + + onDelta().executeQuery("CREATE TABLE default." + clonedTable + + " SHALLOW CLONE default." + baseTable + + " LOCATION 's3://" + bucketName + "/" + directoryName + clonedTable + "'"); + + List expectedRows = ImmutableList.of(row(1, "aaa"), row(4, "bbb")); + assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable)).containsOnly(expectedRows); + assertThat(onDelta().executeQuery("SELECT * FROM default." + clonedTable)).containsOnly(expectedRows); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable)).containsOnly(expectedRows); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTable)).containsOnly(expectedRows); + + assertThat(getDeletionVectorType(baseTable)).isNotEqualTo("p"); + assertThat(getDeletionVectorType(clonedTable)).isEqualTo("p"); + } + finally { + onDelta().executeQuery("DROP TABLE IF EXISTS default." + baseTable); + onDelta().executeQuery("DROP TABLE IF EXISTS default." + clonedTable); + } + } + + private static String getDeletionVectorType(String tableName) + { + return (String) onTrino().executeQuery( + """ + SELECT json_extract_scalar(elem, '$.add.deletionVector.storageType') AS storage_type + FROM ( + SELECT CAST(transaction AS JSON) AS json_arr + FROM default."%s$transactions" + ORDER BY version + ) t, UNNEST(CAST(t.json_arr AS ARRAY(JSON))) AS u(elem) + WHERE json_extract_scalar(elem, '$.add.deletionVector.storageType') IS NOT NULL + LIMIT 1 + """.formatted(tableName)) + .getOnlyValue(); + } + private List getActiveDataFiles(String tableName) { return onTrino().executeQuery("SELECT DISTINCT \"$path\" FROM default." + tableName).column(1); diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java index ef24c834f3a8..86dfc5ac83b2 100644 --- a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeDeleteCompatibility.java @@ -595,8 +595,7 @@ public void testDeletionVectorsAbsolutePath() "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-clone-" + baseTableName + "'"); assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected); - assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName)) - .hasMessageContaining("Unsupported storage type for deletion vector: p"); + assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected); } finally { dropDeltaTableWithRetry("default." + baseTableName);