Skip to content

Commit

Permalink
Disallow writing to Hudi universal format in Delta Lake
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Aug 26, 2024
1 parent 6c56a34 commit 6db2db4
Show file tree
Hide file tree
Showing 28 changed files with 192 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.TIMESTAMP_NTZ_FEATURE_NAME;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.changeDataFeedEnabled;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.deserializeType;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.enabledUniversalFormats;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractPartitionColumns;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.extractSchema;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.generateColumnMetadata;
Expand Down Expand Up @@ -2828,9 +2829,18 @@ private void checkWriteSupported(DeltaLakeTableHandle handle)
if (getColumnIdentities(handle.getMetadataEntry(), handle.getProtocolEntry()).values().stream().anyMatch(identity -> identity)) {
throw new TrinoException(NOT_SUPPORTED, "Writing to tables with identity columns is not supported");
}
checkUnsupportedUniversalFormat(handle.getMetadataEntry());
checkUnsupportedWriterFeatures(handle.getProtocolEntry());
}

public static void checkUnsupportedUniversalFormat(MetadataEntry metadataEntry)
{
List<String> universalFormats = enabledUniversalFormats(metadataEntry);
if (!universalFormats.isEmpty()) {
throw new TrinoException(NOT_SUPPORTED, "Unsupported universal formats: " + universalFormats);
}
}

