Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.OrderingValueUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
Expand Down Expand Up @@ -49,6 +51,8 @@ public class DefaultHoodieRecordPayload extends OverwriteWithLatestAvroPayload {
private AtomicBoolean isDeleteComputed = new AtomicBoolean(false);
private boolean isDefaultRecordPayloadDeleted = false;

protected static final Comparable<?> DEFAULT_VALUE = 0;

public DefaultHoodieRecordPayload(GenericRecord record, Comparable orderingVal) {
super(record, orderingVal);
}
Expand All @@ -59,11 +63,7 @@ public DefaultHoodieRecordPayload(Option<GenericRecord> record) {

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
if (recordBytes.length == 0) {
return Option.empty();
}

GenericRecord incomingRecord = HoodieAvroUtils.bytesToAvro(recordBytes, schema);
Option<IndexedRecord> incomingRecord = recordBytes.length == 0 ? Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes,schema));

// Null check is needed here to support schema evolution. The record in storage may be from old schema where
// the new ordering column might not be present and hence returns null.
Expand All @@ -74,15 +74,17 @@ public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue
/*
* We reached a point where the value is disk is older than the incoming record.
*/
eventTime = updateEventTime(incomingRecord, properties);
if (incomingRecord.isPresent()) {
eventTime = updateEventTime((GenericRecord) incomingRecord.get(), properties);
}

if (!isDeleteComputed.getAndSet(true)) {
isDefaultRecordPayloadDeleted = isDeleteRecord(incomingRecord, properties);
isDefaultRecordPayloadDeleted = incomingRecord.map(record -> isDeleteRecord((GenericRecord) record, properties)).orElse(true);
}
/*
* Now check if the incoming record is a delete record.
*/
return isDefaultRecordPayloadDeleted ? Option.empty() : Option.of(incomingRecord);
return isDefaultRecordPayloadDeleted ? Option.empty() : incomingRecord;
}

@Override
Expand Down Expand Up @@ -165,7 +167,7 @@ public Option<Map<String, String>> getMetadata() {
}

protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
IndexedRecord incomingRecord, Properties properties) {
Option<IndexedRecord> incomingRecord, Properties properties) {
/*
* Combining strategy here returns currentValue on disk if incoming record is older.
* The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
Expand All @@ -185,10 +187,15 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
orderField,
true, consistentLogicalTimestampEnabled);
Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
Comparable incomingOrderingVal = incomingRecord.map(record -> (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
orderField,
true, consistentLogicalTimestampEnabled);
true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
if (incomingRecord.isEmpty() && DEFAULT_VALUE.equals(incomingOrderingVal)) {
return true;
}
Pair<Comparable, Comparable> comparablePair = OrderingValueUtils.canonicalizeOrderingValue((Comparable) persistedOrderingVal, incomingOrderingVal);
persistedOrderingVal = comparablePair.getLeft();
incomingOrderingVal = comparablePair.getRight();
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) <= 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package org.apache.hudi.common.model;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.Option;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.OrderingValueUtils;
import org.apache.hudi.common.util.collection.Pair;

import java.io.IOException;
import java.util.Map;
Expand All @@ -44,24 +47,34 @@ public EventTimeAvroPayload(Option<GenericRecord> record) {
this(record.isPresent() ? record.get() : null, 0); // natural order
}

@Override
public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload oldValue) {
if ((recordBytes.length == 0 || isDeletedRecord) && DEFAULT_VALUE.equals(orderingVal)) {
//use natural for delete record
return this;
}
Pair<Comparable, Comparable> comparablePair = OrderingValueUtils.canonicalizeOrderingValue(oldValue.orderingVal, this.orderingVal);
Comparable oldValueOrderingVal = comparablePair.getLeft();
Comparable thisOrderingVal = comparablePair.getRight();
if (oldValueOrderingVal.compareTo(thisOrderingVal) > 0) {
return oldValue;
} else {
return this;
}
}

@Override
public Option<IndexedRecord> combineAndGetUpdateValue(IndexedRecord currentValue, Schema schema, Properties properties) throws IOException {
/*
* Check if the incoming record is a delete record.
*/
if (recordBytes.length == 0 || isDeletedRecord) {
return Option.empty();
}

GenericRecord incomingRecord = bytesToAvro(recordBytes, schema);

Option<IndexedRecord> incomingRecord = recordBytes.length == 0 || isDeletedRecord ? Option.empty() : Option.of(HoodieAvroUtils.bytesToAvro(recordBytes,schema));
// Null check is needed here to support schema evolution. The record in storage may be from old schema where
// the new ordering column might not be present and hence returns null.
if (!needUpdatingPersistedRecord(currentValue, incomingRecord, properties)) {
return Option.of(currentValue);
}

return Option.of(incomingRecord);
return incomingRecord;
}

