diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java index 061ba21f2b07c..dce2c7064f3cd 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestTableCommand.java @@ -18,7 +18,6 @@ package org.apache.hudi.cli.commands; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.cli.HoodieCLI; import org.apache.hudi.cli.functional.CLIFunctionalTestHarness; import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator; @@ -27,6 +26,8 @@ import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -36,7 +37,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.storage.StoragePath; -import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.jupiter.api.BeforeEach; @@ -259,10 +259,10 @@ public void testFetchTableSchema() throws Exception { assertTrue(ShellEvaluationResultUtil.isSuccess(result)); String actualSchemaStr = result.toString().substring(result.toString().indexOf("{")); - Schema actualSchema = new Schema.Parser().parse(actualSchemaStr); + HoodieSchema actualSchema = HoodieSchema.parse(actualSchemaStr); - Schema expectedSchema = new Schema.Parser().parse(schemaStr); - expectedSchema = HoodieAvroUtils.addMetadataFields(expectedSchema); + HoodieSchema expectedSchema = HoodieSchema.parse(schemaStr); + expectedSchema = HoodieSchemaUtils.addMetadataFields(expectedSchema); assertEquals(actualSchema, expectedSchema); File file = File.createTempFile("temp", null); @@ -270,7 +270,7 @@ public void testFetchTableSchema() throws Exception { assertTrue(ShellEvaluationResultUtil.isSuccess(result)); actualSchemaStr = getFileContent(file.getAbsolutePath()); - actualSchema = new Schema.Parser().parse(actualSchemaStr); + actualSchema = HoodieSchema.parse(actualSchemaStr); assertEquals(actualSchema, expectedSchema); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 86efd7db1e086..73be480a83461 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -19,7 +19,6 @@ package org.apache.hudi.client; import org.apache.hudi.avro.AvroSchemaUtils; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.avro.model.HoodieCleanMetadata; import org.apache.hudi.avro.model.HoodieIndexCommitMetadata; import org.apache.hudi.avro.model.HoodieIndexPlan; @@ -46,6 +45,7 @@ import org.apache.hudi.common.model.TableServiceType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import 
org.apache.hudi.common.table.HoodieTableVersion;
@@ -341,15 +341,15 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
    FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
    if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
      InternalSchema internalSchema;
-      Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
+      HoodieSchema schema = HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
      if (historySchemaStr.isEmpty()) {
-        internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(HoodieSchema.fromAvroSchema(avroSchema)));
+        internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(schema));
        internalSchema.setSchemaId(Long.parseLong(instantTime));
      } else {
        internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime), SerDeHelper.parseSchemas(historySchemaStr));
      }
-      InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
+      InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
      if (evolvedSchema.equals(internalSchema)) {
        metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
        //TODO save history schema by metaTable
@@ -361,7 +361,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
        schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
      }
      // update SCHEMA_KEY
-      metadata.addMetadata(SCHEMA_KEY, InternalSchemaConverter.convert(evolvedSchema, avroSchema.getFullName()).toString());
+      metadata.addMetadata(SCHEMA_KEY, InternalSchemaConverter.convert(evolvedSchema, schema.getFullName()).toString());
    }
  }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
index ab9c0ce426b77..a51dbcc396cfb 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/ConcurrentSchemaEvolutionTableSchemaGetter.java
@@ -35,7 +35,6 @@ import org.apache.hudi.util.Lazy;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import java.io.IOException;
 import java.util.Arrays;
@@ -59,12 +58,12 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
  protected final HoodieTableMetaClient metaClient;
-  private final Lazy<ConcurrentHashMap<HoodieInstant, Schema>> tableSchemaCache;
+  private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieSchema>> tableSchemaCache;
  private Option<HoodieInstant> latestCommitWithValidSchema = Option.empty();
  @VisibleForTesting
-  public ConcurrentHashMap<HoodieInstant, Schema> getTableSchemaCache() {
+  public ConcurrentHashMap<HoodieInstant, HoodieSchema> getTableSchemaCache() {
    return tableSchemaCache.get();
  }
@@ -89,13 +88,11 @@ private HoodieSchema handlePartitionColumnsIfNeeded(HoodieSchema schema) {
    return schema;
  }
-  public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) {
-    return getTableAvroSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline.
-        .map(HoodieSchema::fromAvroSchema)
+  public Option<HoodieSchema> getTableSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) {
+    return getTableSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline.
        .or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read create schema from table config.
        .map(tableSchema -> includeMetadataFields ? HoodieSchemaUtils.addMetadataFields(tableSchema, false) : HoodieSchemaUtils.removeMetadataFields(tableSchema))
-        .map(this::handlePartitionColumnsIfNeeded)
-        .map(HoodieSchema::toAvroSchema);
+        .map(this::handlePartitionColumnsIfNeeded);
  }
  private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
@@ -112,16 +109,16 @@ private Option getCachedLatestCommitWithValidSchema() {
  }
  @VisibleForTesting
-  Option<Schema> getTableAvroSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) {
-    return getTableAvroSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(), instantTime);
+  Option<HoodieSchema> getTableSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) {
+    return getTableSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(), instantTime);
  }
  // [HUDI-9112] simplify the logic
-  Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) {
+  Option<HoodieSchema> getTableSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) {
    // If instantTime is empty it means read the latest one. In that case, get the cached instant if there is one.
    boolean fetchFromLastValidCommit = instantTime.isEmpty();
    Option<HoodieInstant> targetInstant = instantTime.or(getCachedLatestCommitWithValidSchema());
-    Schema cachedTableSchema = null;
+    HoodieSchema cachedTableSchema = null;
    // Try cache first if there is a target instant to fetch for.
    if (!targetInstant.isEmpty()) {
@@ -130,7 +127,7 @@ Option getTableAvroSchemaFromTimelineWithCache(Stream rev
    // Cache miss on either latestCommitWithValidSchema or commitMetadataCache. Compute the result.
    if (cachedTableSchema == null) {
-      Option<Pair<HoodieInstant, Schema>> instantWithSchema = getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant);
+      Option<Pair<HoodieInstant, HoodieSchema>> instantWithSchema = getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant);
      if (instantWithSchema.isPresent()) {
        targetInstant = Option.of(instantWithSchema.get().getLeft());
        cachedTableSchema = instantWithSchema.get().getRight();
@@ -159,10 +156,10 @@ Option getTableAvroSchemaFromTimelineWithCache(Stream rev
  }
  @VisibleForTesting
-  Option<Pair<HoodieInstant, Schema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) {
+  Option<Pair<HoodieInstant, HoodieSchema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) {
    // To find the table schema given an instant time, need to walk backwards from the latest instant in
    // the timeline finding a completed instant containing a valid schema.
-    ConcurrentHashMap<HoodieInstant, Schema> tableSchemaAtInstant = new ConcurrentHashMap<>();
+    ConcurrentHashMap<HoodieInstant, HoodieSchema> tableSchemaAtInstant = new ConcurrentHashMap<>();
    Option<HoodieInstant> instantWithTableSchema = Option.fromJavaOptional(reversedTimelineStream
        // If a completion time is specified, find the first eligible instant in the schema evolution timeline.
        // Should switch to completion time based.
@@ -179,7 +176,7 @@ Option> getLastCommitMetadataWithValidSchemaFromTime
          String schemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
          boolean isValidSchemaStr = !StringUtils.isNullOrEmpty(schemaStr);
          if (isValidSchemaStr) {
-            tableSchemaAtInstant.putIfAbsent(s, new Schema.Parser().parse(schemaStr));
+            tableSchemaAtInstant.putIfAbsent(s, HoodieSchema.parse(schemaStr));
          }
          return isValidSchemaStr;
        } catch (IOException e) {
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java
index 36845c688c78f..2eb09bd27d6b6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SchemaConflictResolutionStrategy.java
@@ -21,6 +21,7 @@ import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -28,8 +29,6 @@ import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.avro.Schema;
-
 /**
  * Strategy interface for schema conflict resolution with multiple writers.
  * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
@@ -50,14 +49,14 @@ public interface SchemaConflictResolutionStrategy {
   * @throws HoodieWriteConflictException if schema conflicts cannot be resolved.
   */
  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  Option<Schema> resolveConcurrentSchemaEvolution(
+  Option<HoodieSchema> resolveConcurrentSchemaEvolution(
      HoodieTable table,
      HoodieWriteConfig config,
      Option<HoodieInstant> lastCompletedTxnOwnerInstant,
      Option<HoodieInstant> currTxnOwnerInstant);
  static void throwConcurrentSchemaEvolutionException(
-      Option<Schema> tableSchemaAtTxnStart, Option<Schema> tableSchemaAtTxnValidation, Schema writerSchemaOfTxn,
+      Option<HoodieSchema> tableSchemaAtTxnStart, Option<HoodieSchema> tableSchemaAtTxnValidation, HoodieSchema writerSchemaOfTxn,
      Option<HoodieInstant> lastCompletedTxnOwnerInstant, Option<HoodieInstant> currTxnOwnerInstant) throws HoodieWriteConflictException {
    String errMsg = String.format(
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
index 688f14c38d42f..0ae986cd0f78d 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleSchemaConflictResolutionStrategy.java
@@ -18,7 +18,8 @@ package org.apache.hudi.client.transaction;
-import org.apache.hudi.avro.AvroSchemaComparatorForSchemaEvolution;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaComparatorForSchemaEvolution;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
@@ -28,11 +29,9 @@ import org.apache.hudi.table.HoodieTable;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import java.util.stream.Stream;
-import static org.apache.hudi.avro.HoodieAvroUtils.isSchemaNull;
 import static org.apache.hudi.client.transaction.SchemaConflictResolutionStrategy.throwConcurrentSchemaEvolutionException;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
@@ -46,7 +45,7 @@ public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictResolutionStrategy {
  @Override
-  public Option<Schema> resolveConcurrentSchemaEvolution(
+  public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
      HoodieTable table,
      HoodieWriteConfig config,
      Option<HoodieInstant> lastCompletedTxnOwnerInstant,
@@ -66,10 +65,10 @@ public Option resolveConcurrentSchemaEvolution(
      return Option.empty();
    }
-    Schema writerSchemaOfTxn = new Schema.Parser().parse(config.getWriteSchema());
+    HoodieSchema writerSchemaOfTxn = HoodieSchema.parse(config.getWriteSchema());
    // If a writer does not come with a meaningful schema, skip the schema resolution.
    ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver = new ConcurrentSchemaEvolutionTableSchemaGetter(table.getMetaClient());
-    if (isSchemaNull(writerSchemaOfTxn)) {
+    if (writerSchemaOfTxn.isSchemaNull()) {
      return getTableSchemaAtInstant(schemaResolver, currTxnOwnerInstant.get());
    }
@@ -98,14 +97,14 @@ public Option resolveConcurrentSchemaEvolution(
      return Option.of(writerSchemaOfTxn);
    }
-    Option<Schema> tableSchemaAtTxnValidation = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnValidation);
+    Option<HoodieSchema> tableSchemaAtTxnValidation = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnValidation);
    // If table schema is not defined, it's still case 1. There can be cases where there are commits but they didn't
    // write any data.
if (!tableSchemaAtTxnValidation.isPresent()) { return Option.of(writerSchemaOfTxn); } // Case 2, 4, 7: Both writers try to evolve to the same schema or neither evolves schema. - boolean writerSchemaIsCurrentTableSchema = AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation.get()); + boolean writerSchemaIsCurrentTableSchema = HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation.get()); if (writerSchemaIsCurrentTableSchema) { return Option.of(writerSchemaOfTxn); } @@ -122,7 +121,7 @@ public Option resolveConcurrentSchemaEvolution( throwConcurrentSchemaEvolutionException( Option.empty(), tableSchemaAtTxnValidation, writerSchemaOfTxn, lastCompletedTxnOwnerInstant, currTxnOwnerInstant); } - Option tableSchemaAtTxnStart = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnStart); + Option tableSchemaAtTxnStart = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnStart); // If no table schema is defined, fall back to case 3. if (!tableSchemaAtTxnStart.isPresent()) { throwConcurrentSchemaEvolutionException( @@ -132,13 +131,13 @@ public Option resolveConcurrentSchemaEvolution( // Case 5: // Table schema has not changed from the start of the transaction till the pre-commit validation // If table schema parsing failed we will blindly go with writer schema. use option.empty - if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(), tableSchemaAtTxnValidation.get())) { + if (HoodieSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(), tableSchemaAtTxnValidation.get())) { return Option.of(writerSchemaOfTxn); } // Case 6: Current txn does not evolve schema, the tableSchema we saw at validation phase // might be an evolved one, use it. 
- if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart.get())) { + if (HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart.get())) { return tableSchemaAtTxnValidation; } @@ -164,9 +163,9 @@ private Option getInstantInTimelineImmediatelyPriorToTimestamp( .findFirst()); } - private static Option getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver, HoodieInstant instant) { + private static Option getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver, HoodieInstant instant) { try { - return schemaResolver.getTableAvroSchemaIfPresent(false, Option.of(instant)); + return schemaResolver.getTableSchemaIfPresent(false, Option.of(instant)); } catch (Exception ex) { log.error("Cannot get table schema for instant {}", instant); throw new HoodieException("Unable to get table schema", ex); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java index 8e26add429923..e8cf8e7f9d282 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java @@ -23,6 +23,7 @@ import org.apache.hudi.client.transaction.SimpleSchemaConflictResolutionStrategy; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; @@ -36,7 +37,6 @@ import org.apache.hudi.table.HoodieTable; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.io.IOException; import java.util.Map; @@ -78,7 +78,7 @@ public static Option resolveWriteConflictIfAny( Stream completedInstantsDuringCurrentWriteOperation = getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants); ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy(); - Option newTableSchema = resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant); + Option newTableSchema = resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant); Stream instantStream = Stream.concat(resolutionStrategy.getCandidateInstants( table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant), @@ -117,7 +117,7 @@ public static Option resolveWriteConflictIfAny( * @param currentTxnOwnerInstant current instant * @return new table schema after successful schema resolution; empty if nothing to be resolved. 
*/ - public static Option resolveSchemaConflictIfNeeded(final HoodieTable table, + public static Option resolveSchemaConflictIfNeeded(final HoodieTable table, final HoodieWriteConfig config, final Option lastCompletedTxnOwnerInstant, final Option currentTxnOwnerInstant) { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java index dca188c8ed02c..86f141bd6319a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java @@ -40,6 +40,8 @@ import org.apache.hudi.common.model.MetadataValues; import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.schema.HoodieSchemaCache; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -61,6 +63,7 @@ import org.apache.hudi.common.util.collection.CloseableMappingIterator; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieIndexException; import org.apache.hudi.exception.HoodieMetadataIndexException; @@ -77,8 +80,6 @@ import org.apache.hudi.table.HoodieTable; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; import java.io.IOException; import java.util.ArrayList; @@ -91,7 +92,6 @@ import java.util.stream.Collectors; import static java.util.stream.Collectors.toList; -import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema; import static org.apache.hudi.common.config.HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP; import static org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER; import static org.apache.hudi.common.util.HoodieRecordUtils.getOrderingFieldNames; @@ -136,10 +136,11 @@ public static List getLatestBaseFilesForPartition(String partiti * @param tableSchema table schema * @return true if each field's data type are supported for secondary index, false otherwise */ - static boolean validateDataTypeForSecondaryIndex(List sourceFields, Schema tableSchema) { + static boolean validateDataTypeForSecondaryIndex(List sourceFields, HoodieSchema tableSchema) { return sourceFields.stream().allMatch(fieldToIndex -> { - Schema schema = getNestedFieldSchemaFromWriteSchema(tableSchema, fieldToIndex); - return isSecondaryIndexSupportedType(schema); + Pair schema = HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex) + .orElseThrow(() -> new HoodieException("Failed to get schema. 
Not a valid field name: " + fieldToIndex)); + return isSecondaryIndexSupportedType(schema.getRight().schema()); }); } @@ -150,10 +151,14 @@ static boolean validateDataTypeForSecondaryIndex(List sourceFields, Sche * @param tableSchema table schema * @return true if each field's data types are supported, false otherwise */ - public static boolean validateDataTypeForSecondaryOrExpressionIndex(List sourceFields, Schema tableSchema) { + public static boolean validateDataTypeForSecondaryOrExpressionIndex(List sourceFields, HoodieSchema tableSchema) { return sourceFields.stream().anyMatch(fieldToIndex -> { - Schema schema = getNestedFieldSchemaFromWriteSchema(tableSchema, fieldToIndex); - return schema.getType() != Schema.Type.RECORD && schema.getType() != Schema.Type.ARRAY && schema.getType() != Schema.Type.MAP; + Pair nestedField = HoodieSchemaUtils.getNestedField(tableSchema, fieldToIndex) + .orElseThrow(() -> new HoodieException("Failed to get schema. Not a valid field name: " + fieldToIndex)); + HoodieSchema fieldSchema = nestedField.getRight().schema(); + return fieldSchema.getType() != HoodieSchemaType.RECORD + && fieldSchema.getType() != HoodieSchemaType.ARRAY + && fieldSchema.getType() != HoodieSchemaType.MAP; }); } @@ -161,39 +166,27 @@ public static boolean validateDataTypeForSecondaryOrExpressionIndex(List * Check if the given schema type is supported for secondary index. * Supported types are: String (including CHAR), Integer types (Int, BigInt, Long, Short), and timestamp */ - private static boolean isSecondaryIndexSupportedType(Schema schema) { + private static boolean isSecondaryIndexSupportedType(HoodieSchema schema) { // Handle union types (nullable fields) - if (schema.getType() == Schema.Type.UNION) { + if (schema.getType() == HoodieSchemaType.UNION) { // For union types, check if any of the types is supported return schema.getTypes().stream() - .anyMatch(s -> s.getType() != Schema.Type.NULL && isSecondaryIndexSupportedType(s)); + .anyMatch(s -> s.getType() != HoodieSchemaType.NULL && isSecondaryIndexSupportedType(s)); } // Check basic types switch (schema.getType()) { case STRING: - // STRING type can have UUID logical type which we don't support - return schema.getLogicalType() == null; // UUID and other string-based logical types are not supported - // Regular STRING (includes CHAR) case INT: - // INT type can represent regular integers or dates/times with logical types - if (schema.getLogicalType() != null) { - // Support date and time-millis logical types - return schema.getLogicalType() == LogicalTypes.date() - || schema.getLogicalType() == LogicalTypes.timeMillis(); - } - return true; // Regular INT case LONG: - // LONG type can represent regular longs or timestamps with logical types - if (schema.getLogicalType() != null) { - // Support timestamp logical types - return schema.getLogicalType() == LogicalTypes.timestampMillis() - || schema.getLogicalType() == LogicalTypes.timestampMicros() - || schema.getLogicalType() == LogicalTypes.timeMicros(); - } - return true; // Regular LONG case DOUBLE: - return true; // Support DOUBLE type + case FLOAT: + case DATE: + case TIME: + return true; + case TIMESTAMP: + // LOCAL timestamps are not supported + return ((HoodieSchema.Timestamp) schema).isUtcAdjusted(); default: return false; } @@ -288,7 +281,7 @@ public static List> filterKeysFromFile(StoragePath filePath, HoodieStorage storage) throws HoodieIndexException { checkArgument(FSUtils.isBaseFile(filePath)); List> foundRecordKeys = new ArrayList<>(); - 
log.info(String.format("Going to filter %d keys from file %s", candidateRecordKeys.size(), filePath)); + log.info("Going to filter {} keys from file {}", candidateRecordKeys.size(), filePath); try (HoodieFileReader fileReader = HoodieIOFactory.getIOFactory(storage) .getReaderFactory(HoodieRecordType.AVRO) .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, filePath)) { @@ -721,40 +714,32 @@ static void validateEligibilityForSecondaryOrExpressionIndex(HoodieTableMetaClie Map options, Map> columns, String userIndexName) throws Exception { - Schema tableSchema = new TableSchemaResolver(metaClient).getTableAvroSchema(); + HoodieSchema tableSchema = new TableSchemaResolver(metaClient).getTableSchema(); List sourceFields = new ArrayList<>(columns.keySet()); String columnName = sourceFields.get(0); // We know there's only one column from the check above // First check if the field exists - try { - getNestedFieldSchemaFromWriteSchema(tableSchema, columnName); - } catch (Exception e) { - throw new HoodieMetadataIndexException(String.format( + Pair fieldSchema = HoodieSchemaUtils.getNestedField(tableSchema, columnName) + .orElseThrow(() -> new HoodieMetadataIndexException(String.format( "Cannot create %s index '%s': Column '%s' does not exist in the table schema. " - + "Please verify the column name and ensure it exists in the table.", + + "Please verify the column name and ensure it exists in the table.", indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" : "expression", - userIndexName, columnName)); - } + userIndexName, columnName))); // Check for complex types (RECORD, ARRAY, MAP) - not supported for any index type if (!validateDataTypeForSecondaryOrExpressionIndex(sourceFields, tableSchema)) { - Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, columnName); throw new HoodieMetadataIndexException(String.format( "Cannot create %s index '%s': Column '%s' has unsupported data type '%s'. " + "Complex types (RECORD, ARRAY, MAP) are not supported for indexing. " + "Please choose a column with a primitive data type.", indexType.equals(PARTITION_NAME_SECONDARY_INDEX) ? "secondary" : "expression", - userIndexName, columnName, fieldSchema.getType())); + userIndexName, columnName, fieldSchema.getRight().schema().getType())); } // For secondary index, apply stricter data type validation if (indexType.equals(PARTITION_NAME_SECONDARY_INDEX)) { if (!validateDataTypeForSecondaryIndex(sourceFields, tableSchema)) { - Schema fieldSchema = getNestedFieldSchemaFromWriteSchema(tableSchema, columnName); - String actualType = fieldSchema.getType().toString(); - if (fieldSchema.getLogicalType() != null) { - actualType += " with logical type " + fieldSchema.getLogicalType(); - } + String actualType = fieldSchema.getRight().schema().getType().toString(); throw new HoodieMetadataIndexException(String.format( "Cannot create secondary index '%s': Column '%s' has unsupported data type '%s'. 
" diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index a3e955709ffaf..ddf4524b4a2d2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -46,6 +46,7 @@ import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.HoodieTableVersion; @@ -80,7 +81,6 @@ import org.apache.hudi.util.Lazy; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; import java.util.ArrayList; import java.util.Collection; @@ -96,7 +96,6 @@ import java.util.stream.Stream; import static java.util.stream.Collectors.toList; -import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_ASYNC_CLEAN; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_CLEANER_COMMITS_RETAINED; import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS; @@ -413,14 +412,15 @@ public static HoodieData convertMetadataToPartitionStatRecords(Hoo HoodieTableMetadata tableMetadata, HoodieMetadataConfig metadataConfig, Option recordTypeOpt, boolean isDeletePartition) { try { - Option writerSchema = + Option writerSchema = Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)) .flatMap(writerSchemaStr -> isNullOrEmpty(writerSchemaStr) ? Option.empty() - : Option.of(new Schema.Parser().parse(writerSchemaStr))); + : Option.of(HoodieSchema.parse(writerSchemaStr))); HoodieTableConfig tableConfig = dataMetaClient.getTableConfig(); - Option tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? addMetadataFields(schema) : schema).map(HoodieSchema::fromAvroSchema); + Option tableSchema = writerSchema.map(schema -> tableConfig.populateMetaFields() ? 
HoodieSchemaUtils.addMetadataFields(schema) : schema); + if (tableSchema.isEmpty()) { return engineContext.emptyHoodieData(); } diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java index 4e02d74d1983e..909ca863bfecb 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConcurrentSchemaEvolutionTableSchemaGetter.java @@ -32,6 +32,8 @@ import org.apache.hudi.common.model.HoodieReplaceCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.versioning.clean.CleanPlanV2MigrationHandler; @@ -41,7 +43,6 @@ import org.apache.hudi.common.testutils.HoodieTestUtils; import org.apache.hudi.common.util.Option; -import org.apache.avro.Schema; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -57,7 +58,6 @@ import java.util.Properties; import java.util.stream.Stream; -import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields; import static org.apache.hudi.common.table.HoodieTableConfig.PARTITION_FIELDS; import static org.apache.hudi.common.table.timeline.HoodieTimeline.CLUSTERING_ACTION; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; @@ -122,10 +122,10 @@ public class TestConcurrentSchemaEvolutionTableSchemaGetter extends HoodieCommon + " {\"name\":\"partitionColumn\",\"type\":[\"null\",\"string\"],\"doc\":\"\",\"default\":null}" + " ]\n" + "}"; - private static Schema SCHEMA_WITHOUT_METADATA2 = new Schema.Parser().parse(SCHEMA_WITHOUT_METADATA_STR2); - private static Schema SCHEMA_WITHOUT_METADATA = new Schema.Parser().parse(SCHEMA_WITHOUT_METADATA_STR); - private static Schema SCHEMA_WITH_METADATA = addMetadataFields(SCHEMA_WITHOUT_METADATA, false); - private static Schema SCHEMA_WITH_PARTITION_COLUMN = new Schema.Parser().parse(SCHEMA_WITH_PARTITION_COLUMN_STR); + private static final HoodieSchema SCHEMA_WITHOUT_METADATA2 = HoodieSchema.parse(SCHEMA_WITHOUT_METADATA_STR2); + private static final HoodieSchema SCHEMA_WITHOUT_METADATA = HoodieSchema.parse(SCHEMA_WITHOUT_METADATA_STR); + private static final HoodieSchema SCHEMA_WITH_METADATA = HoodieSchemaUtils.addMetadataFields(SCHEMA_WITHOUT_METADATA); + private static final HoodieSchema SCHEMA_WITH_PARTITION_COLUMN = HoodieSchema.parse(SCHEMA_WITH_PARTITION_COLUMN_STR); @BeforeEach public void setUp() throws Exception { @@ -195,7 +195,7 @@ void testGetTableSchemaFromLatestCommitMetadata(boolean enableMetadata, HoodieTa metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option schemaOption = resolver.getTableAvroSchemaIfPresent(false, Option.empty()); + Option schemaOption = resolver.getTableSchemaIfPresent(false, Option.empty()); assertTrue(schemaOption.isPresent()); assertEquals(SCHEMA_WITHOUT_METADATA, 
schemaOption.get()); } @@ -235,7 +235,7 @@ void testGetTableSchemaFromLatestCommitMetadata() throws Exception { metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option schemaOption = resolver.getTableAvroSchemaIfPresent(false, Option.empty()); + Option schemaOption = resolver.getTableSchemaIfPresent(false, Option.empty()); assertTrue(schemaOption.isPresent()); assertEquals(SCHEMA_WITHOUT_METADATA, schemaOption.get()); } @@ -269,14 +269,14 @@ private static Stream commonTableConfigTestDimension() { @ParameterizedTest @MethodSource("commonTableConfigTestDimension") - void testGetTableAvroSchemaInternalNoSchemaFoundEmptyTimeline(HoodieTableType tableType) throws IOException { + void testGetTableSchemaInternalNoSchemaFoundEmptyTimeline(HoodieTableType tableType) throws IOException { // Don't set any schema in commit metadata or table config initMetaClient(false, tableType); testTable = HoodieTestTable.of(metaClient); metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option schemaOption = resolver.getTableAvroSchemaIfPresent(true, Option.empty()); + Option schemaOption = resolver.getTableSchemaIfPresent(true, Option.empty()); assertFalse(schemaOption.isPresent()); } @@ -285,7 +285,7 @@ void testGetTableAvroSchemaInternalNoSchemaFoundEmptyTimeline(HoodieTableType ta // we will only use that and ignore the other instants. @ParameterizedTest @MethodSource("commonTableConfigTestDimension") - void testGetTableAvroSchemaInternalNoSchemaFoundDisqualifiedInstant(HoodieTableType tableType) throws Exception { + void testGetTableSchemaInternalNoSchemaFoundDisqualifiedInstant(HoodieTableType tableType) throws Exception { // Don't set any schema in commit metadata or table config initMetaClient(false, tableType); testTable = HoodieTestTable.of(metaClient); @@ -296,7 +296,7 @@ void testGetTableAvroSchemaInternalNoSchemaFoundDisqualifiedInstant(HoodieTableT metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option schemaOption = resolver.getTableAvroSchemaIfPresent(true, Option.empty()); + Option schemaOption = resolver.getTableSchemaIfPresent(true, Option.empty()); assertTrue(schemaOption.isEmpty()); } @@ -383,7 +383,7 @@ private static Stream schemaTestParams() { @ParameterizedTest @MethodSource("schemaTestParams") - void testGetTableAvroSchema(Schema inputSchema, boolean includeMetadataFields, Schema expectedSchema) throws Exception { + void testGetTableSchema(HoodieSchema inputSchema, boolean includeMetadataFields, HoodieSchema expectedSchema) throws Exception { metaClient = HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new Properties(),"") .setTableCreateSchema(SCHEMA_WITH_METADATA.toString()) .initTable(getDefaultStorageConf(), basePath); @@ -397,9 +397,9 @@ void testGetTableAvroSchema(Schema inputSchema, boolean includeMetadataFields, S inputSchema.toString(), COMMIT_ACTION))); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(includeMetadataFields, Option.empty()).get()); + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(includeMetadataFields, Option.empty()).get()); HoodieInstant instant = 
metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, COMMIT_ACTION, "0010", "0011"); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent( + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent( includeMetadataFields, Option.of(instant)).get()); } @@ -412,7 +412,7 @@ private static Stream partitionColumnSchemaTestParams() { @ParameterizedTest @MethodSource("partitionColumnSchemaTestParams") - void testGetTableAvroSchemaAppendPartitionColumn(boolean shouldIncludePartitionColumns, Schema expectedSchema) throws Exception { + void testGetTableSchemaAppendPartitionColumn(boolean shouldIncludePartitionColumns, HoodieSchema expectedSchema) throws Exception { metaClient = HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new Properties(),"") .setPartitionFields("partitionColumn") .setShouldDropPartitionColumns(shouldIncludePartitionColumns) @@ -427,8 +427,8 @@ void testGetTableAvroSchemaAppendPartitionColumn(boolean shouldIncludePartitionC SCHEMA_WITHOUT_METADATA.toString(), COMMIT_ACTION))); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(false, Option.empty()).get()); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent( + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(false, Option.empty()).get()); + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent( false, Option.of(metaClient.getInstantGenerator().createNewInstant( HoodieInstant.State.COMPLETED, COMMIT_ACTION, "0010", "0011"))).get()); } @@ -442,21 +442,21 @@ private static Stream createSchemaTestParam() { @ParameterizedTest @MethodSource("createSchemaTestParam") - void testGetTableCreateAvroSchema(boolean includeMetadataFields, Schema expectedSchema) throws Exception { + void testGetTableCreateSchema(boolean includeMetadataFields, HoodieSchema expectedSchema) throws Exception { metaClient = HoodieTestUtils.getMetaClientBuilder(HoodieTableType.COPY_ON_WRITE, new Properties(),"") .setTableCreateSchema(SCHEMA_WITH_METADATA.toString()) .initTable(getDefaultStorageConf(), basePath); testTable = HoodieTestTable.of(metaClient); - assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent(includeMetadataFields, Option.empty()).get()); + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent(includeMetadataFields, Option.empty()).get()); // getTableAvroSchemaFromLatestCommit only cares about active timeline, since it is empty, no schema is returned. 
- assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableAvroSchemaIfPresent( + assertEquals(expectedSchema, new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient).getTableSchemaIfPresent( includeMetadataFields, Option.of(metaClient.getInstantGenerator().createNewInstant( HoodieInstant.State.COMPLETED, COMMIT_ACTION, "0010", "0011"))).get()); } @Test - public void testGetTableAvroSchemaInternalWithPartitionFields() throws IOException { + public void testGetTableSchemaInternalWithPartitionFields() throws IOException { initMetaClient(false, HoodieTableType.COPY_ON_WRITE); testTable = HoodieTestTable.of(metaClient); // Setup table config with partition fields @@ -467,22 +467,22 @@ public void testGetTableAvroSchemaInternalWithPartitionFields() throws IOExcepti metaClient.reloadActiveTimeline(); ConcurrentSchemaEvolutionTableSchemaGetter resolver = new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient); - Option schemaOption = resolver.getTableAvroSchemaIfPresent(true, Option.empty()); + Option schemaOption = resolver.getTableSchemaIfPresent(true, Option.empty()); assertTrue(schemaOption.isPresent()); - Schema resultSchema = schemaOption.get(); + HoodieSchema resultSchema = schemaOption.get(); assertTrue(resultSchema.getFields().stream() .anyMatch(f -> f.name().equals("partition_path"))); } @ParameterizedTest @MethodSource("commonTableConfigTestDimension") - void testGetTableAvroSchemaInternalWithSpecificInstant(HoodieTableType tableType) throws Exception { + void testGetTableSchemaInternalWithSpecificInstant(HoodieTableType tableType) throws Exception { initMetaClient(false, tableType); testTable = HoodieTestTable.of(metaClient); - Schema schema1 = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - Schema schema2 = new Schema.Parser().parse(TRIP_SCHEMA); + HoodieSchema schema1 = HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + HoodieSchema schema2 = HoodieSchema.parse(TRIP_SCHEMA); // Create two commits with different schemas int startCommitTime = 10; @@ -514,7 +514,7 @@ void testGetTableAvroSchemaInternalWithSpecificInstant(HoodieTableType tableType // Test getting schema from first instant String timestamp1 = padWithLeadingZeros(Integer.toString(10), REQUEST_TIME_LENGTH); - Option schema1Option = resolver.getTableAvroSchemaIfPresent( + Option schema1Option = resolver.getTableSchemaIfPresent( false, Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, COMMIT_ACTION, timestamp1, incTimestampStrByOne(timestamp1)))); assertTrue(schema1Option.isPresent()); @@ -522,7 +522,7 @@ void testGetTableAvroSchemaInternalWithSpecificInstant(HoodieTableType tableType // Test getting schema from second instant String timestamp2 = padWithLeadingZeros(Integer.toString(20), REQUEST_TIME_LENGTH); - Option schema2Option = resolver.getTableAvroSchemaIfPresent( + Option schema2Option = resolver.getTableSchemaIfPresent( false, Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, COMMIT_ACTION, timestamp2, incTimestampStrByOne(timestamp2)))); assertTrue(schema2Option.isPresent()); @@ -534,7 +534,7 @@ void testGetTableAvroSchemaInternalWithSpecificInstant(HoodieTableType tableType for (Integer i = startCommitTime + 10; i <= endCommitTime + 10; i += 10) { String timestampI = padWithLeadingZeros(Integer.toString(i), REQUEST_TIME_LENGTH); - schema2Option = resolver.getTableAvroSchemaIfPresent(false, + schema2Option = resolver.getTableSchemaIfPresent(false, 
Option.of(metaClient.getInstantGenerator().createNewInstant(HoodieInstant.State.COMPLETED, COMMIT_ACTION, timestampI, incTimestampStrByOne(timestampI)))); assertTrue(schema2Option.isPresent(), i::toString); assertEquals(schema2.toString(), schema2Option.get().toString()); @@ -542,14 +542,14 @@ void testGetTableAvroSchemaInternalWithSpecificInstant(HoodieTableType tableType } @Test - void testTableAvroSchemaFromTimelineCachingBehavior() throws Exception { + void testTableSchemaFromTimelineCachingBehavior() throws Exception { // Initialize with COW table type initMetaClient(false, HoodieTableType.COPY_ON_WRITE); testTable = HoodieTestTable.of(metaClient); // Create test schema - Schema schema1 = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); - Schema schema2 = new Schema.Parser().parse(HoodieTestDataGenerator.SHORT_TRIP_SCHEMA); + HoodieSchema schema1 = HoodieSchema.parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); + HoodieSchema schema2 = HoodieSchema.parse(HoodieTestDataGenerator.SHORT_TRIP_SCHEMA); // Create a commit with schema1 String commitTime1 = "0010"; @@ -578,7 +578,7 @@ void testTableAvroSchemaFromTimelineCachingBehavior() throws Exception { HoodieInstant instant1 = metaClient.getCommitsTimeline().filterCompletedInstants().nthInstant(0).get(); // Case 1: First call with empty instant - should fetch from timeline and cache - Option schemaOption1 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()); + Option schemaOption1 = resolver.getTableSchemaFromTimelineWithCache(Option.empty()); assertTrue(schemaOption1.isPresent()); assertEquals(schema2, schemaOption1.get()); @@ -586,7 +586,7 @@ void testTableAvroSchemaFromTimelineCachingBehavior() throws Exception { verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 2: Second call with empty instant - should use cache - Option schemaOption2 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.empty()); + Option schemaOption2 = resolver.getTableSchemaFromTimelineWithCache(Option.empty()); assertTrue(schemaOption2.isPresent()); assertEquals(schema2, schemaOption2.get()); @@ -594,7 +594,7 @@ void testTableAvroSchemaFromTimelineCachingBehavior() throws Exception { verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 3: Call with the latest valid instant - there should be a cache hit - Option schemaOption3 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant2)); + Option schemaOption3 = resolver.getTableSchemaFromTimelineWithCache(Option.of(instant2)); assertTrue(schemaOption3.isPresent()); assertEquals(schema2, schemaOption3.get()); @@ -602,7 +602,7 @@ void testTableAvroSchemaFromTimelineCachingBehavior() throws Exception { verify(resolver, times(1)).getLastCommitMetadataWithValidSchemaFromTimeline(any(), any()); // Case 4: Second call with some other instant - should use cache - Option schemaOption4 = resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(instant1)); + Option schemaOption4 = resolver.getTableSchemaFromTimelineWithCache(Option.of(instant1)); assertTrue(schemaOption4.isPresent()); assertEquals(schema1, schemaOption4.get()); @@ -613,7 +613,7 @@ void testTableAvroSchemaFromTimelineCachingBehavior() throws Exception { String nonExistentTime = "9999"; HoodieInstant nonExistentInstant = metaClient.getInstantGenerator().createNewInstant( HoodieInstant.State.COMPLETED, COMMIT_ACTION, nonExistentTime, nonExistentTime); - Option schemaOption5 = 
resolver.getTableAvroSchemaFromTimelineWithCache(Option.of(nonExistentInstant)); + Option schemaOption5 = resolver.getTableSchemaFromTimelineWithCache(Option.of(nonExistentInstant)); assertEquals(schema2, schemaOption5.get()); // Verify one more call to timeline for non-existent instant diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java index db6a6d5abf0e3..44759a5743ddd 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestSimpleSchemaConflictResolutionStrategy.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.schema.HoodieSchema; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.view.FileSystemViewManager; @@ -43,7 +44,6 @@ import org.apache.hudi.exception.HoodieSchemaEvolutionConflictException; import org.apache.hudi.table.TestBaseHoodieTable; -import org.apache.avro.Schema; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.Mock; @@ -137,9 +137,9 @@ private void setupInstants(String tableSchemaAtTxnStart, String tableSchemaAtTxn @Test void testNoConflictFirstCommit() throws Exception { setupInstants(null, null, SCHEMA1, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, Option.empty(), nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA1), result); + assertEquals(HoodieSchema.parse(SCHEMA1), result); } @Test @@ -152,9 +152,9 @@ void testNullWriterSchema() throws Exception { @Test void testNullTypeWriterSchema() throws Exception { setupInstants(SCHEMA1, SCHEMA1, NULL_SCHEMA, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA1), result); + assertEquals(HoodieSchema.parse(SCHEMA1), result); } @Test @@ -167,41 +167,41 @@ void testConflictSecondCommitDifferentSchema() throws Exception { @Test void testConflictSecondCommitSameSchema() throws Exception { setupInstants(null, SCHEMA1, SCHEMA1, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, Option.empty(), nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA1), result); + assertEquals(HoodieSchema.parse(SCHEMA1), result); } @Test void testNoConflictSameSchema() throws Exception { setupInstants(SCHEMA1, SCHEMA1, SCHEMA1, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA1), result); + assertEquals(HoodieSchema.parse(SCHEMA1), result); } @Test void 
testNoConflictBackwardsCompatible1() throws Exception { setupInstants(SCHEMA1, SCHEMA2, SCHEMA1, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA2), result); + assertEquals(HoodieSchema.parse(SCHEMA2), result); } @Test void testNoConflictBackwardsCompatible2() throws Exception { setupInstants(SCHEMA1, SCHEMA1, SCHEMA2, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA2), result); + assertEquals(HoodieSchema.parse(SCHEMA2), result); } @Test void testNoConflictConcurrentEvolutionSameSchema() throws Exception { setupInstants(SCHEMA1, SCHEMA2, SCHEMA2, true, false); - Schema result = strategy.resolveConcurrentSchemaEvolution( + HoodieSchema result = strategy.resolveConcurrentSchemaEvolution( table, config, lastCompletedTxnOwnerInstant, nonTableCompactionInstant).get(); - assertEquals(new Schema.Parser().parse(SCHEMA2), result); + assertEquals(HoodieSchema.parse(SCHEMA2), result); } @Test diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java index 656203d8a905a..fe523fb31cd39 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/index/TestHoodieIndexUtils.java @@ -18,15 +18,15 @@ package org.apache.hudi.index; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.exception.HoodieMetadataIndexException; import org.apache.hudi.metadata.MetadataPartitionType; -import org.apache.avro.LogicalTypes; -import org.apache.avro.Schema; -import org.apache.avro.SchemaBuilder; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -47,6 +47,7 @@ import static org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_SECONDARY_INDEX; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.when; @@ -79,20 +80,17 @@ public void setUp() { public void testIsEligibleForSecondaryIndexWithSupportedDataTypes() { // Given: A schema with supported data types for secondary index (String/CHAR, Int, Long, Float, Double) // Note: CHAR is represented as STRING in Avro schema - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .requiredString("charField") // CHAR is represented as STRING in Avro - .optionalInt("intField") - .requiredLong("longField") - .name("doubleField").type().doubleType().noDefault() - .endRecord(); + HoodieSchema schema = 
HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("charField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("intField", HoodieSchema.createNullable(HoodieSchemaType.INT)), + HoodieSchemaField.of("longField", HoodieSchema.create(HoodieSchemaType.LONG)), + HoodieSchemaField.of("doubleField", HoodieSchema.create(HoodieSchemaType.DOUBLE)) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Test case 1: Secondary index with record index already present // Given: Record index partition already exists @@ -143,21 +141,18 @@ public void testIsEligibleForSecondaryIndexWithSupportedDataTypes() { } } + @Test public void testValidateDataTypeForSecondaryOrExpressionIndex() { // Create a dummy schema with both complex and primitive types - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .optionalInt("intField") - .name("arrayField").type().array().items().stringType().noDefault() - .name("mapField").type().map().values().intType().noDefault() - .name("structField").type().record("NestedRecord") - .fields() - .requiredString("nestedString") - .endRecord() - .noDefault() - .endRecord(); - + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("intField", HoodieSchema.createNullable(HoodieSchemaType.INT)), + HoodieSchemaField.of("arrayField", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))), + HoodieSchemaField.of("mapField", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))), + HoodieSchemaField.of("structField", HoodieSchema.createRecord("NestedRecord", null, null, Arrays.asList( + HoodieSchemaField.of("nestedString", HoodieSchema.create(HoodieSchemaType.STRING)) + ))) + )); // Test for primitive fields assertTrue(validateDataTypeForSecondaryOrExpressionIndex(Arrays.asList("stringField", "intField"), schema)); @@ -175,24 +170,21 @@ public void testValidateDataTypeForSecondaryOrExpressionIndex() { @Test public void testValidateDataTypeForSecondaryIndex() { // Create a schema with various data types - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .requiredString("charField") // CHAR is represented as STRING in Avro - .optionalInt("intField") - .requiredLong("longField") - .name("timestampField").type().longType().longDefault(0L) // timestamp as long - .name("booleanField").type().booleanType().noDefault() - .name("floatField").type().floatType().noDefault() - .name("doubleField").type().doubleType().noDefault() - .name("arrayField").type().array().items().stringType().noDefault() - .name("mapField").type().map().values().intType().noDefault() - .name("structField").type().record("NestedRecord") - .fields() - .requiredString("nestedString") - .endRecord() - .noDefault() - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("charField", HoodieSchema.create(HoodieSchemaType.STRING)), // CHAR is 
represented as STRING + HoodieSchemaField.of("intField", HoodieSchema.createNullable(HoodieSchemaType.INT)), + HoodieSchemaField.of("longField", HoodieSchema.create(HoodieSchemaType.LONG)), + HoodieSchemaField.of("timestampField", HoodieSchema.create(HoodieSchemaType.LONG), null, 0L), // timestamp as long + HoodieSchemaField.of("booleanField", HoodieSchema.create(HoodieSchemaType.BOOLEAN)), + HoodieSchemaField.of("floatField", HoodieSchema.create(HoodieSchemaType.FLOAT)), + HoodieSchemaField.of("doubleField", HoodieSchema.create(HoodieSchemaType.DOUBLE)), + HoodieSchemaField.of("arrayField", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))), + HoodieSchemaField.of("mapField", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))), + HoodieSchemaField.of("structField", HoodieSchema.createRecord("NestedRecord", null, null, Arrays.asList( + HoodieSchemaField.of("nestedString", HoodieSchema.create(HoodieSchemaType.STRING)) + ))) + )); // Test supported types for secondary index assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("stringField"), schema)); @@ -210,7 +202,7 @@ public void testValidateDataTypeForSecondaryIndex() { assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("arrayField"), schema)); assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("mapField"), schema)); assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("structField"), schema)); - assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("floatField"), schema)); + assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("floatField"), schema)); // Test mix of supported and unsupported types (should fail) assertFalse(validateDataTypeForSecondaryIndex(Arrays.asList("stringField", "booleanField"), schema)); @@ -227,32 +219,36 @@ public void testValidateDataTypeForSecondaryIndex() { @Test public void testValidateDataTypeForSecondaryIndexWithLogicalTypes() { // Supported logical types - Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); - Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); - Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); - Schema timeMillis = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)); - Schema timeMicros = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)); - + HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis(); + HoodieSchema timestampMicros = HoodieSchema.createTimestampMicros(); + HoodieSchema date = HoodieSchema.createDate(); + HoodieSchema timeMillis = HoodieSchema.createTimeMillis(); + HoodieSchema timeMicros = HoodieSchema.createTimeMicros(); + // Unsupported logical types - Schema decimal = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES)); - Schema uuid = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)); - Schema localTimestampMillis = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); - Schema localTimestampMicros = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); - - Schema schemaWithLogicalTypes = SchemaBuilder.record("TestRecord") - .fields() - // Supported logical types - .name("timestampMillisField").type(timestampMillis).noDefault() - .name("timestampMicrosField").type(timestampMicros).noDefault() - .name("dateField").type(date).noDefault() - 
.name("timeMillisField").type(timeMillis).noDefault() - .name("timeMicrosField").type(timeMicros).noDefault() - // Unsupported logical types - .name("decimalField").type(decimal).noDefault() - .name("uuidField").type(uuid).noDefault() - .name("localTimestampMillisField").type(localTimestampMillis).noDefault() - .name("localTimestampMicrosField").type(localTimestampMicros).noDefault() - .endRecord(); + HoodieSchema decimalBytesBackedField = HoodieSchema.createDecimal(10, 2); + HoodieSchema decimalFixedBackedField = HoodieSchema.createDecimal("decimal_fixed", null, null, 10, 2, 16); + assertInstanceOf(HoodieSchema.Decimal.class, decimalBytesBackedField); + assertInstanceOf(HoodieSchema.Decimal.class, decimalFixedBackedField); + assertFalse(((HoodieSchema.Decimal) decimalBytesBackedField).isFixed()); + assertTrue(((HoodieSchema.Decimal) decimalFixedBackedField).isFixed()); + + HoodieSchema uuid = HoodieSchema.createUUID(); + HoodieSchema localTimestampMillis = HoodieSchema.createLocalTimestampMillis(); + HoodieSchema localTimestampMicros = HoodieSchema.createLocalTimestampMicros(); + + HoodieSchema schemaWithLogicalTypes = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("timestampMillisField", timestampMillis), + HoodieSchemaField.of("timestampMicrosField", timestampMicros), + HoodieSchemaField.of("dateField", date), + HoodieSchemaField.of("timeMillisField", timeMillis), + HoodieSchemaField.of("timeMicrosField", timeMicros), + HoodieSchemaField.of("decimalBytesBackedField", decimalBytesBackedField), + HoodieSchemaField.of("decimalFixedBackedField", decimalFixedBackedField), + HoodieSchemaField.of("uuidField", uuid), + HoodieSchemaField.of("localTimestampMillisField", localTimestampMillis), + HoodieSchemaField.of("localTimestampMicrosField", localTimestampMicros) + )); // Test supported timestamp and date/time logical types assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("timestampMillisField"), schemaWithLogicalTypes)); @@ -260,15 +256,16 @@ public void testValidateDataTypeForSecondaryIndexWithLogicalTypes() { assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("dateField"), schemaWithLogicalTypes)); assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("timeMillisField"), schemaWithLogicalTypes)); assertTrue(validateDataTypeForSecondaryIndex(Collections.singletonList("timeMicrosField"), schemaWithLogicalTypes)); - + // Test unsupported logical types - assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("decimalField"), schemaWithLogicalTypes)); + assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("decimalBytesBackedField"), schemaWithLogicalTypes)); + assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("decimalFixedBackedField"), schemaWithLogicalTypes)); assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("uuidField"), schemaWithLogicalTypes)); assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("localTimestampMillisField"), schemaWithLogicalTypes)); assertFalse(validateDataTypeForSecondaryIndex(Collections.singletonList("localTimestampMicrosField"), schemaWithLogicalTypes)); - + // Test mix of supported and unsupported logical types - assertFalse(validateDataTypeForSecondaryIndex(Arrays.asList("timestampMillisField", "decimalField"), schemaWithLogicalTypes)); + assertFalse(validateDataTypeForSecondaryIndex(Arrays.asList("timestampMillisField", "decimalBytesBackedField"), 
schemaWithLogicalTypes)); } /** @@ -282,21 +279,18 @@ public void testValidateDataTypeForSecondaryIndexWithLogicalTypes() { public void testIsEligibleForSecondaryIndexWithUnsupportedDataTypes() { // Given: A schema with unsupported data types for secondary index (Boolean, Decimal) // Note: Float and Double are now supported - Schema decimalType = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES)); - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .name("floatField").type().floatType().noDefault() - .name("doubleField").type().doubleType().noDefault() - .name("booleanField").type().booleanType().noDefault() - .name("decimalField").type(decimalType).noDefault() - .endRecord(); + HoodieSchema decimalType = HoodieSchema.createDecimal(10, 2); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("floatField", HoodieSchema.create(HoodieSchemaType.FLOAT)), + HoodieSchemaField.of("doubleField", HoodieSchema.create(HoodieSchemaType.DOUBLE)), + HoodieSchemaField.of("booleanField", HoodieSchema.create(HoodieSchemaType.BOOLEAN)), + HoodieSchemaField.of("decimalField", decimalType) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index partition exists Set partitions = new HashSet<>(); @@ -312,8 +306,7 @@ public void testIsEligibleForSecondaryIndexWithUnsupportedDataTypes() { // When: Checking eligibility for secondary index // Then: Should not throw exception because float is now supported for secondary index - assertThrows(HoodieMetadataIndexException.class, - () -> HoodieIndexUtils.validateEligibilityForSecondaryOrExpressionIndex( + assertDoesNotThrow(() -> HoodieIndexUtils.validateEligibilityForSecondaryOrExpressionIndex( mockMetaClient, PARTITION_NAME_SECONDARY_INDEX, options, columns, "test_index")); // Test case 2: Supported double field (now supported) @@ -351,8 +344,7 @@ public void testIsEligibleForSecondaryIndexWithUnsupportedDataTypes() { () -> HoodieIndexUtils.validateEligibilityForSecondaryOrExpressionIndex( mockMetaClient, PARTITION_NAME_SECONDARY_INDEX, options, columns, "test_index")); assertTrue(ex4.getMessage().contains("unsupported data type")); - assertTrue(ex4.getMessage().contains("BYTES with logical type")); - assertTrue(ex4.getMessage().contains("Decimal")); + assertTrue(ex4.getMessage().contains("DECIMAL")); assertTrue(ex4.getMessage().contains("Secondary indexes only support")); // Test case 5: Mix of supported fields (now including double) @@ -378,20 +370,16 @@ public void testIsEligibleForSecondaryIndexWithUnsupportedDataTypes() { @Test public void testIsEligibleForSecondaryIndexWithLogicalTypes() { // Given: A schema with timestamp and date logical types - Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); - Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); - - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .name("timestampField").type(timestampMillis).noDefault() - .name("dateField").type(date).noDefault() - .endRecord(); + HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis(); + HoodieSchema date = 
HoodieSchema.createDate(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("timestampField", timestampMillis), + HoodieSchemaField.of("dateField", date) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index partition exists Set partitions = new HashSet<>(); @@ -420,16 +408,13 @@ public void testIsEligibleForSecondaryIndexWithLogicalTypes() { @Test public void testIsEligibleForSecondaryIndexWithoutRecordIndex() { // Given: A schema with supported data types - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Collections.singletonList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Test case 1: No record index partition and not enabled in options // Given: No record index partition exists and not enabled in options @@ -471,19 +456,16 @@ public void testIsEligibleForSecondaryIndexWithoutRecordIndex() { @Test public void testIsEligibleForExpressionIndex() { // Given: A schema with various data types including complex types - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("stringField") - .name("floatField").type().floatType().noDefault() - .name("arrayField").type().array().items().stringType().noDefault() - .name("mapField").type().map().values().intType().noDefault() - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("stringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("floatField", HoodieSchema.create(HoodieSchemaType.FLOAT)), + HoodieSchemaField.of("arrayField", HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING))), + HoodieSchemaField.of("mapField", HoodieSchema.createMap(HoodieSchema.create(HoodieSchemaType.INT))) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { Map> columns = new HashMap<>(); Map options = new HashMap<>(); @@ -544,18 +526,17 @@ public void testIsEligibleForExpressionIndex() { */ @Test public void testIsEligibleForExpressionIndexWithNullableFields() { + // An int with default 0 must have the int type defined first. 
+ // If null is defined first, which HoodieSchema#createNullable does, an error will be thrown + HoodieSchema nullableIntWithDefault = HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.INT), HoodieSchema.create(HoodieSchemaType.NULL)); // Given: A schema with nullable fields (union types) - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .optionalString("nullableStringField") - .name("nullableIntField").type().nullable().intType().intDefault(0) - .endRecord(); - + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("nullableStringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("nullableIntField", nullableIntWithDefault, null, 0) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { Map> columns = new HashMap<>(); columns.put("nullableStringField", Collections.emptyMap()); @@ -578,19 +559,18 @@ public void testIsEligibleForExpressionIndexWithNullableFields() { */ @Test public void testIsEligibleForSecondaryIndexWithNullableFields() { + HoodieSchema nullableIntWithDefault = HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.INT), HoodieSchema.create(HoodieSchemaType.NULL)); + HoodieSchema nullableLongWithDefault = HoodieSchema.createUnion(HoodieSchema.create(HoodieSchemaType.LONG), HoodieSchema.create(HoodieSchemaType.NULL)); // Given: A schema with nullable fields that are supported for secondary index - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .optionalString("nullableStringField") - .name("nullableIntField").type().nullable().intType().intDefault(0) - .name("nullableLongField").type().nullable().longType().longDefault(0L) - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("nullableStringField", HoodieSchema.create(HoodieSchemaType.STRING)), + HoodieSchemaField.of("nullableIntField", nullableIntWithDefault, null, 0), + HoodieSchemaField.of("nullableLongField", nullableLongWithDefault, null, 0L) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index partition exists Set partitions = new HashSet<>(); @@ -620,26 +600,23 @@ public void testIsEligibleForSecondaryIndexWithNullableFields() { @Test public void testIsEligibleForSecondaryIndexWithAllLogicalTypes() { // Given: A schema with all supported timestamp logical types - Schema timestampMillis = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); - Schema timestampMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); - Schema date = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); - Schema timeMillis = LogicalTypes.timeMillis().addToSchema(Schema.create(Schema.Type.INT)); - Schema timeMicros = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG)); - - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .name("timestampMillisField").type(timestampMillis).noDefault() - .name("timestampMicrosField").type(timestampMicros).noDefault() - 
.name("dateField").type(date).noDefault() - .name("timeMillisField").type(timeMillis).noDefault() - .name("timeMicrosField").type(timeMicros).noDefault() - .endRecord(); + HoodieSchema timestampMillis = HoodieSchema.createTimestampMillis(); + HoodieSchema timestampMicros = HoodieSchema.createTimestampMicros(); + HoodieSchema date = HoodieSchema.createDate(); + HoodieSchema timeMillis = HoodieSchema.createTimeMillis(); + HoodieSchema timeMicros = HoodieSchema.createTimeMicros(); + + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("timestampMillisField", timestampMillis), + HoodieSchemaField.of("timestampMicrosField", timestampMicros), + HoodieSchemaField.of("dateField", date), + HoodieSchemaField.of("timeMillisField", timeMillis), + HoodieSchemaField.of("timeMicrosField", timeMicros) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index is enabled Set partitions = new HashSet<>(); @@ -671,16 +648,13 @@ public void testIsEligibleForSecondaryIndexWithAllLogicalTypes() { @Test public void testIsEligibleForSecondaryIndexWithColumnNotInSchema() { // Given: A schema without the requested column - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("existingField") - .endRecord(); + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("existingField", HoodieSchema.create(HoodieSchemaType.STRING)) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index is enabled Set partitions = new HashSet<>(); @@ -711,19 +685,16 @@ public void testIsEligibleForSecondaryIndexWithColumnNotInSchema() { @Test public void testIsEligibleForSecondaryIndexWithStringLogicalTypes() { // Given: A schema with UUID logical type on string field - Schema uuidSchema = LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING)); - - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .name("uuidField").type(uuidSchema).noDefault() - .requiredString("regularStringField") - .endRecord(); + HoodieSchema uuidSchema = HoodieSchema.createUUID(); + + HoodieSchema schema = HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("uuidField", uuidSchema), + HoodieSchemaField.of("regularStringField", HoodieSchema.create(HoodieSchemaType.STRING)) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { // Given: Record index is enabled Set partitions = new HashSet<>(); @@ -767,16 +738,13 @@ public void testIsEligibleForSecondaryIndexWithStringLogicalTypes() { @Test public void testIsEligibleForExpressionIndexWithColumnNotInSchema() { // Given: A schema without the requested column - Schema schema = SchemaBuilder.record("TestRecord") - .fields() - .requiredString("existingField") - .endRecord(); + HoodieSchema schema = 
HoodieSchema.createRecord("TestRecord", null, null, Arrays.asList( + HoodieSchemaField.of("existingField", HoodieSchema.create(HoodieSchemaType.STRING)) + )); // Mock the schema resolver try (MockedConstruction mockedResolver = Mockito.mockConstruction(TableSchemaResolver.class, - (mock, context) -> { - when(mock.getTableAvroSchema()).thenReturn(schema); - })) { + (mock, context) -> when(mock.getTableSchema()).thenReturn(schema))) { Map> columns = new HashMap<>(); columns.put("nonExistentField", Collections.emptyMap()); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java index b31f00e60d8bb..f880c35594fd6 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/GenericRecordValidationTestUtils.java @@ -18,10 +18,13 @@ package org.apache.hudi.testutils; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.config.HoodieReaderConfig; import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -35,7 +38,6 @@ import org.apache.hudi.storage.StoragePath; import org.apache.hudi.storage.HoodieStorageUtils; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.ArrayWritable; @@ -67,8 +69,8 @@ public class GenericRecordValidationTestUtils { public static void assertGenericRecords(GenericRecord record1, GenericRecord record2, - Schema schema, List excludeFields) { - for (Schema.Field f: schema.getFields()) { + HoodieSchema schema, List excludeFields) { + for (HoodieSchemaField f: schema.getFields()) { String fieldName = f.name(); if (excludeFields.contains(fieldName)) { continue; @@ -81,7 +83,7 @@ public static void assertGenericRecords(GenericRecord record1, GenericRecord rec HoodieRealtimeRecordReaderUtils.arrayWritableToString((ArrayWritable) value2)); } else if (value1 instanceof Text && value2 instanceof BytesWritable) { assertArrayEquals(((Text) value1).getBytes(), ((BytesWritable) value2).getBytes()); - } else if (f.schema().getType() == Schema.Type.ENUM + } else if (f.schema().getType() == HoodieSchemaType.ENUM && value1 instanceof BytesWritable && value2 instanceof Text) { // TODO(HUDI-8660): Revisit ENUM handling in Spark parquet reader and writer assertArrayEquals(((BytesWritable) value1).getBytes(), ((Text) value2).getBytes()); @@ -126,8 +128,8 @@ public static void assertDataInMORTable(HoodieWriteConfig config, String instant // Verify row count. assertEquals(prevRecordsMap.size(), newRecordsMap.size()); - Schema readerSchema = HoodieAvroUtils.addMetadataFields( - new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField()); + HoodieSchema readerSchema = HoodieSchemaUtils.addMetadataFields( + HoodieSchema.parse(config.getSchema()), config.allowOperationMetadataField()); // Verify every field. 
prevRecordsMap.forEach((key, value) -> { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java index 9b2cb5a64fcc2..e512f5070313b 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/testutils/HoodieMergeOnReadTestUtils.java @@ -18,7 +18,9 @@ package org.apache.hudi.testutils; -import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.testutils.HadoopFSTestUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -28,8 +30,6 @@ import org.apache.hudi.storage.StorageConfiguration; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; -import org.apache.avro.Schema.Field; import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecordBuilder; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; @@ -67,42 +67,42 @@ public static List getRecordsUsingInputFormat(StorageConfiguratio public static List getRecordsUsingInputFormat(StorageConfiguration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, boolean populateMetaFields) { - return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, HoodieTestDataGenerator.AVRO_SCHEMA, + return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, HoodieTestDataGenerator.HOODIE_SCHEMA, HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>(), populateMetaFields); } - public static List getRecordsUsingInputFormat(StorageConfiguration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema, + public static List getRecordsUsingInputFormat(StorageConfiguration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, HoodieSchema rawSchema, String rawHiveColumnTypes, boolean projectCols, List projectedColumns) { return getRecordsUsingInputFormat(conf, inputPaths, basePath, jobConf, realtime, rawSchema, rawHiveColumnTypes, projectCols, projectedColumns, true); } - public static List getRecordsUsingInputFormat(StorageConfiguration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, Schema rawSchema, + public static List getRecordsUsingInputFormat(StorageConfiguration conf, List inputPaths, String basePath, JobConf jobConf, boolean realtime, HoodieSchema rawSchema, String rawHiveColumnTypes, boolean projectCols, List projectedColumns, boolean populateMetaFields) { HoodieTableMetaClient metaClient = HoodieTestUtils.createMetaClient(conf, basePath); FileInputFormat inputFormat = HoodieInputFormatUtils.getInputFormat(metaClient.getTableConfig().getBaseFileFormat(), realtime, jobConf); - Schema schema; + HoodieSchema schema; String hiveColumnTypes; if (populateMetaFields) { - schema = HoodieAvroUtils.addMetadataFields(rawSchema); - hiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(rawHiveColumnTypes); + schema = HoodieSchemaUtils.addMetadataFields(rawSchema); + hiveColumnTypes = HoodieSchemaUtils.addMetadataColumnTypes(rawHiveColumnTypes); } else { schema = rawSchema; hiveColumnTypes = rawHiveColumnTypes; } 
setPropsForInputFormat(inputFormat, jobConf, schema, hiveColumnTypes, projectCols, projectedColumns, populateMetaFields); - final List fields; + final List fields; if (projectCols) { fields = schema.getFields().stream().filter(f -> projectedColumns.contains(f.name())) .collect(Collectors.toList()); } else { fields = schema.getFields(); } - final Schema projectedSchema = Schema.createRecord(fields.stream() - .map(HoodieAvroUtils::createNewSchemaField) + final HoodieSchema projectedSchema = HoodieSchema.createRecord("testRecord", null, null, fields.stream() + .map(HoodieSchemaUtils::createNewSchemaField) .collect(Collectors.toList())); try { @@ -115,7 +115,7 @@ public static List getRecordsUsingInputFormat(StorageConfiguratio Object key = recordReader.createKey(); ArrayWritable writable = (ArrayWritable) recordReader.createValue(); while (recordReader.next(key, writable)) { - GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema); + GenericRecordBuilder newRecord = new GenericRecordBuilder(projectedSchema.toAvroSchema()); // writable returns an array with [field1, field2, _hoodie_commit_time, // _hoodie_commit_seqno] Writable[] values = writable.get(); @@ -123,7 +123,7 @@ public static List getRecordsUsingInputFormat(StorageConfiguratio .filter(f -> !projectCols || projectedColumns.contains(f.name())) .map(f -> Pair.of(projectedSchema.getFields().stream() .filter(p -> f.name().equals(p.name())).findFirst().get(), f)) - .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey(), values[fieldsPair.getValue().pos()])); + .forEach(fieldsPair -> newRecord.set(fieldsPair.getKey().getAvroField(), values[fieldsPair.getValue().pos()])); records.add(newRecord.build()); } recordReader.close(); @@ -137,12 +137,12 @@ public static List getRecordsUsingInputFormat(StorageConfiguratio } } - private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, Schema schema, String hiveColumnTypes, boolean projectCols, List projectedCols, + private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf jobConf, HoodieSchema schema, String hiveColumnTypes, boolean projectCols, List projectedCols, boolean populateMetaFieldsConfigValue) { - List fields = schema.getFields(); + List fields = schema.getFields(); final List projectedColNames; if (!projectCols) { - projectedColNames = fields.stream().map(Field::name).collect(Collectors.toList()); + projectedColNames = fields.stream().map(HoodieSchemaField::name).collect(Collectors.toList()); } else { projectedColNames = projectedCols; } @@ -155,7 +155,7 @@ private static void setPropsForInputFormat(FileInputFormat inputFormat, JobConf .map(f -> String.valueOf(f.pos())).collect(Collectors.joining(",")); String hiveColumnNames = fields.stream() .filter(field -> !field.name().equalsIgnoreCase("datestr")) - .map(Field::name).collect(Collectors.joining(",")); + .map(HoodieSchemaField::name).collect(Collectors.joining(",")); hiveColumnNames = hiveColumnNames + ",datestr"; StorageConfiguration conf = HoodieTestUtils.getDefaultStorageConf(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java index efdd945c29403..c84349c1aafe1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java +++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java @@ -99,7 +99,7 @@ private static HoodieSchema getBootstrapSourceSchemaOrc(HoodieWriteConfig writeC String tableName = HoodieAvroUtils.sanitizeName(writeConfig.getTableName()); String structName = tableName + "_record"; String recordNamespace = "hoodie." + tableName; - return HoodieSchema.fromAvroSchema(AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, structName, recordNamespace, true)); + return AvroOrcUtils.createSchemaWithDefaultValue(orcSchema, structName, recordNamespace, true); } } diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java index 947a292196aea..dc1e41cd589ce 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/AvroRecordContext.java @@ -79,11 +79,12 @@ public static Object getFieldValueFromIndexedRecord( if (fieldOpt.isEmpty()) { return null; } - Object value = currentRecord.get(fieldOpt.get().pos()); + HoodieSchemaField field = fieldOpt.get(); + Object value = currentRecord.get(field.pos()); if (i == path.length - 1) { return value; } - currentSchema = fieldOpt.get().schema(); + currentSchema = field.schema(); currentRecord = (IndexedRecord) value; } return null; diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index f2951fa05a023..c11f87daf0fec 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -398,10 +398,6 @@ public static Schema removeFields(Schema schema, Set fieldsToRemove) { return AvroSchemaUtils.createNewSchemaFromFieldsWithReference(schema, filteredFields); } - public static String addMetadataColumnTypes(String hiveColumnTypes) { - return "string,string,string,string,string," + hiveColumnTypes; - } - public static Schema makeFieldNonNull(Schema schema, String fieldName, Object fieldDefaultValue) { ValidationUtils.checkArgument(fieldDefaultValue != null); List filteredFields = schema.getFields() diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java index 3ab2059a18b0e..eb7378152bcaf 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchema.java @@ -780,6 +780,10 @@ public boolean isNullable() { .anyMatch(schema -> schema.getType() == Schema.Type.NULL); } + public boolean isSchemaNull() { + return type == null || type == HoodieSchemaType.NULL; + } + /** * If this is a union schema, returns the non-null type. Otherwise, returns this schema. * diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java new file mode 100644 index 0000000000000..9f2ea6e94ca66 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaComparatorForSchemaEvolution.java @@ -0,0 +1,342 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema; + +import lombok.AccessLevel; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Defines equality comparison rules for HoodieSchema schemas for schema evolution purposes. + * + *
+ * <p>This class provides schema comparison logic that focuses only on attributes that affect
+ * data readers/writers, ignoring metadata like documentation, namespace, and aliases which
+ * don't impact schema evolution compatibility.</p>
+ *
+ * <h3>Common Rules Across All Types</h3>
+ * Included in equality check:
+ * <ul>
+ *   <li>Name/identifier</li>
+ *   <li>Type including primitive type, complex type (see below), and logical type</li>
+ * </ul>
+ * Excluded from equality check:
+ * <ul>
+ *   <li>Namespace</li>
+ *   <li>Documentation</li>
+ *   <li>Aliases</li>
+ *   <li>Custom properties</li>
+ * </ul>
+ *
+ * <h3>Type-Specific Rules</h3>
+ *
+ * <h4>Record</h4>
+ * Included:
+ * <ul>
+ *   <li>Field names</li>
+ *   <li>Field types</li>
+ *   <li>Field order attribute</li>
+ *   <li>Default values</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Field documentation</li>
+ *   <li>Field aliases</li>
+ * </ul>
+ *
+ * <h4>Enum</h4>
+ * Included:
+ * <ul>
+ *   <li>Name</li>
+ *   <li>Symbol order</li>
+ *   <li>Symbol value</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Custom properties</li>
+ * </ul>
+ *
+ * <h4>Array</h4>
+ * Included:
+ * <ul>
+ *   <li>Items schema</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Documentation</li>
+ *   <li>Custom properties</li>
+ * </ul>
+ *
+ * <h4>Map</h4>
+ * Included:
+ * <ul>
+ *   <li>Values schema</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Documentation</li>
+ *   <li>Custom properties</li>
+ * </ul>
+ *
+ * <h4>Fixed</h4>
+ * Included:
+ * <ul>
+ *   <li>Size</li>
+ *   <li>Name</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Namespace</li>
+ *   <li>Aliases</li>
+ * </ul>
+ *
+ * <h4>Union</h4>
+ * Included:
+ * <ul>
+ *   <li>Member types</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Member order</li>
+ * </ul>
+ *
+ * <h4>Logical Types</h4>
+ * Included:
+ * <ul>
+ *   <li>Logical type name (via schema subclass)</li>
+ *   <li>Underlying primitive type</li>
+ *   <li>Decimal precision/scale (if applicable)</li>
+ *   <li>Timestamp/Time precision (if applicable)</li>
+ * </ul>
+ * Excluded:
+ * <ul>
+ *   <li>Documentation</li>
+ *   <li>Custom properties</li>
+ * </ul>
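+ *
+ * <p>Illustrative usage (a minimal sketch; the schema JSON literals below are examples only):
+ * schemas that differ only in excluded attributes such as doc strings compare as equal for
+ * evolution purposes, while a changed field type does not:
+ * <pre>{@code
+ * HoodieSchema s1 = HoodieSchema.parse(
+ *     "{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"f\",\"type\":\"string\"}]}");
+ * HoodieSchema s2 = HoodieSchema.parse(
+ *     "{\"type\":\"record\",\"name\":\"R\",\"doc\":\"ignored\","
+ *         + "\"fields\":[{\"name\":\"f\",\"type\":\"string\",\"doc\":\"ignored\"}]}");
+ * HoodieSchema s3 = HoodieSchema.parse(
+ *     "{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"f\",\"type\":\"int\"}]}");
+ *
+ * HoodieSchemaComparatorForSchemaEvolution.schemaEquals(s1, s2); // true: doc is ignored
+ * HoodieSchemaComparatorForSchemaEvolution.schemaEquals(s1, s3); // false: field type differs
+ * }</pre>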
+ */ +@NoArgsConstructor(access = AccessLevel.PRIVATE) +public class HoodieSchemaComparatorForSchemaEvolution { + + private static final HoodieSchemaComparatorForSchemaEvolution VALIDATOR = new HoodieSchemaComparatorForSchemaEvolution(); + + public static boolean schemaEquals(HoodieSchema s1, HoodieSchema s2) { + return VALIDATOR.schemaEqualsInternal(s1, s2); + } + + protected boolean schemaEqualsInternal(HoodieSchema s1, HoodieSchema s2) { + if (s1 == s2) { + return true; + } + if (s1 == null || s2 == null) { + return false; + } + if (s1.getType() != s2.getType()) { + return false; + } + + switch (s1.getType()) { + case RECORD: + return recordSchemaEquals(s1, s2); + case ENUM: + return enumSchemaEquals(s1, s2); + case ARRAY: + return arraySchemaEquals(s1, s2); + case MAP: + return mapSchemaEquals(s1, s2); + case FIXED: + return fixedSchemaEquals(s1, s2); + case UNION: + return unionSchemaEquals(s1, s2); + case STRING: + case BYTES: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case BOOLEAN: + case NULL: + case DATE: + // DATE is INT with date logical type (no additional properties) + case UUID: + // UUID is STRING with uuid logical type (no additional properties) + return true; + case DECIMAL: + return decimalSchemaEquals(s1, s2); + case TIME: + return timeSchemaEquals(s1, s2); + case TIMESTAMP: + return timestampSchemaEquals(s1, s2); + default: + throw new IllegalArgumentException("Unknown schema type: " + s1.getType()); + } + } + + protected boolean validateRecord(HoodieSchema s1, HoodieSchema s2) { + return s1.isError() == s2.isError(); + } + + private boolean recordSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + if (!validateRecord(s1, s2)) { + return false; + } + + List fields1 = s1.getFields(); + List fields2 = s2.getFields(); + + if (fields1.size() != fields2.size()) { + return false; + } + + for (int i = 0; i < fields1.size(); i++) { + if (!fieldEquals(fields1.get(i), fields2.get(i))) { + return false; + } + } + return true; + } + + protected boolean validateField(HoodieSchemaField f1, HoodieSchemaField f2) { + if (!f1.name().equals(f2.name())) { + return false; + } + + if (f1.order() != f2.order()) { + return false; + } + + // Check if both have default values + if (f1.hasDefaultValue() != f2.hasDefaultValue()) { + return false; + } + + // If both have default values, they must be equal + return !f1.hasDefaultValue() || f1.defaultVal().get().equals(f2.defaultVal().get()); + } + + private boolean fieldEquals(HoodieSchemaField f1, HoodieSchemaField f2) { + if (!validateField(f1, f2)) { + return false; + } + + return schemaEqualsInternal(f1.schema(), f2.schema()); + } + + protected boolean enumSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + // Check name equality first + if (!s1.getName().equals(s2.getName())) { + return false; + } + + List symbols1 = s1.getEnumSymbols(); + List symbols2 = s2.getEnumSymbols(); + + // Quick size check before creating lists + if (symbols1.size() != symbols2.size()) { + return false; + } + + return symbols1.equals(symbols2); + } + + protected boolean unionSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + List types1 = s1.getTypes(); + List types2 = s2.getTypes(); + + if (types1.size() != types2.size()) { + return false; + } + + // Create sets of effectively equal types + Set set1 = types1.stream().map(SchemaWrapper::new).collect(Collectors.toSet()); + Set set2 = types2.stream().map(SchemaWrapper::new).collect(Collectors.toSet()); + + // Compare sets instead of ordered lists + return set1.equals(set2); + } + + private boolean 
arraySchemaEquals(HoodieSchema s1, HoodieSchema s2) { + return schemaEqualsInternal(s1.getElementType(), s2.getElementType()); + } + + private boolean mapSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + return schemaEqualsInternal(s1.getValueType(), s2.getValueType()); + } + + private boolean fixedSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + return s1.getName().equals(s2.getName()) && s1.getFixedSize() == s2.getFixedSize(); + } + + private static boolean decimalSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + HoodieSchema.Decimal d1 = (HoodieSchema.Decimal) s1; + HoodieSchema.Decimal d2 = (HoodieSchema.Decimal) s2; + // Check if both use same underlying representation (FIXED vs BYTES) + if (d1.isFixed() != d2.isFixed()) { + return false; + } + // If both use FIXED representation, they must have the same fixed size + if (d1.isFixed() && d1.getFixedSize() != d2.getFixedSize()) { + return false; + } + return d1.getPrecision() == d2.getPrecision() && d1.getScale() == d2.getScale(); + } + + private static boolean timestampSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + HoodieSchema.Timestamp t1 = (HoodieSchema.Timestamp) s1; + HoodieSchema.Timestamp t2 = (HoodieSchema.Timestamp) s2; + return t1.getPrecision() == t2.getPrecision() && t1.isUtcAdjusted() == t2.isUtcAdjusted(); + } + + private static boolean timeSchemaEquals(HoodieSchema s1, HoodieSchema s2) { + HoodieSchema.Time t1 = (HoodieSchema.Time) s1; + HoodieSchema.Time t2 = (HoodieSchema.Time) s2; + return t1.getPrecision() == t2.getPrecision(); + } + + /** + * Wrapper class to use HoodieSchema in HashSet with our custom equality + */ + static class SchemaWrapper { + private final HoodieSchema schema; + + public SchemaWrapper(HoodieSchema schema) { + this.schema = schema; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SchemaWrapper that = (SchemaWrapper) o; + return schemaEquals(schema, that.schema); + } + + @Override + public int hashCode() { + // This is a simplified hash code that considers only the type + // It's not perfect but good enough for our use case + return schema.getType().hashCode(); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java index bb54628e31af8..ec49fdee0fbef 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/schema/HoodieSchemaUtils.java @@ -766,4 +766,8 @@ public static HoodieSchema resolveUnionSchema(HoodieSchema schema, String fieldS return nonNullType; } + + public static String addMetadataColumnTypes(String hiveColumnTypes) { + return "string,string,string,string,string," + hiveColumnTypes; + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java new file mode 100644 index 0000000000000..e28d694590e01 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/TestHoodieSchemaComparatorForSchemaEvolution.java @@ -0,0 +1,505 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.schema; + +import org.apache.hudi.io.util.FileIOUtils; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class TestHoodieSchemaComparatorForSchemaEvolution { + @Test + void testAttrsIrrelevantToEquality() throws IOException { + // Validates that schemas with different non-essential attributes (like doc strings or aliases) + // are still considered equivalent for schema evolution purposes + String schemaA = FileIOUtils.readAsUTFString(TestHoodieSchemaComparatorForSchemaEvolution.class.getResourceAsStream("/avro-schema-evo/schema-allshapes-A.txt")); + String schemaB = FileIOUtils.readAsUTFString(TestHoodieSchemaComparatorForSchemaEvolution.class.getResourceAsStream("/avro-schema-evo/schema-allshapes-B.txt")); + + HoodieSchema schema1 = HoodieSchema.parse(schemaA); + HoodieSchema schema2 = HoodieSchema.parse(schemaB); + assertNotEquals(schema1, schema2); + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, schema2)); + assertEquals(new HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema1), + new HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema2)); + } + + @Test + void testComparingPrimitiveTypes() { + // Tests comparison of all primitive types against each other + // Validates that each primitive type is equal only to other schemas sharing the same + // primitive type. 
+ HoodieSchemaType[] primitiveTypes = { + HoodieSchemaType.NULL, HoodieSchemaType.BOOLEAN, HoodieSchemaType.INT, + HoodieSchemaType.LONG, HoodieSchemaType.FLOAT, HoodieSchemaType.DOUBLE, + HoodieSchemaType.BYTES, HoodieSchemaType.STRING + }; + + for (HoodieSchemaType primitiveType : primitiveTypes) { + for (HoodieSchemaType type : primitiveTypes) { + if (primitiveType == type) { + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.create(primitiveType), + HoodieSchema.create(type) + )); + } else { + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.create(primitiveType), + HoodieSchema.create(type) + ), String.format("Types %s and %s should not be equal", + primitiveType, type)); + } + } + } + } + + @Test + void testEqualToSelf() { + // Validates that a schema is equal to itself + // Basic sanity check for schema comparison + String schema = "{\"type\":\"record\",\"name\":\"R\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":\"string\"}]}"; + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema), + HoodieSchema.parse(schema) + )); + } + + @Test + void testIsErrorFieldInRecordSchema() { + // Tests that a record schema is not equal to an error schema + // even if they have the same structure + HoodieSchema record1 = HoodieSchema.createRecord("TestRecord", null, null, false, + Arrays.asList( + HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, null) + )); + + HoodieSchema record2 = HoodieSchema.createRecord("TestRecord", null, null, true, // error record + Arrays.asList( + HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, null) + )); + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2)); + } + + @Test + void testRecordFieldTypes() { + // Validates that records with fields of different types are not considered equal + // even if the field names are the same + String schema1 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":\"string\"}]}"; + String schema2 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":\"int\"}]}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testRecordFieldOrderAttribute() { + // Tests that records with different field order attributes are not equal + // This is important for schema evolution as order affects serialization + String schema1 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":\"string\",\"order\":\"ascending\"}]}"; + String schema2 = "{\"type\":\"record\",\"name\":\"R\",\"fields\":[" + + "{\"name\":\"field1\",\"type\":\"string\",\"order\":\"descending\"}]}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testArraySchema() { + // Validates that array schemas with different item types are not equal + // even if the array structure is the same + String schema1 = "{\"type\":\"array\",\"items\":\"string\"}"; + String schema2 = "{\"type\":\"array\",\"items\":\"int\"}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testMapSchema() { + // Tests that map schemas with different value types are not equal + // even if the map 
structure is the same + String schema1 = "{\"type\":\"map\",\"values\":\"string\"}"; + String schema2 = "{\"type\":\"map\",\"values\":\"int\"}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testFixedSchemaSizeAttr() { + // Validates that fixed-type schemas with different sizes are not equal + // Size is a critical attribute for fixed-length fields + String schema1 = "{\"type\":\"fixed\",\"name\":\"F\",\"size\":16}"; + String schema2 = "{\"type\":\"fixed\",\"name\":\"F\",\"size\":32}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testUnionMemberTypes() { + // Tests that unions with different member types are not equal + // even if they have the same number of members + String schema1 = "[\"null\",\"string\"]"; + String schema2 = "[\"null\",\"int\"]"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testUnionMemberOrdering() { + // Validates that the order of union members doesn't affect equality + String schema1 = "[\"null\",\"string\"]"; + String schema2 = "[\"string\",\"null\"]"; + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testLogicalTypeDecimalAttr() { + // Tests that decimal logical types with different precision and scale are not equal + String schema1 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16," + + "\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}"; + String schema2 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16," + + "\"logicalType\":\"decimal\",\"precision\":8,\"scale\":2}"; + String schema3 = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16," + + "\"logicalType\":\"decimal\",\"precision\":8,\"scale\":3}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema2), + HoodieSchema.parse(schema3) + )); + } + + @Test + void testLogicalType() { + // Validates that different logical types on the same underlying type are not equal + String schema1 = "{\"type\":\"int\",\"logicalType\":\"date\"}"; + String schema2 = "{\"type\":\"int\",\"logicalType\":\"time-millis\"}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testLogicalTypesWithDifferentPrimitiveTypes() { + // Tests that logical types with different underlying types are not equal + // even if they represent the same logical concept (decimal) + String decimalFixed = "{\"type\":\"fixed\",\"name\":\"D\",\"size\":16,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}"; + String decimalBytes = "{\"type\":\"bytes\",\"logicalType\":\"decimal\",\"precision\":10,\"scale\":2}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(decimalFixed), + HoodieSchema.parse(decimalBytes) + )); + } + + @Test + void testComparingSchemaFieldNames() { + // Validates that schemas with different names are not equal + // even if their structure is identical - tests for records, enums, and fixed types + String record1 = 
"{\"type\":\"record\",\"name\":\"R1\",\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"; + String record2 = "{\"type\":\"record\",\"name\":\"R2\",\"fields\":[{\"name\":\"f2\",\"type\":\"string\"}]}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(record1), + HoodieSchema.parse(record2) + )); + + // Enum + String enum1 = "{\"type\":\"enum\",\"name\":\"E1\",\"symbols\":[\"A\"]}"; + String enum2 = "{\"type\":\"enum\",\"name\":\"E2\",\"symbols\":[\"A\"]}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(enum1), + HoodieSchema.parse(enum2) + )); + + // Fixed + String fixed1 = "{\"type\":\"fixed\",\"name\":\"F1\",\"size\":16}"; + String fixed2 = "{\"type\":\"fixed\",\"name\":\"F2\",\"size\":16}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(fixed1), + HoodieSchema.parse(fixed2) + )); + } + + @Test + void testEnumSchemaName() { + // Tests that enum schemas with different names are not equal + // even if they have the same symbols + HoodieSchema schema1 = HoodieSchema.createEnum("enum1", null, null, Arrays.asList("A", "B", "C")); + HoodieSchema schema2 = HoodieSchema.createEnum("enum2", null, null, Arrays.asList("A", "B", "C")); + HoodieSchema schema3 = HoodieSchema.createEnum("enum1", null, null, Arrays.asList("A", "B", "C")); + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, schema2)); + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, schema3)); + } + + @Test + void testEnumSchema() { + // Validates that enum schemas with different symbol sets are not equal + // even if one is a subset of the other + HoodieSchema schema1 = HoodieSchema.createEnum("enum", null, null, Arrays.asList("A", "C")); + HoodieSchema schema2 = HoodieSchema.createEnum("enum", null, null, Arrays.asList("A", "B", "C")); + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema1, schema2)); + } + + @Test + void testEnumSymbolsOrder() { + // Tests that enum schemas with different symbol orders are not equal + // Order matters for enum serialization + String schema1 = "{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"A\",\"B\"]}"; + String schema2 = "{\"type\":\"enum\",\"name\":\"E\",\"symbols\":[\"B\",\"A\"]}"; + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + } + + @Test + void testDefaultValueEquality() { + // Tests comparison of schemas with different default values + HoodieSchemaField field1 = HoodieSchemaField.of("field", HoodieSchema.create(HoodieSchemaType.STRING), null, "default1"); + HoodieSchemaField field2 = HoodieSchemaField.of("field", HoodieSchema.create(HoodieSchemaType.STRING), null, "default2"); + HoodieSchemaField field3 = HoodieSchemaField.of("field", HoodieSchema.create(HoodieSchemaType.STRING), null, "default1"); + HoodieSchemaField fieldNoDefault = HoodieSchemaField.of("field", HoodieSchema.create(HoodieSchemaType.STRING), null, null); + + HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field1)); + HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field2)); + HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field3)); + HoodieSchema recordNoDefault = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(fieldNoDefault)); + + // Different 
default values should not be equal + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2)); + + // Same default values should be equal + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record3)); + + // No default value vs default value should not be equal + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, recordNoDefault)); + + // No default values should be equal to each other + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(recordNoDefault, recordNoDefault)); + } + + @Test + void testComplexDefaultValueEquality() { + // Tests equality comparison of schemas with complex default values (nested records) + // Validates that default value comparison works correctly for nested structures + HoodieSchema innerSchema = HoodieSchema.createRecord("inner", null, null, false, + Collections.singletonList( + HoodieSchemaField.of("value", HoodieSchema.create(HoodieSchemaType.STRING), null, null) + )); + + // Create default values as JSON-compatible Maps + Map defaultValue1 = new HashMap<>(); + defaultValue1.put("value", "test"); + + Map defaultValue2 = new HashMap<>(); + defaultValue2.put("value", "test"); + + Map defaultValue3 = new HashMap<>(); + defaultValue3.put("value", "different"); + + HoodieSchemaField field1 = HoodieSchemaField.of("field", innerSchema, null, defaultValue1); + HoodieSchemaField field2 = HoodieSchemaField.of("field", innerSchema, null, defaultValue2); + HoodieSchemaField field3 = HoodieSchemaField.of("field", innerSchema, null, defaultValue3); + + HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field1)); + HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field2)); + HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field3)); + + // Same complex default values should be equal + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2)); + + // Different complex default values should not be equal + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record3)); + } + + @Test + void testArrayDefaultValueEquality() { + // Tests equality comparison of schemas with array default values + // Validates that default value comparison works correctly for array types + List defaultArray1 = Arrays.asList("a", "b", "c"); + List defaultArray2 = Arrays.asList("a", "b", "c"); + List defaultArray3 = Arrays.asList("x", "y", "z"); + + HoodieSchema arraySchema = HoodieSchema.createArray(HoodieSchema.create(HoodieSchemaType.STRING)); + + HoodieSchemaField field1 = HoodieSchemaField.of("field", arraySchema, null, defaultArray1); + HoodieSchemaField field2 = HoodieSchemaField.of("field", arraySchema, null, defaultArray2); + HoodieSchemaField field3 = HoodieSchemaField.of("field", arraySchema, null, defaultArray3); + + HoodieSchema record1 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field1)); + HoodieSchema record2 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field2)); + HoodieSchema record3 = HoodieSchema.createRecord("test", null, null, false, Collections.singletonList(field3)); + + // Same array default values should be equal + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2)); + + // Different array default values should not be equal + 
assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record3)); + } + + @Test + void testCompareWithNull() { + // Tests schema comparison behavior when one or both schemas are null + // Validates proper handling of null cases in the comparator + HoodieSchema schema = HoodieSchema.create(HoodieSchemaType.STRING); + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(schema, null)); + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(null, schema)); + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(null, null)); + } + + @Test + void testRecordFieldCountMismatch() { + // Tests that records with different number of fields are not equal + // even if all common fields match + HoodieSchema record1 = HoodieSchema.createRecord("TestRecord", null, null, false, + Collections.singletonList( + HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, null) + )); + + HoodieSchema record2 = HoodieSchema.createRecord("TestRecord", null, null, false, + Arrays.asList( + HoodieSchemaField.of("field1", HoodieSchema.create(HoodieSchemaType.STRING), null, null), + HoodieSchemaField.of("field2", HoodieSchema.create(HoodieSchemaType.STRING), null, null) + )); + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(record1, record2)); + } + + @Test + void testUnionSizeMismatch() { + // Tests that unions with different number of types are not equal + // even if all common types match + HoodieSchema union1 = HoodieSchema.createUnion(Arrays.asList( + HoodieSchema.create(HoodieSchemaType.NULL), + HoodieSchema.create(HoodieSchemaType.STRING) + )); + + HoodieSchema union2 = HoodieSchema.createUnion(Arrays.asList( + HoodieSchema.create(HoodieSchemaType.NULL), + HoodieSchema.create(HoodieSchemaType.STRING), + HoodieSchema.create(HoodieSchemaType.INT) + )); + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(union1, union2)); + } + + @Test + void testUnionOrder() { + // Tests that the order of types in a union doesn't affect equality + // Important for schema evolution as union member order shouldn't matter + HoodieSchema union1 = HoodieSchema.createUnion(Arrays.asList( + HoodieSchema.create(HoodieSchemaType.NULL), + HoodieSchema.create(HoodieSchemaType.STRING) + )); + + HoodieSchema union2 = HoodieSchema.createUnion(Arrays.asList( + HoodieSchema.create(HoodieSchemaType.STRING), + HoodieSchema.create(HoodieSchemaType.NULL) + )); + + assertTrue(HoodieSchemaComparatorForSchemaEvolution.schemaEquals(union1, union2)); + } + + @Test + void testLogicalTypeOneNull() { + // Tests comparison of schemas where one has a logical type and the other doesn't + // Validates that logical type presence affects equality + String schema1 = "{\"type\":\"int\",\"logicalType\":\"date\"}"; + String schema2 = "{\"type\":\"int\"}"; + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema1), + HoodieSchema.parse(schema2) + )); + + // Swap the 2 schema position should have no effect. 
+ assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(schema2), + HoodieSchema.parse(schema1) + )); + } + + @Test + void testSchemaWrapperNullAndTypeMismatch() { + // Tests SchemaWrapper's null handling and type comparison behavior + // Validates proper handling of edge cases in the wrapper class + HoodieSchema schema = HoodieSchema.create(HoodieSchemaType.STRING); + HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper wrapper = new HoodieSchemaComparatorForSchemaEvolution.SchemaWrapper(schema); + + assertNotNull(wrapper); + assertNotEquals(wrapper, new Object()); + } + + @Test + void testTimestampLogicalTypeEquality() { + // Tests that timestamp logical types with different precisions are not equal + String timestampMillis = "{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}"; + String timestampMicros = "{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}"; + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(timestampMillis), + HoodieSchema.parse(timestampMicros) + )); + } + + @Test + void testTimeLogicalTypeEquality() { + // Tests that time logical types with different precisions are not equal + String timeMillis = "{\"type\":\"int\",\"logicalType\":\"time-millis\"}"; + String timeMicros = "{\"type\":\"long\",\"logicalType\":\"time-micros\"}"; + + assertFalse(HoodieSchemaComparatorForSchemaEvolution.schemaEquals( + HoodieSchema.parse(timeMillis), + HoodieSchema.parse(timeMicros) + )); + } +} diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java index 62a555a0f9d7e..77445f1115802 100644 --- a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java +++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/AvroOrcUtils.java @@ -18,6 +18,9 @@ package org.apache.hudi.common.util; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaField; +import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.exception.HoodieIOException; import org.apache.avro.Conversions; @@ -55,7 +58,6 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.apache.avro.JsonProperties.NULL_VALUE; import static org.apache.hudi.common.util.BinaryUtil.toBytes; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; @@ -810,74 +812,65 @@ private static Schema getActualSchemaType(Schema unionSchema) { } } - public static Schema createAvroSchemaWithDefaultValue(TypeDescription orcSchema, String recordName, String namespace, boolean nullable) { - Schema avroSchema = createAvroSchemaWithNamespace(orcSchema,recordName,namespace); - List fields = new ArrayList(); - List fieldList = avroSchema.getFields(); - for (Field field : fieldList) { - Schema fieldSchema = field.schema(); - Schema nullableSchema = Schema.createUnion(Schema.create(Schema.Type.NULL),fieldSchema); + public static HoodieSchema createSchemaWithDefaultValue(TypeDescription orcSchema, String recordName, String namespace, boolean nullable) { + HoodieSchema hoodieSchema = createSchemaWithNamespace(orcSchema, recordName, namespace); + List fields = new ArrayList<>(hoodieSchema.getFields().size()); + for (HoodieSchemaField field : hoodieSchema.getFields()) { + HoodieSchema fieldSchema = field.schema(); + HoodieSchema nullableSchema = HoodieSchema.createNullable(fieldSchema); if (nullable) { - fields.add(new 
Schema.Field(field.name(), nullableSchema, null, NULL_VALUE)); + fields.add(HoodieSchemaField.of(field.name(), nullableSchema, null, HoodieSchema.NULL_VALUE)); } else { - fields.add(new Schema.Field(field.name(), fieldSchema, null, null)); + fields.add(HoodieSchemaField.of(field.name(), fieldSchema, null, null)); } } - Schema schema = Schema.createRecord(recordName, null, null, false); - schema.setFields(fields); - return schema; + return HoodieSchema.createRecord(recordName, null, null, false, fields); } - private static Schema createAvroSchemaWithNamespace(TypeDescription orcSchema, String recordName, String namespace) { + private static HoodieSchema createSchemaWithNamespace(TypeDescription orcSchema, String recordName, String namespace) { switch (orcSchema.getCategory()) { case BOOLEAN: - return Schema.create(Schema.Type.BOOLEAN); + return HoodieSchema.create(HoodieSchemaType.BOOLEAN); case BYTE: // tinyint (8 bit), use int to hold it - return Schema.create(Schema.Type.INT); + return HoodieSchema.create(HoodieSchemaType.INT); case SHORT: // smallint (16 bit), use int to hold it - return Schema.create(Schema.Type.INT); + return HoodieSchema.create(HoodieSchemaType.INT); case INT: // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MILLIS, but there is no way to distinguish - return Schema.create(Schema.Type.INT); + return HoodieSchema.create(HoodieSchemaType.INT); case LONG: // the Avro logical type could be AvroTypeUtil.LOGICAL_TYPE_TIME_MICROS, but there is no way to distinguish - return Schema.create(Schema.Type.LONG); + return HoodieSchema.create(HoodieSchemaType.LONG); case FLOAT: - return Schema.create(Schema.Type.FLOAT); + return HoodieSchema.create(HoodieSchemaType.FLOAT); case DOUBLE: - return Schema.create(Schema.Type.DOUBLE); + return HoodieSchema.create(HoodieSchemaType.DOUBLE); case VARCHAR: case CHAR: case STRING: - return Schema.create(Schema.Type.STRING); + return HoodieSchema.create(HoodieSchemaType.STRING); case DATE: - Schema date = Schema.create(Schema.Type.INT); - LogicalTypes.date().addToSchema(date); - return date; + return HoodieSchema.createDate(); case TIMESTAMP: - Schema timestamp = Schema.create(Schema.Type.LONG); - LogicalTypes.timestampMillis().addToSchema(timestamp); - return timestamp; + return HoodieSchema.createTimestampMillis(); case BINARY: - return Schema.create(Schema.Type.BYTES); + return HoodieSchema.create(HoodieSchemaType.BYTES); case DECIMAL: - Schema decimal = Schema.create(Schema.Type.BYTES); - LogicalTypes.decimal(orcSchema.getPrecision(), orcSchema.getScale()).addToSchema(decimal); - return decimal; + return HoodieSchema.createDecimal(orcSchema.getPrecision(), orcSchema.getScale()); case LIST: - return Schema.createArray(createAvroSchemaWithNamespace(orcSchema.getChildren().get(0), recordName, "")); + return HoodieSchema.createArray(createSchemaWithNamespace(orcSchema.getChildren().get(0), recordName, "")); case MAP: - return Schema.createMap(createAvroSchemaWithNamespace(orcSchema.getChildren().get(1), recordName, "")); + return HoodieSchema.createMap(createSchemaWithNamespace(orcSchema.getChildren().get(1), recordName, "")); case STRUCT: - List childFields = new ArrayList<>(); + List childFields = new ArrayList<>(); for (int i = 0; i < orcSchema.getChildren().size(); i++) { TypeDescription childType = orcSchema.getChildren().get(i); String childName = orcSchema.getFieldNames().get(i); - childFields.add(new Field(childName, createAvroSchemaWithNamespace(childType, childName, ""), null, null)); + 
childFields.add(HoodieSchemaField.of(childName, createSchemaWithNamespace(childType, childName, ""), null, null)); } - return Schema.createRecord(recordName, null, namespace, false, childFields); + return HoodieSchema.createRecord(recordName, null, namespace, false, childFields); default: throw new IllegalStateException(String.format("Unrecognized ORC type: %s", orcSchema.getCategory().getName())); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java index 2932daf9f2d30..c95ca1f3ba581 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java @@ -18,7 +18,6 @@ package org.apache.hudi.hadoop.hive; -import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; @@ -339,7 +338,7 @@ public void multiLevelPartitionReadersRealtimeCombineHoodieInputFormat() throws String hiveColumnNames = fields.stream().map(HoodieSchemaField::name).collect(Collectors.joining(",")); hiveColumnNames = hiveColumnNames + ",year,month,day"; - String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(tripsHiveColumnTypes); + String modifiedHiveColumnTypes = HoodieSchemaUtils.addMetadataColumnTypes(tripsHiveColumnTypes); modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string,string,string"; jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index 0a96659c5592b..e116efd5a0afc 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -469,7 +469,7 @@ public static void setProjectFieldsForInputFormat(JobConf jobConf, String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr")) .map(Schema.Field::name).collect(Collectors.joining(",")); hiveColumnNames = hiveColumnNames + ",datestr"; - String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes); + String modifiedHiveColumnTypes = HoodieSchemaUtils.addMetadataColumnTypes(hiveColumnTypes); modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string"; jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); @@ -496,7 +496,7 @@ public static void setPropsForInputFormat(JobConf jobConf, String hiveColumnNames = fields.stream().filter(field -> !field.name().equalsIgnoreCase("datestr")) .map(Schema.Field::name).collect(Collectors.joining(",")); hiveColumnNames = hiveColumnNames + ",datestr"; - String modifiedHiveColumnTypes = HoodieAvroUtils.addMetadataColumnTypes(hiveColumnTypes); + String modifiedHiveColumnTypes = HoodieSchemaUtils.addMetadataColumnTypes(hiveColumnTypes); modifiedHiveColumnTypes = modifiedHiveColumnTypes + ",string"; jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames); 
jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, modifiedHiveColumnTypes); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java index 977c629c20ad6..73568ebb48f21 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestDataValidationCheckForLogCompactionActions.java @@ -185,7 +185,7 @@ private void verifyRecords(TestTableContents mainTable, TestTableContents experi // Verify every field. mainRecordsMap.forEach((key, value) -> { assertTrue(experimentRecordsMap.containsKey(key + RECORD_KEY_APPEND_VALUE)); - assertGenericRecords(value, experimentRecordsMap.get(key + RECORD_KEY_APPEND_VALUE), readerSchema.toAvroSchema(), excludeFields); + assertGenericRecords(value, experimentRecordsMap.get(key + RECORD_KEY_APPEND_VALUE), readerSchema, excludeFields); }); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java index de92a916267dc..071e1ffb20f19 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java @@ -420,7 +420,7 @@ private void checkBootstrapResults(int totalRecords, HoodieSchema schema, String HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()), FSUtils.getAllPartitionPaths(context, metaClient, false).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), - basePath, roJobConf, false, schema.toAvroSchema(), TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>()); + basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>()); assertEquals(totalRecords, records.size()); Set seenKeys = new HashSet<>(); for (GenericRecord r : records) { @@ -438,7 +438,7 @@ private void checkBootstrapResults(int totalRecords, HoodieSchema schema, String HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()), FSUtils.getAllPartitionPaths(context, metaClient, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), - basePath, rtJobConf, true, schema.toAvroSchema(), TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>()); + basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>()); assertEquals(totalRecords, records.size()); for (GenericRecord r : records) { assertEquals(r.get("_row_key").toString(), r.get("_hoodie_record_key").toString(), "Realtime Record :" + r); @@ -454,7 +454,7 @@ private void checkBootstrapResults(int totalRecords, HoodieSchema schema, String HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()), FSUtils.getAllPartitionPaths(context, metaClient, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), - basePath, roJobConf, false, schema.toAvroSchema(), TRIP_HIVE_COLUMN_TYPES, + basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true, HoodieRecord.HOODIE_META_COLUMNS); assertEquals(totalRecords, records.size()); seenKeys = new HashSet<>(); @@ -471,7 +471,7 @@ private void 
checkBootstrapResults(int totalRecords, HoodieSchema schema, String HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()), FSUtils.getAllPartitionPaths(context, metaClient, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), - basePath, rtJobConf, true, schema.toAvroSchema(), TRIP_HIVE_COLUMN_TYPES, true, + basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true, HoodieRecord.HOODIE_META_COLUMNS); assertEquals(totalRecords, records.size()); for (GenericRecord r : records) { @@ -486,7 +486,7 @@ private void checkBootstrapResults(int totalRecords, HoodieSchema schema, String HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()), FSUtils.getAllPartitionPaths(context, metaClient, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), - basePath, roJobConf, false, schema.toAvroSchema(), TRIP_HIVE_COLUMN_TYPES, true, + basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, true, Arrays.asList("_row_key")); assertEquals(totalRecords, records.size()); seenKeys = new HashSet<>(); @@ -503,7 +503,7 @@ private void checkBootstrapResults(int totalRecords, HoodieSchema schema, String HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()), FSUtils.getAllPartitionPaths(context, metaClient, HoodieMetadataConfig.DEFAULT_METADATA_ENABLE_FOR_READERS).stream() .map(f -> basePath + "/" + f).collect(Collectors.toList()), - basePath, rtJobConf, true, schema.toAvroSchema(), TRIP_HIVE_COLUMN_TYPES, true, + basePath, rtJobConf, true, schema, TRIP_HIVE_COLUMN_TYPES, true, Arrays.asList("_row_key")); assertEquals(totalRecords, records.size()); for (GenericRecord r : records) { diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java index 4878a5fe5a4ec..d3acadf29471a 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java @@ -58,7 +58,6 @@ import org.apache.hudi.table.action.bootstrap.BootstrapUtils; import org.apache.hudi.testutils.HoodieSparkClientTestBase; -import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -88,6 +87,7 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -165,8 +165,7 @@ public HoodieSchema generateNewDataSetAndReturnSchema(long timestamp, int numRec TypeDescription orcSchema = orcReader.getSchema(); - Schema avroSchema = AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); - return HoodieSchema.fromAvroSchema(avroSchema); + return AvroOrcUtils.createSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); } @Test @@ -355,7 +354,7 @@ public void testMetadataAndFullBootstrapWithUpdatesMOR() throws Exception { private void checkBootstrapResults(int totalRecords, HoodieSchema schema, String maxInstant, boolean checkNumRawFiles, int expNumInstants, long expTimestamp, long expROTimestamp, boolean isDeltaCommit, boolean validateRecordsForCommitTime) throws Exception { checkBootstrapResults(totalRecords, schema, maxInstant, checkNumRawFiles, expNumInstants, 
expNumInstants, - expTimestamp, expROTimestamp, isDeltaCommit, Arrays.asList(maxInstant), validateRecordsForCommitTime); + expTimestamp, expROTimestamp, isDeltaCommit, Collections.singletonList(maxInstant), validateRecordsForCommitTime); } private void checkBootstrapResults(int totalRecords, HoodieSchema schema, String instant, boolean checkNumRawFiles, @@ -421,8 +420,8 @@ public JavaRDD generateInputRecords(String tableName, String sourc new Path(filePath), new OrcFile.ReaderOptions(jsc.hadoopConfiguration())); TypeDescription orcSchema = orcReader.getSchema(); - Schema avroSchema = AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); - return generateInputBatch(jsc, partitionPaths, HoodieSchema.fromAvroSchema(avroSchema)); + HoodieSchema hoodieSchema = AvroOrcUtils.createSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); + return generateInputBatch(jsc, partitionPaths, hoodieSchema); } catch (IOException ioe) { throw new HoodieIOException(ioe.getMessage(), ioe); @@ -437,7 +436,7 @@ private static JavaRDD generateInputBatch(JavaSparkContext jsc, return jsc.parallelize(fullFilePathsWithPartition.stream().flatMap(p -> { try { Configuration conf = jsc.hadoopConfiguration(); - AvroReadSupport.setAvroReadSchema(conf, writerSchema.getAvroSchema()); + AvroReadSupport.setAvroReadSchema(conf, writerSchema.toAvroSchema()); Reader orcReader = OrcFile.createReader( p.getValue(), new OrcFile.ReaderOptions(jsc.hadoopConfiguration())); @@ -445,9 +444,9 @@ private static JavaRDD generateInputBatch(JavaSparkContext jsc, TypeDescription orcSchema = orcReader.getSchema(); - Schema avroSchema = AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); + HoodieSchema hoodieSchema = AvroOrcUtils.createSchemaWithDefaultValue(orcSchema, "test_orc_record", null, true); - Iterator recIterator = new OrcReaderIterator(recordReader, avroSchema, orcSchema); + Iterator recIterator = new OrcReaderIterator(recordReader, hoodieSchema.toAvroSchema(), orcSchema); return StreamSupport.stream(Spliterators.spliteratorUnknownSize(recIterator, 0), false).map(gr -> { String key = gr.get("_row_key").toString(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala index d59038ec0d409..b0108722bdc1e 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSecondaryIndexDataTypes.scala @@ -64,10 +64,10 @@ class TestSecondaryIndexDataTypes extends HoodieSparkSqlTestBase { | col_tinyint TINYINT, | col_timestamp TIMESTAMP, | col_date DATE, + | col_float FLOAT, -- Float is supported for secondary indexes | col_double DOUBLE, -- Double is supported for secondary indexes | -- Unsupported types | col_boolean BOOLEAN, - | col_float FLOAT, | col_decimal DECIMAL(10,2), | col_binary BINARY, | col_array ARRAY, @@ -126,7 +126,7 @@ class TestSecondaryIndexDataTypes extends HoodieSparkSqlTestBase { | cast('2023-01-02 10:00:00' as timestamp) as col_timestamp, | cast('2023-01-02' as date) as col_date, | false as col_boolean, - | cast(2.2220 as float) as col_float, + | cast(2.5 as float) as col_float, | cast(2.222220 as double) as col_double, | cast(22.20 as decimal(10,2)) as col_decimal, | cast('binary2' as binary) as col_binary, @@ -150,7 +150,7 
@@ class TestSecondaryIndexDataTypes extends HoodieSparkSqlTestBase { | cast('2023-01-03 10:00:00' as timestamp) as col_timestamp, | cast('2023-01-03' as date) as col_date, | true as col_boolean, - | cast(3.3 as float) as col_float, + | cast(3.75 as float) as col_float, | cast(3.33 as double) as col_double, | cast(33.33 as decimal(10,2)) as col_decimal, | cast('binary3' as binary) as col_binary, @@ -171,10 +171,13 @@ class TestSecondaryIndexDataTypes extends HoodieSparkSqlTestBase { ("col_tinyint", Seq(("'3'", 3), ("2L", 2), (1, 1), (1.0, 1), (if (gteqSpark4_0) "1.00" else "'1.00'", 1))), ("col_timestamp", Seq(("cast('2023-01-03 10:00:00' as timestamp)", 3))), ("col_date", Seq(("cast('2023-01-03' as date)", 3))), + // Note: Using exactly representable float values to avoid binary floating-point precision issues. + // Exactly representable in binary: 1.0 = 2^0, 2.5 = 2^1 + 2^-1, 3.75 = 2^1 + 2^0 + 2^-1 + 2^-2 + // Avoid infinite/non-representable values like 2.222 + ("col_float", Seq(("'1'", 1), ("1L", 1), (2.5, 2), (3.75, 3))), ("col_double", Seq(("'1'", 1), ("1L", 1), (2.222220, 2), ("'3.3300'", 3))) ) val unsupportedColumns = Seq( - "col_float", "col_decimal", "col_boolean", "col_binary", @@ -236,6 +239,9 @@ class TestSecondaryIndexDataTypes extends HoodieSparkSqlTestBase { Seq("1000$1"), Seq("3000$3"), Seq("1.0$1"), + Seq("2.5$2"), + Seq("3.75$3"), + Seq("1.0$1"), Seq("2.22222$2"), Seq("3.33$3") )
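Aside: the float-precision comments in the last hunk can be sanity-checked outside the test suite. The snippet below is an illustrative, standalone sketch (the class name is invented and it is not part of this patch); it confirms that 1.0, 2.5 and 3.75 survive float-to-double widening unchanged, while a literal such as 2.222 does not — the precision pitfall the updated test data avoids.

```java
// Standalone sketch, not part of the patch: why the test data uses 1.0 / 2.5 / 3.75.
public class FloatRepresentabilityDemo {
  public static void main(String[] args) {
    // Exactly representable in binary, so the float and the double literal agree after widening.
    System.out.println((double) 1.0f == 1.0d);    // true: 1.0  = 2^0
    System.out.println((double) 2.5f == 2.5d);    // true: 2.5  = 2^1 + 2^-1
    System.out.println((double) 3.75f == 3.75d);  // true: 3.75 = 2^1 + 2^0 + 2^-1 + 2^-2

    // Not exactly representable: the nearest float and the nearest double are different values,
    // so an equality predicate written with the decimal literal can silently miss the stored float.
    System.out.println((double) 2.222f == 2.222d);          // false
    System.out.println(new java.math.BigDecimal(2.222f));   // prints the value actually stored
  }
}
```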
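Separately, the `testUnionOrder` case added in this patch asserts that the order of union branches does not affect equality. One common way to obtain that behavior — shown purely as a sketch with plain Avro types, not as the `HoodieSchemaComparatorForSchemaEvolution` implementation — is to compare the branches as a set keyed by their full names:

```java
import org.apache.avro.Schema;

import java.util.Arrays;
import java.util.HashSet;

// Hypothetical helper: treats union branches as a set so that
// ["null","string"] and ["string","null"] compare as equal.
public class UnionOrderSketch {
  static boolean unionEqualsIgnoringOrder(Schema a, Schema b) {
    if (a.getType() != Schema.Type.UNION || b.getType() != Schema.Type.UNION) {
      return false;
    }
    // For this sketch, branch identity is the branch's full name ("null", "string", "int", ...).
    HashSet<String> left = new HashSet<>();
    a.getTypes().forEach(s -> left.add(s.getFullName()));
    HashSet<String> right = new HashSet<>();
    b.getTypes().forEach(s -> right.add(s.getFullName()));
    return left.equals(right);
  }

  public static void main(String[] args) {
    Schema u1 = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
    Schema u2 = Schema.createUnion(Arrays.asList(
        Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)));
    System.out.println(unionEqualsIgnoringOrder(u1, u2)); // true
  }
}
```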