Support reading 'p' type deletion vectors in Delta Lake #24946

Merged (1 commit) on Feb 7, 2025

@@ -62,13 +62,21 @@ private DeletionVectors() {}
     public static RoaringBitmapArray readDeletionVectors(TrinoFileSystem fileSystem, Location location, DeletionVectorEntry deletionVector)
             throws IOException
     {
-        if (deletionVector.storageType().equals(UUID_MARKER)) {
-            TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv())));
-            ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
-            return deserializeDeletionVectors(buffer);
-        }
-        if (deletionVector.storageType().equals(INLINE_MARKER) || deletionVector.storageType().equals(PATH_MARKER)) {
-            throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType());
-        }
+        switch (deletionVector.storageType()) {
+            case UUID_MARKER -> {
+                TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv())));
+                ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
+                return deserializeDeletionVectors(buffer);
+            }
+            case PATH_MARKER -> {
+                TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(deletionVector.pathOrInlineDv()));
+                if (!inputFile.exists()) {
+                    throw new IllegalArgumentException("Unable to find 'p' type deletion vector by path: " + deletionVector.pathOrInlineDv());
+                }
+                ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
+                return deserializeDeletionVectors(buffer);
+            }
+            case INLINE_MARKER -> throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType());
+        }
         throw new IllegalArgumentException("Unexpected storage type: " + deletionVector.storageType());
     }
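For context, the Delta protocol defines three deletion vector storage types: 'u' (the vector is stored in a file under the table location, with the file name derived from a UUID), 'i' (the bitmap is inlined in the log entry), and 'p' (an absolute path to the vector file). Below is a minimal, hypothetical sketch of exercising the new PATH_MARKER branch; it reuses the DeletionVectorEntry constructor and the readDeletionVectors signature that appear in the tests of this PR, but the path, offset, size, and cardinality values are illustrative, and a real test would have to write an actual deletion vector file at that path because the branch checks inputFile.exists() first.

    // Hypothetical sketch, not part of this PR: reading a 'p' (absolute path) deletion vector.
    // HDFS_FILE_SYSTEM_FACTORY and SESSION are the constants used by the existing unit tests;
    // the path, offset, sizeInBytes and cardinality below are made-up values.
    TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);
    DeletionVectorEntry deletionVector = new DeletionVectorEntry(
            "p",                                     // storageType: absolute path marker
            "s3://bucket/table/deletion_vector.bin", // pathOrInlineDv: absolute location of the vector file
            OptionalInt.of(1),                       // offset into the vector file
            40,                                      // sizeInBytes
            1);                                      // cardinality (number of deleted rows)
    // The table location argument is not used for 'p' vectors, since the path above is already absolute.
    RoaringBitmapArray deletedRows = readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), deletionVector);
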
@@ -47,15 +47,6 @@ public void testUuidStorageType()
         assertThat(bitmaps.contains(2)).isFalse();
     }
 
-    @Test
-    public void testUnsupportedPathStorageType()
-    {
-        TrinoFileSystem fileSystem = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);
-        DeletionVectorEntry deletionVector = new DeletionVectorEntry("p", "s3://bucket/table/deletion_vector.bin", OptionalInt.empty(), 40, 1);
-        assertThatThrownBy(() -> readDeletionVectors(fileSystem, Location.of("s3://bucket/table"), deletionVector))
-                .hasMessageContaining("Unsupported storage type for deletion vector: p");
-    }
-
     @Test
     public void testUnsupportedInlineStorageType()
     {
@@ -380,6 +380,67 @@ private void testReadSchemaChangedCloneTable(String cloneType, boolean partitioned)
         }
     }
 
+    @Test(groups = {DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS})
+    public void testReadShallowCloneTableWithSourceDeletionVector()
+    {
+        testReadShallowCloneTableWithSourceDeletionVector(true);
+        testReadShallowCloneTableWithSourceDeletionVector(false);
+    }
+
+    private void testReadShallowCloneTableWithSourceDeletionVector(boolean partitioned)
+    {
+        String baseTable = "test_dv_base_table_" + randomNameSuffix();
+        String clonedTable = "test_dv_clone_table_" + randomNameSuffix();
+        String directoryName = "clone-deletion-vector-compatibility-test-";
+        try {
+            onDelta().executeQuery("CREATE TABLE default." + baseTable +
+                    " (a_int INT, b_string STRING) USING delta " +
+                    (partitioned ? "PARTITIONED BY (b_string) " : "") +
+                    "LOCATION 's3://" + bucketName + "/" + directoryName + baseTable + "' " +
+                    "TBLPROPERTIES ('delta.enableDeletionVectors'='true')");
+
+            onDelta().executeQuery("INSERT INTO " + baseTable + " VALUES (1, 'aaa'), (2, 'aaa'), (3, 'bbb'), (4, 'bbb')");
+            // Compact the rows into a single file so that the DELETE below removes only part of the data file
+            // rather than all of its rows. The cloned table then references the same deletion vector file
+            // (with a different offset), which better exercises the read path for 'p' type deletion vectors.
+            onDelta().executeQuery("OPTIMIZE " + baseTable);
+            onDelta().executeQuery("DELETE FROM default." + baseTable + " WHERE a_int IN (2, 3)");
+
+            onDelta().executeQuery("CREATE TABLE default." + clonedTable +
+                    " SHALLOW CLONE default." + baseTable +
+                    " LOCATION 's3://" + bucketName + "/" + directoryName + clonedTable + "'");
+
+            List<Row> expectedRows = ImmutableList.of(row(1, "aaa"), row(4, "bbb"));
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable)).containsOnly(expectedRows);
+            assertThat(onDelta().executeQuery("SELECT * FROM default." + clonedTable)).containsOnly(expectedRows);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable)).containsOnly(expectedRows);
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTable)).containsOnly(expectedRows);
+
+            assertThat(getDeletionVectorType(baseTable)).isNotEqualTo("p");
+            assertThat(getDeletionVectorType(clonedTable)).isEqualTo("p");
+        }
+        finally {
+            onDelta().executeQuery("DROP TABLE IF EXISTS default." + baseTable);
+            onDelta().executeQuery("DROP TABLE IF EXISTS default." + clonedTable);
+        }
+    }
+
+    private static String getDeletionVectorType(String tableName)
+    {
+        return (String) onTrino().executeQuery(
+                """
+                SELECT json_extract_scalar(elem, '$.add.deletionVector.storageType') AS storage_type
+                FROM (
+                    SELECT CAST(transaction AS JSON) AS json_arr
+                    FROM default."%s$transactions"
+                    ORDER BY version
+                ) t, UNNEST(CAST(t.json_arr AS ARRAY(JSON))) AS u(elem)
+                WHERE json_extract_scalar(elem, '$.add.deletionVector.storageType') IS NOT NULL
+                LIMIT 1
+                """.formatted(tableName))
+                .getOnlyValue();
+    }
+
     private List<String> getActiveDataFiles(String tableName)
     {
         return onTrino().executeQuery("SELECT DISTINCT \"$path\" FROM default." + tableName).column(1);
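For reference, here is a rough, illustrative sketch of the JSON that the getDeletionVectorType helper above inspects: each add action in a "<table>$transactions" entry may carry a deletionVector descriptor, and its storageType field is what the json_extract_scalar path pulls out. The field names mirror DeletionVectorEntry; the values below are invented, not taken from a real transaction log.

    // Illustrative only: a trimmed add action with made-up values, matching the JSON path
    // '$.add.deletionVector.storageType' used in getDeletionVectorType above.
    String exampleAddAction =
            """
            {"add": {"path": "part-00000-....snappy.parquet",
                     "deletionVector": {"storageType": "p",
                                        "pathOrInlineDv": "s3://bucket/table/deletion_vector_....bin",
                                        "offset": 1,
                                        "sizeInBytes": 40,
                                        "cardinality": 2}}}
            """;
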
@@ -595,8 +595,7 @@ public void testDeletionVectorsAbsolutePath()
                     "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-clone-" + baseTableName + "'");
 
             assertThat(onDelta().executeQuery("SELECT * FROM default." + tableName)).contains(expected);
-            assertQueryFailure(() -> onTrino().executeQuery("SELECT * FROM delta.default." + tableName))
-                    .hasMessageContaining("Unsupported storage type for deletion vector: p");
+            assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + tableName)).contains(expected);
         }
         finally {
             dropDeltaTableWithRetry("default." + baseTableName);