@Override
Expand All @@ -77,4 +90,5 @@ public Option<IndexedRecord> getInsertValue(Schema schema, Properties properties
public Option<Map<String, String>> getMetadata() {
return Option.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.OrderingValueUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;

import java.util.Properties;
Expand Down Expand Up @@ -99,7 +101,7 @@ public OverwriteWithLatestAvroPayload preCombine(OverwriteWithLatestAvroPayload

@Override
protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
IndexedRecord incomingRecord, Properties properties) {
Option<IndexedRecord> incomingRecord, Properties properties) {
/*
* Combining strategy here returns currentValue on disk if incoming record is older absolutely.
* The incoming record can be either a delete (sent as an upsert with _hoodie_is_deleted set to true)
Expand All @@ -116,9 +118,15 @@ protected boolean needUpdatingPersistedRecord(IndexedRecord currentValue,
Object persistedOrderingVal = HoodieAvroUtils.getNestedFieldVal((GenericRecord) currentValue,
orderField,
true, consistentLogicalTimestampEnabled);
Comparable incomingOrderingVal = (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) incomingRecord,
orderField,
true, consistentLogicalTimestampEnabled);
Comparable incomingOrderingVal = incomingRecord.map(record -> (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
orderField,
true, consistentLogicalTimestampEnabled)).orElse(orderingVal);
if (incomingRecord.isEmpty() && DEFAULT_VALUE.equals(incomingOrderingVal)) {
return true;
}
Pair<Comparable, Comparable> comparablePair = OrderingValueUtils.canonicalizeOrderingValue((Comparable) persistedOrderingVal, incomingOrderingVal);
persistedOrderingVal = comparablePair.getLeft();
incomingOrderingVal = comparablePair.getRight();
return persistedOrderingVal == null || ((Comparable) persistedOrderingVal).compareTo(incomingOrderingVal) < 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.cdc.HoodieCDCUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.OrderingValueUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordUtils;
Expand Down Expand Up @@ -282,6 +284,9 @@ protected void processNextDeletedRecord(DeleteRecord deleteRecord) {

Comparable curOrderingVal = oldRecord.getOrderingValue(this.readerSchema, this.hoodieTableMetaClient.getTableConfig().getProps());
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
Pair<Comparable, Comparable> comparablePair = OrderingValueUtils.canonicalizeOrderingValue(curOrderingVal, deleteOrderingVal);
curOrderingVal = comparablePair.getLeft();
deleteOrderingVal = comparablePair.getRight();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
boolean choosePrev = !deleteOrderingVal.equals(0)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* “License”); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an “AS IS” BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.common.util;

import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.util.collection.Pair;

import java.math.BigDecimal;

/**
* Utility class for ordering value canonicalization. Handle compatibility issues between different data types in precombine values.
*/
public class OrderingValueUtils {
/**
* Currently, there are some discrepancies between the types of the ordering value for delete record and normal record during merging.
* E.g., if the precombine field is of STRING type, the ordering value of delete record is a String, while that of normal record is an
* Avro UTF8. We should canonicalize the ordering values before merging. Specifically, STRING and DECIMAL type should be handled here.
* <p>
* Processing logic:
* 1. Unifies string types: Converts between Utf8 (Avro) ↔ String (Java)
* 2. Unifies numeric types: Converts between GenericData.Fixed (Avro) ↔ BigDecimal (Java)
*
* @param oldOrder Existing ordering value in the table (typically Avro types)
* @param incomingOrder New incoming ordering value (could be Java types or Avro types)
* @return Pair<Comparable, Comparable> Canonicalized ordering value pair with consistent types for comparison
*/
public static Pair<Comparable, Comparable> canonicalizeOrderingValue(Comparable oldOrder, Comparable incomingOrder) {
if (oldOrder instanceof Utf8 && incomingOrder instanceof String) {
// Case 1: Old value is Avro Utf8 type, new value is Java String type
// Convert Utf8 to String to unify as Java String type for comparison
oldOrder = oldOrder.toString();
} else if (incomingOrder instanceof Utf8 && oldOrder instanceof String) {
// Case 2: New value is Avro Utf8 type, old value is Java String type
// Convert Utf8 to String to unify as Java String type for comparison
incomingOrder = incomingOrder.toString();
} else if (oldOrder instanceof GenericData.Fixed && incomingOrder instanceof BigDecimal) {
// Case 3: Old value is Avro Fixed type, new value is Java BigDecimal type
// Convert Fixed type to BigDecimal to unify as Java BigDecimal type for comparison
// Fixed type is typically used for storing decimal values (e.g., DECIMAL)
oldOrder = (BigDecimal) HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed) oldOrder).getSchema(), oldOrder, false);
} else if (incomingOrder instanceof GenericData.Fixed && oldOrder instanceof BigDecimal) {
// Case 4: New value is Avro Fixed type, old value is Java BigDecimal type
// Convert Fixed type to BigDecimal to unify as Java BigDecimal type for comparison
incomingOrder = (BigDecimal) HoodieAvroUtils.convertValueForSpecificDataTypes(((GenericData.Fixed) incomingOrder).getSchema(), incomingOrder, false);
}

// Return canonicalized ordering value pair ensuring both values have consistent types
return Pair.of(oldOrder, incomingOrder);
}
}
Loading