@@ -18,6 +18,7 @@

package org.apache.hudi.client;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
@@ -114,7 +115,6 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;

import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
import static org.apache.hudi.keygen.KeyGenUtils.getComplexKeygenErrorMessage;
@@ -1657,7 +1657,7 @@ public void commitTableChange(InternalSchema newSchema, HoodieTableMetaClient me
TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
String historySchemaStr = schemaUtil.getTableHistorySchemaStrFromCommitMetadata().orElseGet(
() -> SerDeHelper.inheritSchemas(getInternalSchema(schemaUtil), ""));
Schema schema = InternalSchemaConverter.convert(newSchema, getAvroRecordQualifiedName(config.getTableName())).toAvroSchema();
Schema schema = InternalSchemaConverter.convert(newSchema, AvroSchemaUtils.getAvroRecordQualifiedName(config.getTableName())).toAvroSchema();
String commitActionType = CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, metaClient.getTableType());
String instantTime = startCommit(commitActionType, metaClient);
config.setSchema(schema.toString());
@@ -18,9 +18,11 @@

package org.apache.hudi.client.transaction;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.TimelineLayout;
@@ -30,27 +32,19 @@
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.util.Lazy;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

import static org.apache.hudi.avro.AvroSchemaUtils.appendFieldsToSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.containsFieldInSchema;
import static org.apache.hudi.avro.AvroSchemaUtils.createNullableSchema;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
@@ -86,25 +80,27 @@ public ConcurrentSchemaEvolutionTableSchemaGetter(HoodieTableMetaClient metaClie
* @param schema the input schema to process
* @return the processed schema with partition columns handled appropriately
*/
private Schema handlePartitionColumnsIfNeeded(Schema schema) {
private HoodieSchema handlePartitionColumnsIfNeeded(HoodieSchema schema) {
if (metaClient.getTableConfig().shouldDropPartitionColumns()) {
return metaClient.getTableConfig().getPartitionFields()
.map(partitionFields -> appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
.or(() -> Option.of(schema))
.get();
.map(partitionFields -> TableSchemaResolver.appendPartitionColumns(schema, Option.ofNullable(partitionFields)))
.orElseGet(() -> schema);
}
return schema;
}

public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) {
return getTableAvroSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline.
.map(HoodieSchema::fromAvroSchema)
.or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read create schema from table config.
.map(tableSchema -> includeMetadataFields ? HoodieAvroUtils.addMetadataFields(tableSchema, false) : HoodieAvroUtils.removeMetadataFields(tableSchema))
.map(this::handlePartitionColumnsIfNeeded);
.map(tableSchema -> includeMetadataFields ? HoodieSchemaUtils.addMetadataFields(tableSchema, false) : HoodieSchemaUtils.removeMetadataFields(tableSchema))
.map(this::handlePartitionColumnsIfNeeded)
.map(HoodieSchema::toAvroSchema);
}

private Option<Schema> getTableCreateSchemaWithoutMetaField() {
return metaClient.getTableConfig().getTableCreateSchema();
private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
return metaClient.getTableConfig().getTableCreateSchema()
.map(HoodieSchema::fromAvroSchema);
}

private void setCachedLatestCommitWithValidSchema(Option<HoodieInstant> instantOption) {
@@ -199,36 +195,6 @@ Option<Pair<HoodieInstant, Schema>> getLastCommitMetadataWithValidSchemaFromTime
return Option.of(Pair.of(instantWithTableSchema.get(), tableSchemaAtInstant.get(instantWithTableSchema.get())));
}

public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
// In cases when {@link DROP_PARTITION_COLUMNS} config is set true, partition columns
// won't be persisted w/in the data files, and therefore we need to append such columns
// when schema is parsed from data files
//
// Here we append partition columns with {@code StringType} as the data type
if (!partitionFields.isPresent() || partitionFields.get().length == 0) {
return dataSchema;
}

boolean hasPartitionColNotInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> !containsFieldInSchema(dataSchema, pf));
boolean hasPartitionColInSchema = Arrays.stream(partitionFields.get()).anyMatch(pf -> containsFieldInSchema(dataSchema, pf));
if (hasPartitionColNotInSchema && hasPartitionColInSchema) {
throw new HoodieSchemaException("Partition columns could not be partially contained w/in the data schema");
}

if (hasPartitionColNotInSchema) {
// when hasPartitionColNotInSchema is true and hasPartitionColInSchema is false, all partition columns
// are not in originSchema. So we create and add them.
List<Field> newFields = new ArrayList<>();
for (String partitionField : partitionFields.get()) {
newFields.add(new Schema.Field(
partitionField, createNullableSchema(Schema.Type.STRING), "", JsonProperties.NULL_VALUE));
}
return appendFieldsToSchema(dataSchema, newFields);
}

return dataSchema;
}

/**
* Get timeline in REVERSE order that only contains completed instants which POTENTIALLY evolve the table schema.
* For types of instants that are included and not reflecting table schema at their instant completion time please refer
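A note on the partition-column handling in this file: when DROP_PARTITION_COLUMNS is enabled, partition fields are not persisted in the data files, so they are re-appended to the resolved schema as nullable strings (now through TableSchemaResolver.appendPartitionColumns instead of the removed local helper). A minimal sketch of that idea in plain Avro, for illustration only -- it is not the Hudi helper, and it omits the real code's check that partition columns are either all present in or all absent from the data schema:

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionColumnAppendSketch {
  // Append partition columns that are absent from the data schema as nullable strings.
  public static Schema appendPartitionColumns(Schema dataSchema, String[] partitionFields) {
    Schema nullableString = Schema.createUnion(
        Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)));
    List<Field> fields = new ArrayList<>();
    for (Field f : dataSchema.getFields()) {
      // Avro Field instances cannot be reused across schemas, so copy them.
      fields.add(new Field(f.name(), f.schema(), f.doc(), f.defaultVal()));
    }
    for (String pf : partitionFields) {
      if (dataSchema.getField(pf) == null) {
        fields.add(new Field(pf, nullableString, "", JsonProperties.NULL_VALUE));
      }
    }
    return Schema.createRecord(dataSchema.getName(), dataSchema.getDoc(),
        dataSchema.getNamespace(), dataSchema.isError(), fields);
  }
}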
@@ -18,7 +18,6 @@

package org.apache.hudi.io;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.TaskContextSupplier;
@@ -33,6 +32,7 @@
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -56,7 +56,6 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;

import java.io.IOException;
@@ -357,10 +356,10 @@ protected Option<Map<String, String>> getRecordMetadata(HoodieRecord record, Hoo
Object eventTime = record.getColumnValueAsJava(schema.toAvroSchema(), eventTimeFieldName, props);
if (eventTime != null) {
// Append event_time.
Option<Schema.Field> field = AvroSchemaUtils.findNestedField(schema.toAvroSchema(), eventTimeFieldName);
Option<HoodieSchemaField> field = HoodieSchemaUtils.findNestedField(schema, eventTimeFieldName);
// Field should definitely exist.
eventTime = record.convertColumnValueForLogicalType(
field.get().schema(), eventTime, keepConsistentLogicalTimestamp);
field.get().schema().toAvroSchema(), eventTime, keepConsistentLogicalTimestamp);
Map<String, String> metadata = recordMetadata.orElse(new HashMap<>());
metadata.put(METADATA_EVENT_TIME_KEY, String.valueOf(eventTime));
return Option.of(metadata);
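The event-time extraction above now resolves the configured field through HoodieSchemaUtils.findNestedField, which accepts a dot-separated path into nested records. A simplified standalone sketch of that kind of lookup over an Avro schema, shown as an assumed illustration rather than the HoodieSchemaUtils implementation:

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;

public class NestedFieldLookupSketch {
  // Resolve a dot-separated path such as "meta.event_time" against a record schema,
  // unwrapping ["null", T] unions along the way. Returns null when the path is absent.
  public static Field findNestedField(Schema schema, String path) {
    Field found = null;
    for (String part : path.split("\\.")) {
      Schema current = (found == null) ? schema : found.schema();
      if (current.getType() == Schema.Type.UNION) {
        // Step past a nullable union wrapper to reach the non-null branch.
        current = current.getTypes().stream()
            .filter(t -> t.getType() != Schema.Type.NULL)
            .findFirst().orElse(current);
      }
      if (current.getType() != Schema.Type.RECORD) {
        return null;
      }
      found = current.getField(part);
      if (found == null) {
        return null;
      }
    }
    return found;
  }
}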
@@ -18,9 +18,6 @@

package org.apache.hudi.table;

import org.apache.hudi.avro.AvroSchemaCompatibility;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
@@ -51,6 +48,10 @@
import org.apache.hudi.common.model.HoodieIndexMetadata;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCompatibility;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -97,7 +98,6 @@

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;

import java.io.FileNotFoundException;
import java.io.IOException;
@@ -117,7 +117,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.hudi.avro.AvroSchemaUtils.getNonNullTypeFromUnion;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.EAGER;
import static org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy.LAZY;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
@@ -927,13 +926,14 @@ public void validateSchema() throws HoodieUpsertException, HoodieInsertException

try {
TableSchemaResolver schemaResolver = new TableSchemaResolver(getMetaClient());
Option<Schema> existingTableSchema = schemaResolver.getTableAvroSchemaIfPresent(false);
if (!existingTableSchema.isPresent()) {
Option<HoodieSchema> existingTableSchemaOpt = schemaResolver.getTableSchemaIfPresent(false);
if (existingTableSchemaOpt.isEmpty()) {
return;
}
Schema writerSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema());
Schema tableSchema = HoodieAvroUtils.createHoodieWriteSchema(existingTableSchema.get());
AvroSchemaUtils.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());

HoodieSchema writerSchema = HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), false);
HoodieSchema tableSchema = HoodieSchemaUtils.addMetadataFields(existingTableSchemaOpt.get());
HoodieSchemaCompatibility.checkSchemaCompatible(tableSchema, writerSchema, shouldValidate, allowProjection, getDropPartitionColNames());

// Check secondary index column compatibility
Option<HoodieIndexMetadata> indexMetadata = metaClient.getIndexMetadata();
@@ -956,8 +956,8 @@ public void validateSchema() throws HoodieUpsertException, HoodieInsertException
* @throws SchemaCompatibilityException if a secondary index column has incompatible evolution
*/
static void validateSecondaryIndexSchemaEvolution(
Schema tableSchema,
Schema writerSchema,
HoodieSchema tableSchema,
HoodieSchema writerSchema,
HoodieIndexMetadata indexMetadata) throws SchemaCompatibilityException {

// Filter for secondary index definitions
@@ -984,21 +984,25 @@ static void validateSecondaryIndexSchemaEvolution(
for (Map.Entry<String, String> entry : columnToIndexName.entrySet()) {
String columnName = entry.getKey();
String indexName = entry.getValue();

Option<HoodieSchemaField> tableFieldOpt = tableSchema.getField(columnName);

Schema.Field tableField = tableSchema.getField(columnName);

if (tableField == null) {
if (tableFieldOpt.isEmpty()) {
// This shouldn't happen as indexed columns should exist in table schema
log.warn("Secondary index '{}' references non-existent column: {}", indexName, columnName);
continue;
}


HoodieSchemaField tableField = tableFieldOpt.get();

// Use AvroSchemaCompatibility's field lookup logic to handle aliases
Schema.Field writerField = AvroSchemaCompatibility.lookupWriterField(writerSchema, tableField);
HoodieSchemaField writerField = HoodieSchemaCompatibility.lookupWriterField(writerSchema, tableField);

if (writerField != null && !tableField.schema().equals(writerField.schema())) {
// Check if this is just making the field nullable/non-nullable, which is safe from SI perspective
if (getNonNullTypeFromUnion(tableField.schema()).equals(getNonNullTypeFromUnion(writerField.schema()))) {
HoodieSchema nonNullTableField = tableField.schema().getNonNullType();
HoodieSchema nonNullWriterField = writerField.schema().getNonNullType();
if (nonNullTableField.equals(nonNullWriterField)) {
continue;
}

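The secondary-index validation above only tolerates a changed field type when the table and writer schemas agree after stripping nullability (getNonNullType). A small illustration of that unwrap-then-compare idea using plain Avro unions; the class and method names here are assumptions, only the comparison logic mirrors the diff:

import org.apache.avro.Schema;

import java.util.Arrays;

public class NullabilityCompareSketch {
  // Strip the "null" branch of a ["null", T] union so nullability-only changes compare equal.
  public static Schema nonNullType(Schema schema) {
    if (schema.getType() == Schema.Type.UNION) {
      return schema.getTypes().stream()
          .filter(t -> t.getType() != Schema.Type.NULL)
          .findFirst()
          .orElse(schema);
    }
    return schema;
  }

  public static void main(String[] args) {
    Schema plainLong = Schema.create(Schema.Type.LONG);
    Schema nullableLong = Schema.createUnion(
        Arrays.asList(Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)));
    System.out.println(plainLong.equals(nullableLong));                            // false
    System.out.println(nonNullType(plainLong).equals(nonNullType(nullableLong)));  // true
  }
}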
@@ -22,6 +22,7 @@
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCompatibility;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.InternalSchemaCache;
@@ -57,8 +58,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.hudi.avro.AvroSchemaUtils.isStrictProjectionOf;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Slf4j
public class HoodieMergeHelper<T> extends BaseMergeHelper {
@@ -96,7 +95,7 @@ public void runMerge(HoodieTable<?, ?, ?, ?> table,
// - Its field-set is a proper subset (of the reader schema)
// - There's no schema evolution transformation necessary
boolean isPureProjection = schemaEvolutionTransformerOpt.isEmpty()
&& isStrictProjectionOf(readerSchema.toAvroSchema(), writerSchema.toAvroSchema());
&& HoodieSchemaCompatibility.isStrictProjectionOf(readerSchema, writerSchema);
// Check whether we will need to rewrite target (already merged) records into the
// writer's schema
boolean shouldRewriteInWriterSchema = !isPureProjection
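The pure-projection check above (HoodieSchemaCompatibility.isStrictProjectionOf) asks whether the writer schema can be obtained from the reader schema by dropping fields only, in which case the already-merged records need no rewriting. A naive top-level sketch of such a check, for illustration only; it is not the HoodieSchemaCompatibility implementation, which also has to handle nested and non-record types:

import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;

public class StrictProjectionSketch {
  // Naive check: every field of the projected (target) schema must exist in the source
  // schema with an identical field schema.
  public static boolean isStrictProjectionOf(Schema source, Schema target) {
    if (source.getType() != Schema.Type.RECORD || target.getType() != Schema.Type.RECORD) {
      return source.equals(target);
    }
    for (Field targetField : target.getFields()) {
      Field sourceField = source.getField(targetField.name());
      if (sourceField == null || !sourceField.schema().equals(targetField.schema())) {
        return false;
      }
    }
    return true;
  }
}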