diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
index daa1dcb0207ff..dd3ac663d048b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DefaultHoodieRecordPayload.java
@@ -20,6 +20,8 @@
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -49,6 +51,8 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
   private AtomicBoolean isDeleteComputed = new AtomicBoolean(false);
   private boolean isDefaultRecordPayloadDeleted = false;
 
+  protected static final Comparable DEFAULT_VALUE = 0;
+
   public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
     super(record, orderingVal);
   }
@@ -59,11 +63,7 @@ public DefaultHoodieRecordPayload(Option<GenericRecord> record) {
 
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
-    if (recordBytes.length == 0) {
-      return Option.empty();
-    }
-
-    GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
+    Option<IndexedRecord> incomingRecord = recordBytes.length == 0 ? Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
 
     // Null check is needed here to support schema evolution. The record in storage may be from old schema where
     // the new ordering column might not be present and hence returns null.
@@ -74,15 +74,17 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
     /*
      * We reached a point where the value on disk is older than the incoming record.
      */
-    eventTime = updateEventTime(incomingRecord, properties);
+    if (incomingRecord.isPresent()) {
+      eventTime = updateEventTime((GenericRecord) incomingRecord.get(), properties);
+    }
 
     if (!isDeleteComputed.getAndSet(true)) {
-      isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, properties);
+      isDefaultRecordPayloadDeleted = incomingRecord.map(record -> isDeleteRecord((GenericRecord) record, properties)).orElse(true);
     }
 
     /*
      * Now check if the incoming record is a delete record.
      */
-    return isDefaultRecordPayloadDeleted ? Option.empty() : Option.of(incomingRecord);
+    return isDefaultRecordPayloadDeleted ? Option.empty() : incomingRecord;
   }
 
   @Override
@@ -165,7 +167,7 @@ public Option<Map<String, String>> getMetadata() {
   }
 
   protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
-                                                IndexedRecord incomingRecord, Properties properties) {
+                                                Option<IndexedRecord> incomingRecord, Properties properties) {
    /*
     * Combining strategy here returns currentValue on disk if incoming record is older.
    * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
@@ -185,10 +187,15 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
     Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
         orderField,
         true, consistentLogicalTimestampEnabled);
-    Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
-        orderField,
-        true, consistentLogicalTimestampEnabled);
+    Comparable incomingOrderingVal = incomingRecord.map(record -> (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+        orderField,
+        true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+    if (incomingRecord.isEmpty() && DEFAULT_VALUE.equals(incomingOrderingVal)) {
+      return true;
+    }
+    Pair<Comparable, Comparable> comparablePair = OrderingValueUtils.canonicalizeOrderingValue((Comparable) persistedOrderingVal, incomingOrderingVal);
+    persistedOrderingVal = comparablePair.getLeft();
+    incomingOrderingVal = comparablePair.getRight();
     return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
   }
-
 }
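The change above threads the incoming record through as an Option so that an empty payload (a hard-delete tombstone) can still be merged against the persisted record. A minimal sketch of the resulting semantics, assuming a Properties object that carries the usual precombine-field configuration (the class and method names here are illustrative, not part of the patch):

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.util.Option;

import java.io.IOException;
import java.util.Properties;

final class TombstoneMergeSketch {
  // Returns true when a sentinel delete removes the persisted record.
  static boolean deletes(IndexedRecord persisted, Schema schema, Properties props) throws IOException {
    // An empty payload models a hard delete; orderingVal defaults to 0, i.e. DEFAULT_VALUE.
    DefaultHoodieRecordPayload tombstone = new DefaultHoodieRecordPayload(Option.empty());
    // With the patch, incomingRecord becomes Option.empty(), the DEFAULT_VALUE check in
    // needUpdatingPersistedRecord short-circuits to true, and the merge yields Option.empty().
    return !tombstone.combineAndGetUpdateValue(persisted, schema, props).isPresent();
  }
}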
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
index a5de6034632cd..e8a37caf40e64 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/EventTimeAvroPayload.java
@@ -18,11 +18,14 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
 
 import java.io.IOException;
 import java.util.Map;
@@ -44,24 +47,34 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {
     this(record.isPresent() ? record.get() : null, 0); // natural order
   }
 
+  @Override
+  public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
+    if ((recordBytes.length == 0 || isDeletedRecord) && DEFAULT_VALUE.equals(orderingVal)) {
+      // use natural order for the delete record
+      return this;
+    }
+    Pair<Comparable, Comparable> comparablePair = OrderingValueUtils.canonicalizeOrderingValue(oldValue.orderingVal, this.orderingVal);
+    Comparable oldValueOrderingVal = comparablePair.getLeft();
+    Comparable thisOrderingVal = comparablePair.getRight();
+    if (oldValueOrderingVal.compareTo(thisOrderingVal) > 0) {
+      return oldValue;
+    } else {
+      return this;
+    }
+  }
+
   @Override
   public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
     /*
      * Check if the incoming record is a delete record.
      */
-    if (recordBytes.length == 0 || isDeletedRecord) {
-      return Option.empty();
-    }
-
-    GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);
-
+    Option<IndexedRecord> incomingRecord = recordBytes.length == 0 || isDeletedRecord ? Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes, schema));
     // Null check is needed here to support schema evolution. The record in storage may be from old schema where
     // the new ordering column might not be present and hence returns null.
     if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
       return Option.of(currentValue);
     }
-
-    return Option.of(incomingRecord);
+    return incomingRecord;
   }
 
   @Override
@@ -77,4 +90,5 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
   public Option<Map<String, String>> getMetadata() {
     return Option.empty();
   }
+
 }
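The new preCombine override is what lets two log records with differently typed ordering values be reduced safely. A hedged sketch of its behavior, assuming two payloads built for the same key (the records and ordering values here are illustrative):

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.hudi.common.model.EventTimeAvroPayload;

final class EventTimePreCombineSketch {
  static EventTimeAvroPayload latest(GenericRecord older, GenericRecord newer) {
    // One side carries an Avro Utf8 ordering value, the other a Java String; before this
    // patch, the raw compareTo between them would throw a ClassCastException.
    EventTimeAvroPayload oldPayload = new EventTimeAvroPayload(older, new Utf8("101"));
    EventTimeAvroPayload newPayload = new EventTimeAvroPayload(newer, "102");
    // After canonicalization "102" > "101", so newPayload is kept.
    return (EventTimeAvroPayload) newPayload.preCombine(oldPayload);
  }
}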
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java b/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
index 33da44e3bccdc..83d98317bc197 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/FirstValueAvroPayload.java
@@ -23,6 +23,8 @@
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
 
 import java.util.Properties;
@@ -99,7 +101,7 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload
 
   @Override
   protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
-                                                IndexedRecord incomingRecord, Properties properties) {
+                                                Option<IndexedRecord> incomingRecord, Properties properties) {
    /*
     * Combining strategy here returns currentValue on disk if incoming record is older absolutely.
    * The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
@@ -116,9 +118,15 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
     Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
         orderField,
         true, consistentLogicalTimestampEnabled);
-    Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
-        orderField,
-        true, consistentLogicalTimestampEnabled);
+    Comparable incomingOrderingVal = incomingRecord.map(record -> (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
+        orderField,
+        true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
+    if (incomingRecord.isEmpty() && DEFAULT_VALUE.equals(incomingOrderingVal)) {
+      return true;
+    }
+    Pair<Comparable, Comparable> comparablePair = OrderingValueUtils.canonicalizeOrderingValue((Comparable) persistedOrderingVal, incomingOrderingVal);
+    persistedOrderingVal = comparablePair.getLeft();
+    incomingOrderingVal = comparablePair.getRight();
     return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) < 0;
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 9106f6b0de5f0..a6523dcab69a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -29,6 +29,8 @@
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
 import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.common.util.OrderingValueUtils;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.DefaultSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
 import org.apache.hudi.common.util.HoodieRecordUtils;
@@ -282,6 +284,9 @@ protected void processNextDeletedRecord(DeleteRecord deleteRecord) {
     Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema,
         this.hoodieTableMetaClient.getTableConfig().getProps());
     Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
+    Pair<Comparable, Comparable> comparablePair = OrderingValueUtils.canonicalizeOrderingValue(curOrderingVal, deleteOrderingVal);
+    curOrderingVal = comparablePair.getLeft();
+    deleteOrderingVal = comparablePair.getRight();
     // Checks that the ordering value does not equal 0
     // because we use 0 as the default value which means natural order
     boolean choosePrev = !deleteOrderingVal.equals(0)
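In the merged log scanner, the delete record's ordering value comes from the write path as a plain Java type, while the record already held in the scanner may carry the Avro-decoded type. A simplified sketch of the choosePrev decision shown above, restricted to the conjuncts visible in this hunk, with illustrative values:

import org.apache.avro.util.Utf8;
import org.apache.hudi.common.util.OrderingValueUtils;
import org.apache.hudi.common.util.collection.Pair;

final class DeleteMergeSketch {
  @SuppressWarnings({"unchecked", "rawtypes"})
  static boolean keepExistingRecord(Comparable curOrderingVal, Comparable deleteOrderingVal) {
    Pair<Comparable, Comparable> canonical =
        OrderingValueUtils.canonicalizeOrderingValue(curOrderingVal, deleteOrderingVal);
    curOrderingVal = canonical.getLeft();
    deleteOrderingVal = canonical.getRight();
    // A delete carrying the default (natural order) value never wins by ordering value;
    // otherwise the existing record survives when its ordering value is strictly greater.
    return !deleteOrderingVal.equals(0) && curOrderingVal.compareTo(deleteOrderingVal) > 0;
  }

  public static void main(String[] args) {
    // Utf8 from the Avro-decoded log record vs. String from the DeleteRecord:
    System.out.println(keepExistingRecord(new Utf8("103"), "102")); // true
  }
}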
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java
new file mode 100644
index 0000000000000..7b8e2e9fd9e2a
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/OrderingValueUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.util.Utf8;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import java.math.BigDecimal;
+
+/**
+ * Utility class for ordering value canonicalization. Handles compatibility issues between different data types in precombine values.
+ */
+public class OrderingValueUtils {
+  /**
+   * Currently, there are some discrepancies between the types of the ordering value for a delete record and a normal record during merging.
+   * E.g., if the precombine field is of STRING type, the ordering value of a delete record is a String, while that of a normal record is an
+   * Avro Utf8. We should canonicalize the ordering values before merging. Specifically, the STRING and DECIMAL types are handled here.
+   *
+   * <p>Processing logic:
+   * 1. Unifies string types: converts between Utf8 (Avro) ↔ String (Java)
+   * 2. Unifies decimal types: converts between GenericData.Fixed (Avro) ↔ BigDecimal (Java)
+   *
+   * @param oldOrder      Existing ordering value in the table (typically Avro types)
+   * @param incomingOrder New incoming ordering value (could be Java types or Avro types)
+   * @return Canonicalized ordering value pair with consistent types for comparison
+   */
+  public static Pair<Comparable, Comparable> canonicalizeOrderingValue(Comparable oldOrder, Comparable incomingOrder) {
+    if (oldOrder instanceof Utf8 && incomingOrder instanceof String) {
+      // Case 1: old value is Avro Utf8, new value is Java String
+      // Convert Utf8 to String to unify both sides as Java String for comparison
+      oldOrder = oldOrder.toString();
+    } else if (incomingOrder instanceof Utf8 && oldOrder instanceof String) {
+      // Case 2: new value is Avro Utf8, old value is Java String
+      // Convert Utf8 to String to unify both sides as Java String for comparison
+      incomingOrder = incomingOrder.toString();
+    } else if (oldOrder instanceof GenericData.Fixed && incomingOrder instanceof BigDecimal) {
+      // Case 3: old value is Avro Fixed, new value is Java BigDecimal
+      // Convert the Fixed value to BigDecimal to unify both sides as Java BigDecimal for comparison
+      // (the Fixed type is typically used for storing DECIMAL values)
+      oldOrder = (BigDecimal) HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed) oldOrder).getSchema(), oldOrder, false);
+    } else if (incomingOrder instanceof GenericData.Fixed && oldOrder instanceof BigDecimal) {
+      // Case 4: new value is Avro Fixed, old value is Java BigDecimal
+      // Convert the Fixed value to BigDecimal to unify both sides as Java BigDecimal for comparison
+      incomingOrder = (BigDecimal) HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed) incomingOrder).getSchema(), incomingOrder, false);
+    }
+
+    // Return the canonicalized ordering value pair, ensuring both values have consistent types
+    return Pair.of(oldOrder, incomingOrder);
+  }
+}
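A minimal usage sketch of the new utility, exercising the DECIMAL case that the Flink test below covers (the schema construction is an assumption for illustration; any fixed-backed decimal(7,2) works the same way):

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.common.util.OrderingValueUtils;
import org.apache.hudi.common.util.collection.Pair;

import java.math.BigDecimal;

public class CanonicalizeDecimalDemo {
  public static void main(String[] args) {
    Schema fixedSchema = LogicalTypes.decimal(7, 2)
        .addToSchema(Schema.createFixed("ts", null, null, 4));
    // The persisted ordering value decodes from Avro storage as GenericData.Fixed...
    Comparable persisted = (GenericData.Fixed) new Conversions.DecimalConversion()
        .toFixed(new BigDecimal("1.30"), fixedSchema, fixedSchema.getLogicalType());
    // ...while the incoming one arrives from the write path as a plain BigDecimal.
    Comparable incoming = new BigDecimal("1.20");

    Pair<Comparable, Comparable> canonical =
        OrderingValueUtils.canonicalizeOrderingValue(persisted, incoming);
    // Both sides are now BigDecimal, so compareTo is safe: 1.30 > 1.20.
    System.out.println(canonical.getLeft().compareTo(canonical.getRight()) > 0); // true
  }
}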
+ * Processing logic: + * 1. Unifies string types: Converts between Utf8 (Avro) ↔ String (Java) + * 2. Unifies numeric types: Converts between GenericData.Fixed (Avro) ↔ BigDecimal (Java) + * + * @param oldOrder Existing ordering value in the table (typically Avro types) + * @param incomingOrder New incoming ordering value (could be Java types or Avro types) + * @return Pair Canonicalized ordering value pair with consistent types for comparison + */ + public static Pair canonicalizeOrderingValue(Comparable oldOrder, Comparable incomingOrder) { + if (oldOrder instanceof Utf8 && incomingOrder instanceof String) { + // Case 1: Old value is Avro Utf8 type, new value is Java String type + // Convert Utf8 to String to unify as Java String type for comparison + oldOrder = oldOrder.toString(); + } else if (incomingOrder instanceof Utf8 && oldOrder instanceof String) { + // Case 2: New value is Avro Utf8 type, old value is Java String type + // Convert Utf8 to String to unify as Java String type for comparison + incomingOrder = incomingOrder.toString(); + } else if (oldOrder instanceof GenericData.Fixed && incomingOrder instanceof BigDecimal) { + // Case 3: Old value is Avro Fixed type, new value is Java BigDecimal type + // Convert Fixed type to BigDecimal to unify as Java BigDecimal type for comparison + // Fixed type is typically used for storing decimal values (e.g., DECIMAL) + oldOrder = (BigDecimal) HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed) oldOrder).getSchema(), oldOrder, false); + } else if (incomingOrder instanceof GenericData.Fixed && oldOrder instanceof BigDecimal) { + // Case 4: New value is Avro Fixed type, old value is Java BigDecimal type + // Convert Fixed type to BigDecimal to unify as Java BigDecimal type for comparison + incomingOrder = (BigDecimal) HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed) incomingOrder).getSchema(), incomingOrder, false); + } + + // Return canonicalized ordering value pair ensuring both values have consistent types + return Pair.of(oldOrder, incomingOrder); + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 99b0136b52eee..803e12758ecef 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -61,6 +61,7 @@ import org.junit.jupiter.params.provider.ValueSource; import java.io.File; +import java.nio.file.Paths; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; @@ -481,6 +482,124 @@ void testStreamReadWithDeletes() throws Exception { assertRowsEquals(result, expected, true); } + @ParameterizedTest + @MethodSource("sceneTypeAndCompactionEnabled") + void testHardDeleteWithStringOrderingField(boolean isDeletedFirst, boolean doCompaction) throws Exception { + StoragePath storagePath = new StoragePath(Paths.get(tempFile.getAbsolutePath().replace("%5C","\\")).toUri()); + ExecMode execMode = ExecMode.BATCH; + String hoodieTableDDL = "create table t1(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " _hoodie_is_deleted boolean,\n" + + " `partition` varchar(20),\n" + + " ts STRING,\n" + + " PRIMARY KEY(uuid) NOT ENFORCED\n" + + ")\n" + + "PARTITIONED BY (`partition`)\n" + + "with (\n" + + " 'connector' = 'hudi',\n" + + " 
'table.type' = 'MERGE_ON_READ',\n" + + " 'index.type' = 'BUCKET',\n" + + " 'path' = '" + storagePath + "',\n" + + (doCompaction ? " 'compaction.delta_commits' = '1',\n" : "") + + " 'read.streaming.skip_compaction' = 'false'\n" + + ")"; + batchTableEnv.executeSql(hoodieTableDDL); + + String expected; + String insertInto; + if (isDeletedFirst) { + expected = "[" + + "+I[id1, Danny, 23, false, par1, 101], " + + "+I[id2, Stephen, 33, false, par1, 103]]"; + // first commit + insertInto = "insert into t1 values\n" + + "('id1','Danny',23,false,'par1', '101'),\n" + + "('id2','Stephen',33,false,'par1', '103')"; + execInsertSql(batchTableEnv, insertInto); + // second commit, hard delete record with smaller order value + insertInto = "insert into t1 values\n" + + "('id2','Stephen',33, true,'par1', '102')"; + } else { + // first delete record will be ignored during compaction + expected = doCompaction + ? "[+I[id1, Danny, 23, false, par1, 101], +I[id2, Stephen, 33, false, par1, 102]]" + : "[+I[id1, Danny, 23, false, par1, 101]]"; + // first commit + insertInto = "insert into t1 values\n" + + "('id1','Danny',23,false,'par1', '101'),\n" + + "('id2','Stephen',33,true,'par1', '103')"; + execInsertSql(batchTableEnv, insertInto); + // second commit, hard delete record with smaller order value + insertInto = "insert into t1 values\n" + + "('id2','Stephen',33, false,'par1', '102')"; + } + execInsertSql(batchTableEnv, insertInto); + List result = execSelectSql(batchTableEnv, "select * from t1", execMode); + // no record is deleted. + assertRowsEquals(result, expected); + } + + @ParameterizedTest + @MethodSource("sceneTypeAndCompactionEnabled") + void testHardDeleteWithDecimalOrderingField(boolean isDeletedFirst, boolean doCompaction) throws Exception { + StoragePath storagePath = new StoragePath(Paths.get(tempFile.getAbsolutePath().replace("%5C","\\")).toUri()); + ExecMode execMode = ExecMode.BATCH; + String hoodieTableDDL = "create table t1(\n" + + " uuid varchar(20),\n" + + " name varchar(10),\n" + + " age int,\n" + + " _hoodie_is_deleted boolean,\n" + + " `partition` varchar(20),\n" + + " ts DECIMAL(7,2),\n" + + " PRIMARY KEY(uuid) NOT ENFORCED\n" + + ")\n" + + "PARTITIONED BY (`partition`)\n" + + "with (\n" + + " 'connector' = 'hudi',\n" + + " 'table.type' = 'MERGE_ON_READ',\n" + + " 'index.type' = 'BUCKET',\n" + + " 'path' = '" + storagePath + "',\n" + + (doCompaction ? " 'compaction.delta_commits' = '1',\n" : "") + + " 'read.streaming.skip_compaction' = 'false'\n" + + ")"; + batchTableEnv.executeSql(hoodieTableDDL); + + String expected; + String insertInto; + if (isDeletedFirst) { + expected = "[" + + "+I[id1, Danny, 23, false, par1, 1.10], " + + "+I[id2, Stephen, 33, false, par1, 1.30]]"; + // first commit + insertInto = "insert into t1 values\n" + + "('id1','Danny',23,false,'par1', 1.10),\n" + + "('id2','Stephen',33,false,'par1', 1.30)"; + execInsertSql(batchTableEnv, insertInto); + // second commit, hard delete record with smaller order value + insertInto = "insert into t1 values\n" + + "('id2','Stephen',33, true,'par1', 1.20)"; + } else { + expected = doCompaction + // first delete record will be ignored during compaction + ? 
"[+I[id1, Danny, 23, false, par1, 1.10], +I[id2, Stephen, 33, false, par1, 1.20]]" + : "[+I[id1, Danny, 23, false, par1, 1.10]]"; + // first commit + insertInto = "insert into t1 values\n" + + "('id1','Danny',23,false,'par1', 1.10),\n" + + "('id2','Stephen',33,true,'par1', 1.30)"; + execInsertSql(batchTableEnv, insertInto); + // second commit, hard delete record with smaller order value + insertInto = "insert into t1 values\n" + + "('id2','Stephen',33, false,'par1', 1.20)"; + } + execInsertSql(batchTableEnv, insertInto); + List result2 = execSelectSql(batchTableEnv, "select * from t1", execMode); + // no record is deleted. + assertRowsEquals(result2, expected); + } + @ParameterizedTest @MethodSource("tableTypeAndBooleanTrueFalseParams") void testStreamReadFilterByPartition(HoodieTableType tableType, boolean hiveStylePartitioning) throws Exception { @@ -2227,6 +2346,19 @@ private static Stream indexAndTableTypeParams() { return Stream.of(data).map(Arguments::of); } + /** + * Return test params => (SceneType, true/false). + */ + private static Stream sceneTypeAndCompactionEnabled() { + Object[][] data = + new Object[][] { + {true, false}, + {false, false}, + {true, true}, + {false, true}}; + return Stream.of(data).map(Arguments::of); + } + private void execInsertSql(TableEnvironment tEnv, String insert) { TableResult tableResult = tEnv.executeSql(insert); // wait to finish diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala index 0989b8b09aee4..cdd5669baf65e 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala @@ -130,7 +130,7 @@ class ExpressionPayload(@transient record: GenericRecord, .serialize(resultingRow) .asInstanceOf[GenericRecord] - if (targetRecord.isEmpty || needUpdatingPersistedRecord(targetRecord.get, resultingAvroRecord, properties)) { + if (targetRecord.isEmpty || needUpdatingPersistedRecord(targetRecord.get, HOption.of(resultingAvroRecord), properties)) { resultRecordOpt = HOption.of(resultingAvroRecord) } else { // if the PreCombine field value of targetRecord is greater