Merged. Changes from all commits, shown file by file as unified diffs.
==== File 1: org.apache.hudi.cli.commands (CLI "fetch table schema" test) ====

@@ -18,7 +18,6 @@
 
 package org.apache.hudi.cli.commands;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.cli.HoodieCLI;
 import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
 import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
@@ -27,6 +26,8 @@
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -36,7 +37,6 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.BeforeEach;
@@ -259,18 +259,18 @@ public void testFetchTableSchema() throws Exception {
     assertTrue(ShellEvaluationResultUtil.isSuccess(result));
 
     String actualSchemaStr = result.toString().substring(result.toString().indexOf("{"));
-    Schema actualSchema = new Schema.Parser().parse(actualSchemaStr);
+    HoodieSchema actualSchema = HoodieSchema.parse(actualSchemaStr);
 
-    Schema expectedSchema = new Schema.Parser().parse(schemaStr);
-    expectedSchema = HoodieAvroUtils.addMetadataFields(expectedSchema);
+    HoodieSchema expectedSchema = HoodieSchema.parse(schemaStr);
+    expectedSchema = HoodieSchemaUtils.addMetadataFields(expectedSchema);
     assertEquals(actualSchema, expectedSchema);
 
     File file = File.createTempFile("temp", null);
     result = shell.evaluate(() -> "fetch table schema --outputFilePath " + file.getAbsolutePath());
     assertTrue(ShellEvaluationResultUtil.isSuccess(result));
 
     actualSchemaStr = getFileContent(file.getAbsolutePath());
-    actualSchema = new Schema.Parser().parse(actualSchemaStr);
+    actualSchema = HoodieSchema.parse(actualSchemaStr);
     assertEquals(actualSchema, expectedSchema);
   }
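The change repeated across this PR is a one-for-one swap of Avro `Schema` call sites for the new `HoodieSchema` facade. A minimal before/after sketch, assuming `HoodieSchema.parse` accepts the same JSON schema string as Avro's `Schema.Parser` and that two parses of equal JSON compare equal, which is what the `assertEquals` above relies on:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.schema.HoodieSchema;

    final class SchemaParseMigration {
      private SchemaParseMigration() {}

      // Before: call sites parsed schema JSON with Avro directly.
      static Schema parseAvro(String schemaJson) {
        return new Schema.Parser().parse(schemaJson);
      }

      // After: HoodieSchema owns parsing, so call sites no longer touch
      // org.apache.avro.Schema at all.
      static HoodieSchema parseHoodie(String schemaJson) {
        return HoodieSchema.parse(schemaJson);
      }
    }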

==== File 2: org.apache.hudi.client (saveInternalSchema) ====

@@ -19,7 +19,6 @@
 
 package org.apache.hudi.client;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPlan;
@@ -46,6 +45,7 @@
 import org.apache.hudi.common.model.TableServiceType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
@@ -341,15 +341,15 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
     if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
       InternalSchema internalSchema;
-      Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
+      HoodieSchema schema = HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
       if (historySchemaStr.isEmpty()) {
-        internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(HoodieSchema.fromAvroSchema(avroSchema)));
+        internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(schema));
         internalSchema.setSchemaId(Long.parseLong(instantTime));
       } else {
         internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
             SerDeHelper.parseSchemas(historySchemaStr));
       }
-      InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
+      InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
       if (evolvedSchema.equals(internalSchema)) {
         metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
         //TODO save history schema by metaTable
@@ -361,7 +361,7 @@
         schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
       }
       // update SCHEMA_KEY
-      metadata.addMetadata(SCHEMA_KEY, InternalSchemaConverter.convert(evolvedSchema, avroSchema.getFullName()).toString());
+      metadata.addMetadata(SCHEMA_KEY, InternalSchemaConverter.convert(evolvedSchema, schema.getFullName()).toString());
     }
   }
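Not every utility migrates in this PR: `AvroSchemaEvolutionUtils.reconcileSchema` still takes an Avro `Schema`, so the wrapper bridges out through `toAvroSchema()` at that one call site. A sketch of that boundary pattern, using only the `toAvroSchema()`/`fromAvroSchema()` pair visible in the hunks above; `legacyAvroTransform` is a hypothetical stand-in for any Avro-only utility that has not been converted yet:

    import org.apache.avro.Schema;
    import org.apache.hudi.common.schema.HoodieSchema;

    final class SchemaBridge {
      private SchemaBridge() {}

      // Unwrap at the boundary, run the legacy Avro-only API, and wrap the
      // result back immediately so HoodieSchema stays the dominant type.
      static HoodieSchema throughLegacyApi(HoodieSchema schema) {
        Schema avro = schema.toAvroSchema();
        Schema transformed = legacyAvroTransform(avro); // hypothetical Avro-only utility
        return HoodieSchema.fromAvroSchema(transformed);
      }

      // Stand-in for an unmigrated helper such as AvroSchemaEvolutionUtils.
      private static Schema legacyAvroTransform(Schema s) {
        return s;
      }
    }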

==== File 3: ConcurrentSchemaEvolutionTableSchemaGetter ====

@@ -35,7 +35,6 @@
 import org.apache.hudi.util.Lazy;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -59,12 +58,12 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {
 
   protected final HoodieTableMetaClient metaClient;
 
-  private final Lazy<ConcurrentHashMap<HoodieInstant, Schema>> tableSchemaCache;
+  private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieSchema>> tableSchemaCache;
 
   private Option<HoodieInstant> latestCommitWithValidSchema = Option.empty();
 
   @VisibleForTesting
-  public ConcurrentHashMap<HoodieInstant, Schema> getTableSchemaCache() {
+  public ConcurrentHashMap<HoodieInstant, HoodieSchema> getTableSchemaCache() {
     return tableSchemaCache.get();
   }
 
@@ -89,13 +88,11 @@ private HoodieSchema handlePartitionColumnsIfNeeded(HoodieSchema schema) {
     return schema;
   }
 
-  public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) {
-    return getTableAvroSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline.
-        .map(HoodieSchema::fromAvroSchema)
+  public Option<HoodieSchema> getTableSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) {
+    return getTableSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline.
         .or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read create schema from table config.
         .map(tableSchema -> includeMetadataFields ? HoodieSchemaUtils.addMetadataFields(tableSchema, false) : HoodieSchemaUtils.removeMetadataFields(tableSchema))
-        .map(this::handlePartitionColumnsIfNeeded)
-        .map(HoodieSchema::toAvroSchema);
+        .map(this::handlePartitionColumnsIfNeeded);
   }
 
   private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
@@ -112,16 +109,16 @@ private Option<HoodieInstant> getCachedLatestCommitWithValidSchema() {
   }
 
   @VisibleForTesting
-  Option<Schema> getTableAvroSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) {
-    return getTableAvroSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(), instantTime);
+  Option<HoodieSchema> getTableSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) {
+    return getTableSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(), instantTime);
   }
 
   // [HUDI-9112] simplify the logic
-  Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) {
+  Option<HoodieSchema> getTableSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) {
     // If instantTime is empty it means read the latest one. In that case, get the cached instant if there is one.
     boolean fetchFromLastValidCommit = instantTime.isEmpty();
     Option<HoodieInstant> targetInstant = instantTime.or(getCachedLatestCommitWithValidSchema());
-    Schema cachedTableSchema = null;
+    HoodieSchema cachedTableSchema = null;
 
     // Try cache first if there is a target instant to fetch for.
     if (!targetInstant.isEmpty()) {
@@ -130,7 +127,7 @@ Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> rev
 
       // Cache miss on either latestCommitWithValidSchema or commitMetadataCache. Compute the result.
       if (cachedTableSchema == null) {
-        Option<Pair<HoodieInstant, Schema>> instantWithSchema = getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant);
+        Option<Pair<HoodieInstant, HoodieSchema>> instantWithSchema = getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant);
         if (instantWithSchema.isPresent()) {
           targetInstant = Option.of(instantWithSchema.get().getLeft());
           cachedTableSchema = instantWithSchema.get().getRight();
@@ -159,10 +156,10 @@
   }
 
   @VisibleForTesting
-  Option<Pair<HoodieInstant, Schema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) {
+  Option<Pair<HoodieInstant, HoodieSchema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) {
     // To find the table schema given an instant time, need to walk backwards from the latest instant in
     // the timeline finding a completed instant containing a valid schema.
-    ConcurrentHashMap<HoodieInstant, Schema> tableSchemaAtInstant = new ConcurrentHashMap<>();
+    ConcurrentHashMap<HoodieInstant, HoodieSchema> tableSchemaAtInstant = new ConcurrentHashMap<>();
     Option<HoodieInstant> instantWithTableSchema = Option.fromJavaOptional(reversedTimelineStream
         // If a completion time is specified, find the first eligible instant in the schema evolution timeline.
         // Should switch to completion time based.
@@ -179,7 +176,7 @@
           String schemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
           boolean isValidSchemaStr = !StringUtils.isNullOrEmpty(schemaStr);
           if (isValidSchemaStr) {
-            tableSchemaAtInstant.putIfAbsent(s, new Schema.Parser().parse(schemaStr));
+            tableSchemaAtInstant.putIfAbsent(s, HoodieSchema.parse(schemaStr));
           }
           return isValidSchemaStr;
         } catch (IOException e) {
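On the caller side the rename is mechanical, but the return type changes from `Option<Schema>` to `Option<HoodieSchema>`. A minimal lookup sketch, assuming same-package access (the class is declared package-private above) and relying on the documented behavior that an empty instant requests the latest commit carrying a valid schema:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.util.Option;

    final class TableSchemaLookup {
      private TableSchemaLookup() {}

      // Resolve the latest table schema without Hudi metadata fields. Empty when
      // no completed commit carries a schema and table config has no create schema.
      static Option<HoodieSchema> latestTableSchema(HoodieTableMetaClient metaClient) {
        ConcurrentSchemaEvolutionTableSchemaGetter getter =
            new ConcurrentSchemaEvolutionTableSchemaGetter(metaClient);
        return getter.getTableSchemaIfPresent(false, Option.empty());
      }
    }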

==== File 4: SchemaConflictResolutionStrategy (public interface) ====

@@ -21,15 +21,14 @@
 import org.apache.hudi.ApiMaturityLevel;
 import org.apache.hudi.PublicAPIClass;
 import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieSchemaEvolutionConflictException;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.Schema;
-
 /**
  * Strategy interface for schema conflict resolution with multiple writers.
  * Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
@@ -50,14 +49,14 @@ public interface SchemaConflictResolutionStrategy {
    * @throws HoodieWriteConflictException if schema conflicts cannot be resolved.
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  Option<Schema> resolveConcurrentSchemaEvolution(
+  Option<HoodieSchema> resolveConcurrentSchemaEvolution(
       HoodieTable table,
       HoodieWriteConfig config,
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
      Option<HoodieInstant> currTxnOwnerInstant);
 
   static void throwConcurrentSchemaEvolutionException(
-      Option<Schema> tableSchemaAtTxnStart, Option<Schema> tableSchemaAtTxnValidation, Schema writerSchemaOfTxn,
+      Option<HoodieSchema> tableSchemaAtTxnStart, Option<HoodieSchema> tableSchemaAtTxnValidation, HoodieSchema writerSchemaOfTxn,
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
       Option<HoodieInstant> currTxnOwnerInstant) throws HoodieWriteConflictException {
     String errMsg = String.format(
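Since `resolveConcurrentSchemaEvolution` is a `@PublicAPIMethod` at EVOLVING maturity, the new signature is source-breaking for third-party implementations, which must swap `Option<Schema>` for `Option<HoodieSchema>`. A minimal implementation compiled against the new signature; the always-accept policy is purely illustrative, not a recommended conflict-resolution strategy:

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.table.timeline.HoodieInstant;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.config.HoodieWriteConfig;
    import org.apache.hudi.table.HoodieTable;

    public class AlwaysAcceptWriterSchemaStrategy implements SchemaConflictResolutionStrategy {
      @Override
      public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
          HoodieTable table,
          HoodieWriteConfig config,
          Option<HoodieInstant> lastCompletedTxnOwnerInstant,
          Option<HoodieInstant> currTxnOwnerInstant) {
        // Accept the writer's schema unconditionally. A real strategy would
        // compare it against the table schema at txn start and at pre-commit
        // validation, as SimpleSchemaConflictResolutionStrategy does below.
        return Option.of(HoodieSchema.parse(config.getWriteSchema()));
      }
    }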

==== File 5: SimpleSchemaConflictResolutionStrategy ====

@@ -18,7 +18,8 @@
 
 package org.apache.hudi.client.transaction;
 
-import org.apache.hudi.avro.AvroSchemaComparatorForSchemaEvolution;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaComparatorForSchemaEvolution;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
@@ -28,11 +29,9 @@
 import org.apache.hudi.table.HoodieTable;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.stream.Stream;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.isSchemaNull;
 import static org.apache.hudi.client.transaction.SchemaConflictResolutionStrategy.throwConcurrentSchemaEvolutionException;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
 import static org.apache.hudi.common.table.timeline.InstantComparison.LESSER_THAN_OR_EQUALS;
@@ -46,7 +45,7 @@
 public class SimpleSchemaConflictResolutionStrategy implements SchemaConflictResolutionStrategy {
 
   @Override
-  public Option<Schema> resolveConcurrentSchemaEvolution(
+  public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
       HoodieTable table,
       HoodieWriteConfig config,
       Option<HoodieInstant> lastCompletedTxnOwnerInstant,
@@ -66,10 +65,10 @@ public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
       return Option.empty();
     }
 
-    Schema writerSchemaOfTxn = new Schema.Parser().parse(config.getWriteSchema());
+    HoodieSchema writerSchemaOfTxn = HoodieSchema.parse(config.getWriteSchema());
     // If a writer does not come with a meaningful schema, skip the schema resolution.
     ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver = new ConcurrentSchemaEvolutionTableSchemaGetter(table.getMetaClient());
-    if (isSchemaNull(writerSchemaOfTxn)) {
+    if (writerSchemaOfTxn.isSchemaNull()) {
      return getTableSchemaAtInstant(schemaResolver, currTxnOwnerInstant.get());
     }
 
@@ -98,14 +97,14 @@ public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
       return Option.of(writerSchemaOfTxn);
     }
 
-    Option<Schema> tableSchemaAtTxnValidation = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnValidation);
+    Option<HoodieSchema> tableSchemaAtTxnValidation = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnValidation);
     // If table schema is not defined, it's still case 1. There can be cases where there are commits but they didn't
     // write any data.
     if (!tableSchemaAtTxnValidation.isPresent()) {
       return Option.of(writerSchemaOfTxn);
     }
     // Case 2, 4, 7: Both writers try to evolve to the same schema or neither evolves schema.
-    boolean writerSchemaIsCurrentTableSchema = AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation.get());
+    boolean writerSchemaIsCurrentTableSchema = HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnValidation.get());
     if (writerSchemaIsCurrentTableSchema) {
       return Option.of(writerSchemaOfTxn);
     }
@@ -122,7 +121,7 @@ public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
       throwConcurrentSchemaEvolutionException(
           Option.empty(), tableSchemaAtTxnValidation, writerSchemaOfTxn, lastCompletedTxnOwnerInstant, currTxnOwnerInstant);
     }
-    Option<Schema> tableSchemaAtTxnStart = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnStart);
+    Option<HoodieSchema> tableSchemaAtTxnStart = getTableSchemaAtInstant(schemaResolver, lastCompletedInstantAtTxnStart);
     // If no table schema is defined, fall back to case 3.
     if (!tableSchemaAtTxnStart.isPresent()) {
       throwConcurrentSchemaEvolutionException(
@@ -132,13 +131,13 @@ public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
     // Case 5:
     // Table schema has not changed from the start of the transaction till the pre-commit validation
     // If table schema parsing failed we will blindly go with writer schema. use option.empty
-    if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(), tableSchemaAtTxnValidation.get())) {
+    if (HoodieSchemaComparatorForSchemaEvolution.schemaEquals(tableSchemaAtTxnStart.get(), tableSchemaAtTxnValidation.get())) {
       return Option.of(writerSchemaOfTxn);
     }
 
     // Case 6: Current txn does not evolve schema, the tableSchema we saw at validation phase
     // might be an evolved one, use it.
-    if (AvroSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart.get())) {
+    if (HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchemaOfTxn, tableSchemaAtTxnStart.get())) {
       return tableSchemaAtTxnValidation;
     }
 
@@ -164,9 +163,9 @@ private Option<HoodieInstant> getInstantInTimelineImmediatelyPriorToTimestamp(
         .findFirst());
   }
 
-  private static Option<Schema> getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver, HoodieInstant instant) {
+  private static Option<HoodieSchema> getTableSchemaAtInstant(ConcurrentSchemaEvolutionTableSchemaGetter schemaResolver, HoodieInstant instant) {
     try {
-      return schemaResolver.getTableAvroSchemaIfPresent(false, Option.of(instant));
+      return schemaResolver.getTableSchemaIfPresent(false, Option.of(instant));
     } catch (Exception ex) {
       log.error("Cannot get table schema for instant {}", instant);
       throw new HoodieException("Unable to get table schema", ex);
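Two smaller API moves ride along here: the static helper `HoodieAvroUtils.isSchemaNull` becomes an instance method on the wrapper, and the schema-evolution-aware comparator gets a `HoodieSchema` counterpart. A sketch isolating both checks as the strategy above uses them, assuming `isSchemaNull()` preserves the old helper's semantics (a writer advertising Avro's null type):

    import org.apache.hudi.common.schema.HoodieSchema;
    import org.apache.hudi.common.schema.HoodieSchemaComparatorForSchemaEvolution;

    final class WriterSchemaChecks {
      private WriterSchemaChecks() {}

      // Early-exit check: a writer with a null schema carries no schema intent,
      // so the strategy returns the current table schema instead of resolving.
      static boolean hasMeaningfulSchema(HoodieSchema writerSchema) {
        return !writerSchema.isSchemaNull();
      }

      // Cases 2, 4, 7 above: schema-evolution-aware equality between writer and
      // table schema, replacing AvroSchemaComparatorForSchemaEvolution.
      static boolean writerMatchesTable(HoodieSchema writerSchema, HoodieSchema tableSchema) {
        return HoodieSchemaComparatorForSchemaEvolution.schemaEquals(writerSchema, tableSchema);
      }
    }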

==== File 6: transaction utilities (resolveWriteConflictIfAny) ====

@@ -23,6 +23,7 @@
 import org.apache.hudi.client.transaction.SimpleSchemaConflictResolutionStrategy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -36,7 +37,6 @@
 import org.apache.hudi.table.HoodieTable;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.io.IOException;
 import java.util.Map;
@@ -78,7 +78,7 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
     Stream<HoodieInstant> completedInstantsDuringCurrentWriteOperation =
         getCompletedInstantsDuringCurrentWriteOperation(table.getMetaClient(), pendingInstants);
     ConflictResolutionStrategy resolutionStrategy = config.getWriteConflictResolutionStrategy();
-    Option<Schema> newTableSchema = resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant);
+    Option<HoodieSchema> newTableSchema = resolveSchemaConflictIfNeeded(table, config, lastCompletedTxnOwnerInstant, currentTxnOwnerInstant);
 
     Stream<HoodieInstant> instantStream = Stream.concat(resolutionStrategy.getCandidateInstants(
         table.getMetaClient(), currentTxnOwnerInstant.get(), lastCompletedTxnOwnerInstant),
@@ -117,7 +117,7 @@ public static Option<HoodieCommitMetadata> resolveWriteConflictIfAny(
    * @param currentTxnOwnerInstant current instant
    * @return new table schema after successful schema resolution; empty if nothing to be resolved.
    */
-  public static Option<Schema> resolveSchemaConflictIfNeeded(final HoodieTable table,
+  public static Option<HoodieSchema> resolveSchemaConflictIfNeeded(final HoodieTable table,
                                                              final HoodieWriteConfig config,
                                                              final Option<HoodieInstant> lastCompletedTxnOwnerInstant,
                                                              final Option<HoodieInstant> currentTxnOwnerInstant) {