private static void checkUnsupportedWriterFeatures(ProtocolEntry protocolEntry)
{
Set<String> unsupportedWriterFeatures = unsupportedWriterFeatures(protocolEntry.writerFeatures().orElse(ImmutableSet.of()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.base.util.Procedures.checkProcedureArgument;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.MAX_WRITER_VERSION;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkUnsupportedUniversalFormat;
import static io.trino.plugin.deltalake.DeltaLakeMetadata.checkValidTableHandle;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.getVacuumMinRetention;
import static io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport.DELETION_VECTORS_FEATURE_NAME;
Expand Down Expand Up @@ -183,6 +184,8 @@ private void doVacuum(
accessControl.checkCanInsertIntoTable(null, tableName);
accessControl.checkCanDeleteFromTable(null, tableName);

checkUnsupportedUniversalFormat(handle.getMetadataEntry());

TableSnapshot tableSnapshot = metadata.getSnapshot(session, tableName, handle.getLocation(), Optional.of(handle.getReadVersion()));
ProtocolEntry protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
if (protocolEntry.minWriterVersion() > MAX_WRITER_VERSION) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Enums;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -94,12 +95,16 @@ private DeltaLakeSchemaSupport() {}

private static final Logger log = Logger.get(DeltaLakeSchemaSupport.class);

private static final Splitter SPLITTER = Splitter.on(',').trimResults().omitEmptyStrings().trimResults();

public static final String APPEND_ONLY_CONFIGURATION_KEY = "delta.appendOnly";
public static final String COLUMN_MAPPING_MODE_CONFIGURATION_KEY = "delta.columnMapping.mode";
public static final String COLUMN_MAPPING_PHYSICAL_NAME_CONFIGURATION_KEY = "delta.columnMapping.physicalName";
public static final String MAX_COLUMN_ID_CONFIGURATION_KEY = "delta.columnMapping.maxColumnId";
public static final String ISOLATION_LEVEL_CONFIGURATION_KEY = "delta.isolationLevel";
private static final String DELETION_VECTORS_CONFIGURATION_KEY = "delta.enableDeletionVectors";
// https://github.com/delta-io/delta/blob/master/docs/source/delta-uniform.md
private static final String UNIVERSAL_FORMAT_CONFIGURATION_KEY = "delta.universalFormat.enabledFormats";

// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#valid-feature-names-in-table-features
private static final String APPEND_ONLY_FEATURE_NAME = "appendOnly";
Expand Down Expand Up @@ -199,6 +204,12 @@ public static boolean isDeletionVectorEnabled(MetadataEntry metadataEntry, Proto
return parseBoolean(metadataEntry.getConfiguration().get(DELETION_VECTORS_CONFIGURATION_KEY));
}

public static List<String> enabledUniversalFormats(MetadataEntry metadataEntry)
{
String formats = metadataEntry.getConfiguration().get(UNIVERSAL_FORMAT_CONFIGURATION_KEY);
return formats == null ? ImmutableList.of() : SPLITTER.splitToList(formats);
}

public static ColumnMappingMode getColumnMappingMode(MetadataEntry metadata, ProtocolEntry protocolEntry)
{
if (protocolEntry.supportsReaderFeatures() || protocolEntry.supportsWriterFeatures()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public class TestDeltaLakeBasic
new ResourceTable("liquid_clustering", "deltalake/liquid_clustering"),
new ResourceTable("timestamp_ntz", "databricks131/timestamp_ntz"),
new ResourceTable("timestamp_ntz_partition", "databricks131/timestamp_ntz_partition"),
new ResourceTable("uniform_hudi", "deltalake/uniform_hudi"),
new ResourceTable("uniform_iceberg_v1", "databricks133/uniform_iceberg_v1"),
new ResourceTable("uniform_iceberg_v2", "databricks143/uniform_iceberg_v2"),
new ResourceTable("unsupported_writer_feature", "deltalake/unsupported_writer_feature"),
Expand Down Expand Up @@ -1142,14 +1143,25 @@ public void testLiquidClustering()
assertQueryFails("INSERT INTO liquid_clustering VALUES ('test 3', 2024, 3)", "Unsupported writer features: .*");
}

/**
* @see deltalake.uniform_hudi
*/
@Test
public void testUniFormHudi()
{
assertQuery("SELECT * FROM uniform_hudi", "VALUES (123)");
assertQueryFails("INSERT INTO uniform_hudi VALUES (456)", "\\QUnsupported universal formats: [hudi]");
assertQueryFails("CALL system.vacuum(CURRENT_SCHEMA, 'uniform_hudi', '7d')", "\\QUnsupported universal formats: [hudi]");
}

/**
* @see databricks133.uniform_iceberg_v1
*/
@Test
public void testUniFormIcebergV1()
{
assertQuery("SELECT * FROM uniform_iceberg_v1", "VALUES (1, 'test data')");
assertQueryFails("INSERT INTO uniform_iceberg_v1 VALUES (2, 'new data')", "\\QUnsupported writer features: [icebergCompatV1]");
assertQueryFails("INSERT INTO uniform_iceberg_v1 VALUES (2, 'new data')", "\\QUnsupported universal formats: [iceberg]");
}

/**
Expand All @@ -1159,7 +1171,7 @@ public void testUniFormIcebergV1()
public void testUniFormIcebergV2()
{
assertQuery("SELECT * FROM uniform_iceberg_v2", "VALUES (1, 'test data')");
assertQueryFails("INSERT INTO uniform_iceberg_v2 VALUES (2, 'new data')", "\\QUnsupported writer features: [icebergCompatV2]");
assertQueryFails("INSERT INTO uniform_iceberg_v2 VALUES (2, 'new data')", "\\QUnsupported universal formats: [iceberg]");
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"partitionToWriteStats" : { },
"compacted" : false,
"extraMetadata" : {
"schema" : "{\"type\":\"record\",\"name\":\"struct\",\"fields\":[{\"name\":\"col1\",\"type\":[\"null\",\"int\"]}]}",
"delta-timestamp" : "1718613517384",
"delta-version" : "0"
},
"operationType" : null,
"partitionToReplaceFileIds" : { }
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#Updated at 2024-06-17T08:38:46.008Z
#Mon Jun 17 01:38:46 PDT 2024
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.partition.fields=
hoodie.table.type=COPY_ON_WRITE
hoodie.archivelog.folder=archived
hoodie.populate.meta.fields=false
hoodie.timeline.layout.version=1
hoodie.table.version=6
hoodie.table.metadata.partitions=column_stats,files
hoodie.table.name=b9c8827f_414d_455e_bff6_18b0a2a8987a
hoodie.table.timeline.timezone=UTC
hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator
hoodie.datasource.write.hive_style_partitioning=true
hoodie.table.metadata.partitions.inflight=
hoodie.table.checksum=2698693648
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"partitionToWriteStats" : {
"files" : [ {
"fileId" : "files-0000-0",
"path" : "files/files-0000-0_0-0-0_00000000000000010.hfile",
"cdcStats" : null,
"prevCommit" : "null",
"numWrites" : 1,
"numDeletes" : 0,
"numUpdateWrites" : 0,
"numInserts" : 1,
"totalWriteBytes" : 6804,
"totalWriteErrors" : 0,
"tempPath" : null,
"partitionPath" : "files",
"totalLogRecords" : 0,
"totalLogFilesCompacted" : 0,
"totalLogSizeCompacted" : 0,
"totalUpdatedRecordsCompacted" : 0,
"totalLogBlocks" : 0,
"totalCorruptLogBlock" : 0,
"totalRollbackBlocks" : 0,
"fileSizeInBytes" : 6804,
"minEventTime" : null,
"maxEventTime" : null,
"runtimeStats" : {
"totalScanTime" : 0,
"totalUpsertTime" : 0,
"totalCreateTime" : 508
}
} ]
},
"compacted" : false,
"extraMetadata" : {
"schema" : "{\"type\":\"record\",\"name\":\"HoodieMetadataRecord\",\"namespace\":\"org.apache.hudi.avro.model\",\"doc\":\"A record saved within the Metadata Table\",\"fields\":[{\"name\":\"key\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}},{\"name\":\"type\",\"type\":\"int\",\"doc\":\"Type of the metadata record\"},{\"name\":\"filesystemMetadata\",\"type\":[\"null\",{\"type\":\"map\",\"values\":{\"type\":\"record\",\"name\":\"HoodieMetadataFileInfo\",\"fields\":[{\"name\":\"size\",\"type\":\"long\",\"doc\":\"Size of the file\"},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"True if this file has been deleted\"}]},\"avro.java.string\":\"String\"}],\"doc\":\"Contains information about partitions and files within the dataset\"},{\"name\":\"BloomFilterMetadata\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"HoodieMetadataBloomFilter\",\"doc\":\"Data file bloom filter details\",\"fields\":[{\"name\":\"type\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Bloom filter type code\"},{\"name\":\"timestamp\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"},\"doc\":\"Instant timestamp when this metadata was created/updated\"},{\"name\":\"bloomFilter\",\"type\":\"bytes\",\"doc\":\"Bloom filter binary byte array\"},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"Bloom filter entry valid/deleted flag\"}]}],\"doc\":\"Metadata Index of bloom filters for all data files in the user table\",\"default\":null},{\"name\":\"ColumnStatsMetadata\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"HoodieMetadataColumnStats\",\"doc\":\"Data file column statistics\",\"fields\":[{\"name\":\"fileName\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"File name for which this column statistics applies\",\"default\":null},{\"name\":\"columnName\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"Column name for which this column statistics applies\",\"default\":null},{\"name\":\"minValue\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"BooleanWrapper\",\"doc\":\"A record wrapping boolean type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"boolean\"}]},{\"type\":\"record\",\"name\":\"IntWrapper\",\"doc\":\"A record wrapping int type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"int\"}]},{\"type\":\"record\",\"name\":\"LongWrapper\",\"doc\":\"A record wrapping long type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]},{\"type\":\"record\",\"name\":\"FloatWrapper\",\"doc\":\"A record wrapping float type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"float\"}]},{\"type\":\"record\",\"name\":\"DoubleWrapper\",\"doc\":\"A record wrapping double type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"double\"}]},{\"type\":\"record\",\"name\":\"BytesWrapper\",\"doc\":\"A record wrapping bytes type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"bytes\"}]},{\"type\":\"record\",\"name\":\"StringWrapper\",\"doc\":\"A record wrapping string type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"string\",\"avro.java.string\":\"String\"}}]},{\"type\":\"record\",\"name\":\"DateWrapper\",\"doc\":\"A record wrapping Date logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"int\"}]},{\"type\":\"record\",\"name\":\"DecimalWrapper\",\"doc\":\"A record wrapping Decimal logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":30,\"scale\":15}}]},{\"type\":\"record\",\"name\":\"TimeMicrosWrapper\",\"doc\":\"A record wrapping Time-micros logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":{\"type\":\"long\",\"logicalType\":\"time-micros\"}}]},{\"type\":\"record\",\"name\":\"TimestampMicrosWrapper\",\"doc\":\"A record wrapping Timestamp-micros logical type to be able to be used it w/in Avro's Union\",\"fields\":[{\"name\":\"value\",\"type\":\"long\"}]}],\"doc\":\"Minimum value in the range. Based on user data table schema, we can convert this to appropriate type\",\"default\":null},{\"name\":\"maxValue\",\"type\":[\"null\",\"BooleanWrapper\",\"IntWrapper\",\"LongWrapper\",\"FloatWrapper\",\"DoubleWrapper\",\"BytesWrapper\",\"StringWrapper\",\"DateWrapper\",\"DecimalWrapper\",\"TimeMicrosWrapper\",\"TimestampMicrosWrapper\"],\"doc\":\"Maximum value in the range. Based on user data table schema, we can convert it to appropriate type\",\"default\":null},{\"name\":\"valueCount\",\"type\":[\"null\",\"long\"],\"doc\":\"Total count of values\",\"default\":null},{\"name\":\"nullCount\",\"type\":[\"null\",\"long\"],\"doc\":\"Total count of null values\",\"default\":null},{\"name\":\"totalSize\",\"type\":[\"null\",\"long\"],\"doc\":\"Total storage size on disk\",\"default\":null},{\"name\":\"totalUncompressedSize\",\"type\":[\"null\",\"long\"],\"doc\":\"Total uncompressed storage size on disk\",\"default\":null},{\"name\":\"isDeleted\",\"type\":\"boolean\",\"doc\":\"Column range entry valid/deleted flag\"}]}],\"doc\":\"Metadata Index of column statistics for all data files in the user table\",\"default\":null},{\"name\":\"recordIndexMetadata\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"HoodieRecordIndexInfo\",\"fields\":[{\"name\":\"partitionName\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"Refers to the partition name the record belongs to\",\"default\":null},{\"name\":\"fileIdHighBits\",\"type\":[\"null\",\"long\"],\"doc\":\"Refers to high 64 bits if the fileId is based on UUID format. \\nA UUID based fileId is stored as 3 pieces in RLI (fileIdHighBits, fileIdLowBits and fileIndex). \\nFileID format is {UUID}-{fileIndex}.\",\"default\":null},{\"name\":\"fileIdLowBits\",\"type\":[\"null\",\"long\"],\"doc\":\"Refers to low 64 bits if the fileId is based on UUID format. \\nA UUID based fileId is stored as 3 pieces in RLI (fileIdHighBits, fileIdLowBits and fileIndex). \\nFileID format is {UUID}-{fileIndex}.\",\"default\":null},{\"name\":\"fileIndex\",\"type\":[\"null\",\"int\"],\"doc\":\"Index representing file index which is used to re-construct UUID based fileID. Applicable when the fileId is based on UUID format. \\nA UUID based fileId is stored as 3 pieces in RLI (fileIdHighBits, fileIdLowBits and fileIndex). \\nFileID format is {UUID}-{fileIndex}.\",\"default\":null},{\"name\":\"fileId\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"doc\":\"Represents fileId of the location where record belongs to. When the encoding is 1, fileID is stored in raw string format.\",\"default\":null},{\"name\":\"instantTime\",\"type\":[\"null\",\"long\"],\"doc\":\"Epoch time in millisecond representing the commit time at which record was added\",\"default\":null},{\"name\":\"fileIdEncoding\",\"type\":\"int\",\"doc\":\"Represents fileId encoding. Possible values are 0 and 1. O represents UUID based fileID, and 1 represents raw string format of the fileId. \\nWhen the encoding is 0, reader can deduce fileID from fileIdLowBits, fileIdLowBits and fileIndex.\",\"default\":0}]}],\"doc\":\"Metadata Index that contains information about record keys and their location in the dataset\",\"default\":null}]}"
},
"operationType" : "BULK_INSERT"
}
Loading

0 comments on commit 6db2db4

Please sign in to comment.