Skip to content

Commit

Permalink
Support read 'p' type deletion vectors in Delta Lake
Browse files — browse the repository at this point in the history
  • Loading branch information
chenjian2664 committed Feb 7, 2025
1 parent b1d2302 commit 7614584
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,21 @@ private DeletionVectors() {}
public static RoaringBitmapArray readDeletionVectors(TrinoFileSystem fileSystem, Location location, DeletionVectorEntry deletionVector)
throws IOException
{
if (deletionVector.storageType().equals(UUID_MARKER)) {
TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv())));
ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
return deserializeDeletionVectors(buffer);
}
if (deletionVector.storageType().equals(INLINE_MARKER) || deletionVector.storageType().equals(PATH_MARKER)) {
throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType());
switch (deletionVector.storageType()) {
case UUID_MARKER -> {
TrinoInputFile inputFile = fileSystem.newInputFile(location.appendPath(toFileName(deletionVector.pathOrInlineDv())));
ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
return deserializeDeletionVectors(buffer);
}
case PATH_MARKER -> {
TrinoInputFile inputFile = fileSystem.newInputFile(Location.of(deletionVector.pathOrInlineDv()));
if (!inputFile.exists()) {
throw new IllegalArgumentException("Unable to find 'p' type deletion vector by path: " + deletionVector.pathOrInlineDv());
}
ByteBuffer buffer = readDeletionVector(inputFile, deletionVector.offset().orElseThrow(), deletionVector.sizeInBytes());
return deserializeDeletionVectors(buffer);
}
case INLINE_MARKER -> throw new TrinoException(NOT_SUPPORTED, "Unsupported storage type for deletion vector: " + deletionVector.storageType());
}
throw new IllegalArgumentException("Unexpected storage type: " + deletionVector.storageType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ public void testUuidStorageType()
assertThat(bitmaps.contains(2)).isFalse();
}

@Test
public void testUnsupportedPathStorageType()
{
    // A 'p' (absolute path) storage type entry is rejected by the reader
    DeletionVectorEntry entry = new DeletionVectorEntry("p", "s3://bucket/table/deletion_vector.bin", OptionalInt.empty(), 40, 1);
    TrinoFileSystem hdfs = HDFS_FILE_SYSTEM_FACTORY.create(SESSION);
    Location tableLocation = Location.of("s3://bucket/table");
    assertThatThrownBy(() -> readDeletionVectors(hdfs, tableLocation, entry))
            .hasMessageContaining("Unsupported storage type for deletion vector: p");
}

@Test
public void testUnsupportedInlineStorageType()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,50 @@ private void testReadSchemaChangedCloneTable(String cloneType, boolean partition
}
}

@Test
public void testReadShallowCloneTableWithSourceDeletionVector()
{
    // Exercise both the partitioned and unpartitioned source-table variants
    for (boolean partitioned : new boolean[] {true, false}) {
        testReadShallowCloneTableWithSourceDeletionVector(partitioned);
    }
}

/**
 * Verifies that both Delta (Databricks/Spark) and Trino return the same rows when reading a
 * shallow clone whose source table carries deletion vectors — the clone's log references the
 * source's deletion vector files by absolute path ('p' type entries).
 *
 * @param partitioned whether the source table is partitioned on {@code b_string}
 */
private void testReadShallowCloneTableWithSourceDeletionVector(boolean partitioned)
{
    String baseTable = "test_dv_base_table_" + randomNameSuffix();
    String clonedTable = "test_dv_clone_table_" + randomNameSuffix();
    String directoryName = "databricks-clone-deletion-vector-compatibility-test-";
    try {
        onDelta().executeQuery("CREATE TABLE default." + baseTable +
                " (a_int INT, b_string STRING) USING delta " +
                // Spark SQL requires the partition column list to be parenthesized
                (partitioned ? "PARTITIONED BY (b_string) " : "") +
                "LOCATION 's3://" + bucketName + "/" + directoryName + baseTable + "'" +
                " TBLPROPERTIES (" +
                " 'delta.enableDeletionVectors'='true')");

        // OPTIMIZE compacts the inserts into one file per partition so the DELETE below is
        // persisted as a deletion vector instead of rewriting the data file
        onDelta().executeQuery("INSERT INTO default." + baseTable + " VALUES (1, 'aaa'), (2, 'aaa'), (3, 'bbb'), (4, 'bbb')");
        onDelta().executeQuery("OPTIMIZE default." + baseTable);
        onDelta().executeQuery("DELETE FROM default." + baseTable + " WHERE a_int IN (2, 3)");

        onDelta().executeQuery("CREATE TABLE default." + clonedTable +
                " SHALLOW CLONE default." + baseTable +
                " LOCATION 's3://" + bucketName + "/" + directoryName + clonedTable + "'");

        // Both engines must see identical data in both the source and the clone
        List<Row> expectedRows = ImmutableList.of(row(1, "aaa"), row(4, "bbb"));
        assertThat(onDelta().executeQuery("SELECT * FROM default." + baseTable))
                .containsOnly(expectedRows);
        assertThat(onDelta().executeQuery("SELECT * FROM default." + clonedTable))
                .containsOnly(expectedRows);
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + baseTable))
                .containsOnly(expectedRows);
        assertThat(onTrino().executeQuery("SELECT * FROM delta.default." + clonedTable))
                .containsOnly(expectedRows);
    } finally {
        onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + baseTable);
        onTrino().executeQuery("DROP TABLE IF EXISTS delta.default." + clonedTable);
    }
}

private List<String> getActiveDataFiles(String tableName)
{
return onTrino().executeQuery("SELECT DISTINCT \"$path\" FROM default." + tableName).column(1);
Expand Down

0 comments on commit 7614584

Please sign in to comment.