@@ -18,7 +18,6 @@

package org.apache.hudi.cli.commands;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.cli.HoodieCLI;
import org.apache.hudi.cli.functional.CLIFunctionalTestHarness;
import org.apache.hudi.cli.testutils.HoodieTestCommitMetadataGenerator;
@@ -27,6 +26,8 @@
import org.apache.hudi.common.fs.ConsistencyGuardConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -36,7 +37,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.storage.StoragePath;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.BeforeEach;
@@ -259,18 +259,18 @@ public void testFetchTableSchema() throws Exception {
assertTrue(ShellEvaluationResultUtil.isSuccess(result));

String actualSchemaStr = result.toString().substring(result.toString().indexOf("{"));
Schema actualSchema = new Schema.Parser().parse(actualSchemaStr);
HoodieSchema actualSchema = HoodieSchema.parse(actualSchemaStr);

Schema expectedSchema = new Schema.Parser().parse(schemaStr);
expectedSchema = HoodieAvroUtils.addMetadataFields(expectedSchema);
HoodieSchema expectedSchema = HoodieSchema.parse(schemaStr);
expectedSchema = HoodieSchemaUtils.addMetadataFields(expectedSchema);
assertEquals(actualSchema, expectedSchema);

File file = File.createTempFile("temp", null);
result = shell.evaluate(() -> "fetch table schema --outputFilePath " + file.getAbsolutePath());
assertTrue(ShellEvaluationResultUtil.isSuccess(result));

actualSchemaStr = getFileContent(file.getAbsolutePath());
actualSchema = new Schema.Parser().parse(actualSchemaStr);
actualSchema = HoodieSchema.parse(actualSchemaStr);
assertEquals(actualSchema, expectedSchema);
}

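The test above now routes schema parsing and metadata-field injection through the engine-neutral facade instead of Avro's parser. A minimal, self-contained sketch of that before-and-after pattern, using only the HoodieSchema.parse and HoodieSchemaUtils.addMetadataFields calls visible in this diff; the sample schema JSON is invented for illustration:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;

public class SchemaParseSketch {
  public static void main(String[] args) {
    // Hypothetical record schema, used only to show the call pattern.
    String schemaStr = "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"string\"}]}";

    // Before: Schema schema = new Schema.Parser().parse(schemaStr);
    //         schema = HoodieAvroUtils.addMetadataFields(schema);
    HoodieSchema schema = HoodieSchema.parse(schemaStr);
    HoodieSchema withMetaFields = HoodieSchemaUtils.addMetadataFields(schema);
    System.out.println(withMetaFields);
  }
}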
@@ -104,7 +104,7 @@ public void init() throws Exception {
"", HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

HoodieSparkWriteableTestTable cowTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema.toAvroSchema());
HoodieSparkWriteableTestTable cowTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

cowTable.addCommit("20160401010101")
.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
@@ -127,7 +127,7 @@ public void init() throws Exception {
morTablePath, "mor_table", HoodieTableType.MERGE_ON_READ.name(),
"", HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");
HoodieSparkWriteableTestTable morTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema.toAvroSchema());
HoodieSparkWriteableTestTable morTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

morTable.addDeltaCommit("20160401010101");
morTable.withInserts(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, "1", hoodieRecords1)
@@ -151,7 +151,7 @@ public void init() throws Exception {
"", HoodieTableVersion.current().versionCode(),
"org.apache.hudi.common.model.HoodieAvroPayload");

HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema.toAvroSchema());
HoodieSparkWriteableTestTable cowNonPartitionedTable = HoodieSparkWriteableTestTable.of(HoodieCLI.getTableMetaClient(), schema);

cowNonPartitionedTable.addCommit("20160401010101")
.withInserts(HoodieTestDataGenerator.NO_PARTITION_PATH, "1", hoodieRecords1)
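All three test tables above change only their factory argument: HoodieSparkWriteableTestTable.of now accepts the HoodieSchema wrapper directly, so the schema.toAvroSchema() bridge call disappears. A sketch of that call-site change, assuming a meta client and schema are already in hand; the helper name is made up and the test-table import path is assumed:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.testutils.HoodieSparkWriteableTestTable;

class TestTableSketch {
  // Illustrative helper showing only the new factory signature.
  static HoodieSparkWriteableTestTable newTestTable(HoodieTableMetaClient metaClient,
                                                    HoodieSchema schema) throws Exception {
    // Before: HoodieSparkWriteableTestTable.of(metaClient, schema.toAvroSchema());
    return HoodieSparkWriteableTestTable.of(metaClient, schema);
  }
}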
@@ -19,7 +19,6 @@
package org.apache.hudi.client;

import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPlan;
@@ -46,6 +45,7 @@
import org.apache.hudi.common.model.TableServiceType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -341,15 +341,15 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(table.getMetaClient());
if (!historySchemaStr.isEmpty() || Boolean.parseBoolean(config.getString(HoodieCommonConfig.RECONCILE_SCHEMA.key()))) {
InternalSchema internalSchema;
Schema avroSchema = HoodieAvroUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
HoodieSchema schema = HoodieSchemaUtils.createHoodieWriteSchema(config.getSchema(), config.allowOperationMetadataField());
if (historySchemaStr.isEmpty()) {
internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(HoodieSchema.fromAvroSchema(avroSchema)));
internalSchema = SerDeHelper.fromJson(config.getInternalSchema()).orElseGet(() -> InternalSchemaConverter.convert(schema));
internalSchema.setSchemaId(Long.parseLong(instantTime));
} else {
internalSchema = InternalSchemaUtils.searchSchema(Long.parseLong(instantTime),
SerDeHelper.parseSchemas(historySchemaStr));
}
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(avroSchema, internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
InternalSchema evolvedSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema.toAvroSchema(), internalSchema, config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS));
if (evolvedSchema.equals(internalSchema)) {
metadata.addMetadata(SerDeHelper.LATEST_SCHEMA, SerDeHelper.toJson(evolvedSchema));
//TODO save history schema by metaTable
@@ -361,7 +361,7 @@ private void saveInternalSchema(HoodieTable table, String instantTime, HoodieCom
schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(evolvedSchema, historySchemaStr));
}
// update SCHEMA_KEY
metadata.addMetadata(SCHEMA_KEY, InternalSchemaConverter.convert(evolvedSchema, avroSchema.getFullName()).toString());
metadata.addMetadata(SCHEMA_KEY, InternalSchemaConverter.convert(evolvedSchema, schema.getFullName()).toString());
}
}

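The net effect of the saveInternalSchema change is that the write schema is built and carried as a HoodieSchema, and is unwrapped to Avro only at the remaining Avro-facing boundary (AvroSchemaEvolutionUtils.reconcileSchema). A small sketch of the wrapper side of that flow, using only the createHoodieWriteSchema, getFullName, and toAvroSchema calls shown above; the schema JSON and the boolean flag are stand-ins for the config lookups:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;

public class WriteSchemaSketch {
  public static void main(String[] args) {
    // Stand-in for config.getSchema(); invented for illustration.
    String schemaJson = "{\"type\":\"record\",\"name\":\"demo_table\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"string\"}]}";

    // Second argument mirrors config.allowOperationMetadataField() in the hunk above.
    HoodieSchema writeSchema = HoodieSchemaUtils.createHoodieWriteSchema(schemaJson, false);

    // The record name keys the InternalSchema conversion; the Avro form is only
    // produced where an Avro-typed API still requires it.
    System.out.println(writeSchema.getFullName());
    System.out.println(writeSchema.toAvroSchema());
  }
}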
@@ -18,14 +18,13 @@

package org.apache.hudi.client.bootstrap;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieFileStatus;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaType;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;

import org.apache.avro.Schema;

import java.util.List;

/**
@@ -45,11 +44,11 @@ public HoodieBootstrapSchemaProvider(HoodieWriteConfig writeConfig) {
* @param partitions List of partitions with files within them
* @return Avro Schema
*/
public final Schema getBootstrapSchema(HoodieEngineContext context, List<Pair<String, List<HoodieFileStatus>>> partitions) {
public final HoodieSchema getBootstrapSchema(HoodieEngineContext context, List<Pair<String, List<HoodieFileStatus>>> partitions) {
if (writeConfig.getSchema() != null) {
// Use schema specified by user if set
Schema userSchema = new Schema.Parser().parse(writeConfig.getSchema());
if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) {
HoodieSchema userSchema = HoodieSchema.parse(writeConfig.getSchema());
if (!HoodieSchema.create(HoodieSchemaType.NULL).equals(userSchema)) {
return userSchema;
}
}
@@ -63,7 +62,7 @@ public final Schema getBootstrapSchema(HoodieEngineContext context, List<Pair<St
* @param partitions List of partitions with files within them
* @return Avro Schema
*/
protected abstract Schema getBootstrapSourceSchema(HoodieEngineContext context,
protected abstract HoodieSchema getBootstrapSourceSchema(HoodieEngineContext context,
List<Pair<String, List<HoodieFileStatus>>> partitions);

}
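The bootstrap provider keeps its "did the user configure a real schema?" guard, but the sentinel is now the HoodieSchema NULL type rather than HoodieAvroUtils.getNullSchema(). A compact sketch of that comparison, assuming the wrapper's equals mirrors Avro's; the parsed schema is invented for illustration:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaType;

public class NullSchemaCheckSketch {
  public static void main(String[] args) {
    // Stand-in for writeConfig.getSchema().
    String configured = "{\"type\":\"record\",\"name\":\"Demo\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"string\"}]}";

    HoodieSchema userSchema = HoodieSchema.parse(configured);
    // Before: !HoodieAvroUtils.getNullSchema().equals(userSchema)
    boolean schemaProvided = !HoodieSchema.create(HoodieSchemaType.NULL).equals(userSchema);
    System.out.println("user schema provided: " + schemaProvided);
  }
}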
@@ -35,7 +35,6 @@
import org.apache.hudi.util.Lazy;

import lombok.extern.slf4j.Slf4j;
import org.apache.avro.Schema;

import java.io.IOException;
import java.util.Arrays;
@@ -59,12 +58,12 @@ class ConcurrentSchemaEvolutionTableSchemaGetter {

protected final HoodieTableMetaClient metaClient;

private final Lazy<ConcurrentHashMap<HoodieInstant, Schema>> tableSchemaCache;
private final Lazy<ConcurrentHashMap<HoodieInstant, HoodieSchema>> tableSchemaCache;

private Option<HoodieInstant> latestCommitWithValidSchema = Option.empty();

@VisibleForTesting
public ConcurrentHashMap<HoodieInstant, Schema> getTableSchemaCache() {
public ConcurrentHashMap<HoodieInstant, HoodieSchema> getTableSchemaCache() {
return tableSchemaCache.get();
}

@@ -89,13 +88,11 @@ private HoodieSchema handlePartitionColumnsIfNeeded(HoodieSchema schema) {
return schema;
}

public Option<Schema> getTableAvroSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) {
return getTableAvroSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline.
.map(HoodieSchema::fromAvroSchema)
public Option<HoodieSchema> getTableSchemaIfPresent(boolean includeMetadataFields, Option<HoodieInstant> instant) {
return getTableSchemaFromTimelineWithCache(instant) // Get table schema from schema evolution timeline.
.or(this::getTableCreateSchemaWithoutMetaField) // Fall back: read create schema from table config.
.map(tableSchema -> includeMetadataFields ? HoodieSchemaUtils.addMetadataFields(tableSchema, false) : HoodieSchemaUtils.removeMetadataFields(tableSchema))
.map(this::handlePartitionColumnsIfNeeded)
.map(HoodieSchema::toAvroSchema);
.map(this::handlePartitionColumnsIfNeeded);
}

private Option<HoodieSchema> getTableCreateSchemaWithoutMetaField() {
@@ -112,16 +109,16 @@ private Option<HoodieInstant> getCachedLatestCommitWithValidSchema() {
}

@VisibleForTesting
Option<Schema> getTableAvroSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) {
return getTableAvroSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(), instantTime);
Option<HoodieSchema> getTableSchemaFromTimelineWithCache(Option<HoodieInstant> instantTime) {
return getTableSchemaFromTimelineWithCache(computeSchemaEvolutionTimelineInReverseOrder(), instantTime);
}

// [HUDI-9112] simplify the logic
Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) {
Option<HoodieSchema> getTableSchemaFromTimelineWithCache(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instantTime) {
// If instantTime is empty it means read the latest one. In that case, get the cached instant if there is one.
boolean fetchFromLastValidCommit = instantTime.isEmpty();
Option<HoodieInstant> targetInstant = instantTime.or(getCachedLatestCommitWithValidSchema());
Schema cachedTableSchema = null;
HoodieSchema cachedTableSchema = null;

// Try cache first if there is a target instant to fetch for.
if (!targetInstant.isEmpty()) {
@@ -130,7 +127,7 @@ Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> rev

// Cache miss on either latestCommitWithValidSchema or commitMetadataCache. Compute the result.
if (cachedTableSchema == null) {
Option<Pair<HoodieInstant, Schema>> instantWithSchema = getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant);
Option<Pair<HoodieInstant, HoodieSchema>> instantWithSchema = getLastCommitMetadataWithValidSchemaFromTimeline(reversedTimelineStream, targetInstant);
if (instantWithSchema.isPresent()) {
targetInstant = Option.of(instantWithSchema.get().getLeft());
cachedTableSchema = instantWithSchema.get().getRight();
@@ -159,10 +156,10 @@ Option<Schema> getTableAvroSchemaFromTimelineWithCache(Stream<HoodieInstant> rev
}

@VisibleForTesting
Option<Pair<HoodieInstant, Schema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) {
Option<Pair<HoodieInstant, HoodieSchema>> getLastCommitMetadataWithValidSchemaFromTimeline(Stream<HoodieInstant> reversedTimelineStream, Option<HoodieInstant> instant) {
// To find the table schema given an instant time, need to walk backwards from the latest instant in
// the timeline finding a completed instant containing a valid schema.
ConcurrentHashMap<HoodieInstant, Schema> tableSchemaAtInstant = new ConcurrentHashMap<>();
ConcurrentHashMap<HoodieInstant, HoodieSchema> tableSchemaAtInstant = new ConcurrentHashMap<>();
Option<HoodieInstant> instantWithTableSchema = Option.fromJavaOptional(reversedTimelineStream
// If a completion time is specified, find the first eligible instant in the schema evolution timeline.
// Should switch to completion time based.
@@ -179,7 +176,7 @@ Option<Pair<HoodieInstant, Schema>> getLastCommitMetadataWithValidSchemaFromTime
String schemaStr = metadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY);
boolean isValidSchemaStr = !StringUtils.isNullOrEmpty(schemaStr);
if (isValidSchemaStr) {
tableSchemaAtInstant.putIfAbsent(s, new Schema.Parser().parse(schemaStr));
tableSchemaAtInstant.putIfAbsent(s, HoodieSchema.parse(schemaStr));
}
return isValidSchemaStr;
} catch (IOException e) {
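After the renames, callers of this getter receive Option<HoodieSchema> and unwrap to Avro only at legacy edges. A sketch of such a call site; since the getter class is package-private, a real helper like this would have to live in the same package, and the method is otherwise used exactly as declared above:

import org.apache.avro.Schema;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;

class TableSchemaLookupSketch {
  // Illustrative helper: latest table schema with metadata fields, unwrapped for an Avro caller.
  static Option<Schema> latestAvroSchema(ConcurrentSchemaEvolutionTableSchemaGetter getter) {
    Option<HoodieSchema> tableSchema =
        getter.getTableSchemaIfPresent(true, Option.empty()); // empty instant = latest
    return tableSchema.map(HoodieSchema::toAvroSchema);
  }
}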
@@ -21,15 +21,14 @@
import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.PublicAPIMethod;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieSchemaEvolutionConflictException;
import org.apache.hudi.exception.HoodieWriteConflictException;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;

/**
* Strategy interface for schema conflict resolution with multiple writers.
* Users can provide pluggable implementations for different kinds of strategies to resolve conflicts when multiple
@@ -50,14 +49,14 @@ public interface SchemaConflictResolutionStrategy {
* @throws HoodieWriteConflictException if schema conflicts cannot be resolved.
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
Option<Schema> resolveConcurrentSchemaEvolution(
Option<HoodieSchema> resolveConcurrentSchemaEvolution(
HoodieTable table,
HoodieWriteConfig config,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
Option<HoodieInstant> currTxnOwnerInstant);

static void throwConcurrentSchemaEvolutionException(
Option<Schema> tableSchemaAtTxnStart, Option<Schema> tableSchemaAtTxnValidation, Schema writerSchemaOfTxn,
Option<HoodieSchema> tableSchemaAtTxnStart, Option<HoodieSchema> tableSchemaAtTxnValidation, HoodieSchema writerSchemaOfTxn,
Option<HoodieInstant> lastCompletedTxnOwnerInstant,
Option<HoodieInstant> currTxnOwnerInstant) throws HoodieWriteConflictException {
String errMsg = String.format(
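With the interface now expressed in HoodieSchema, custom strategies implement the same method against the wrapper type. A skeletal, purely illustrative implementation against the updated signature; the class name and the always-accept behaviour are made up, and it would need to live where the interface's package is visible:

import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;

public class AlwaysAcceptSchemaConflictResolutionStrategy implements SchemaConflictResolutionStrategy {
  @Override
  public Option<HoodieSchema> resolveConcurrentSchemaEvolution(
      HoodieTable table,
      HoodieWriteConfig config,
      Option<HoodieInstant> lastCompletedTxnOwnerInstant,
      Option<HoodieInstant> currTxnOwnerInstant) {
    // Never flags a conflict. A real strategy would compare the table schema at
    // transaction start and at validation with the writer schema, and call
    // throwConcurrentSchemaEvolutionException when they diverge incompatibly.
    return Option.empty();
  }
}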