@@ -31,6 +31,7 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.log.HoodieLogFormat;
@@ -217,13 +218,13 @@ storage, new StoragePath(logFilePathPattern)).stream()
checkArgument(logFilePaths.size() > 0, "There is no log file");

// TODO : readerSchema can change across blocks/log files, fix this inside Scanner
Schema readerSchema = null;
HoodieSchema readerSchema = null;
// get schema from last log file
for (int i = logFilePaths.size() - 1; i >= 0; i--) {
Schema schema = TableSchemaResolver.readSchemaFromLogFile(
storage, new StoragePath(logFilePaths.get(i)));
if (schema != null) {
readerSchema = schema;
readerSchema = HoodieSchema.fromAvroSchema(schema);
break;
}
}

@@ -367,8 +367,8 @@ private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
.withHoodieTableMetaClient(metaClient)
.withLatestCommitTime(instantTime.get())
.withFileSlice(fileSlice)
.withDataSchema(dataSchema.toAvroSchema())
.withRequestedSchema(dataSchema.toAvroSchema())
.withDataSchema(dataSchema)
.withRequestedSchema(dataSchema)
.withInternalSchema(internalSchemaOption)
.withProps(metaClient.getTableConfig().getProps())
.withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan())
@@ -555,7 +555,7 @@ public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdatesAndDeletio
readerContext.getMergeMode(),
false,
readerContext.getRecordMerger(),
writerSchema.toAvroSchema(),
writerSchema,
Option.ofNullable(Pair.of(hoodieTable.getMetaClient().getTableConfig().getPayloadClass(), hoodieTable.getConfig().getPayloadClass())),
properties,
hoodieTable.getMetaClient().getTableConfig().getPartialUpdateMode());

@@ -84,8 +84,8 @@ public void doAppend() {
config.getBasePath(), operation.getPartitionPath()), logFileName)));
// Initializes the record iterator, log compaction requires writing the deletes into the delete block of the resulting log file.
try (HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
.withLatestCommitTime(instantTime).withPartitionPath(partitionPath).withLogFiles(logFiles).withBaseFileOption(Option.empty()).withDataSchema(writeSchemaWithMetaFields.toAvroSchema())
.withRequestedSchema(writeSchemaWithMetaFields.toAvroSchema()).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
.withLatestCommitTime(instantTime).withPartitionPath(partitionPath).withLogFiles(logFiles).withBaseFileOption(Option.empty()).withDataSchema(writeSchemaWithMetaFields)
.withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props).withEmitDelete(true)
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
// instead of using config.enableOptimizedLogBlocksScan(), we set to true as log compaction blocks only supported in scanV2
.withEnableOptimizedLogBlockScan(true).build()) {

@@ -317,7 +317,7 @@ private HoodieFileGroupReader<T> getFileGroupReader(boolean usePosition, Option<
Option<Stream<HoodieLogFile>> logFileStreamOpt, Iterator<HoodieRecord<T>> incomingRecordsItr) {
HoodieFileGroupReader.Builder<T> fileGroupBuilder = HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
.withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge))
.withDataSchema(writeSchemaWithMetaFields.toAvroSchema()).withRequestedSchema(writeSchemaWithMetaFields.toAvroSchema())
.withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields)
.withInternalSchema(internalSchemaOption).withProps(props)
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
.withFileGroupUpdateCallback(createCallback());

@@ -18,8 +18,6 @@

package org.apache.hudi.metadata;

import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
@@ -53,6 +51,8 @@
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
@@ -89,7 +89,6 @@
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.util.Lazy;

import org.apache.avro.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -115,8 +114,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.hudi.avro.HoodieAvroUtils.getRecordKeySchema;
import static org.apache.hudi.common.config.HoodieMetadataConfig.DEFAULT_METADATA_POPULATE_META_FIELDS;
import static org.apache.hudi.common.schema.HoodieSchemaUtils.getRecordKeySchema;
import static org.apache.hudi.common.table.HoodieTableConfig.TIMELINE_HISTORY_PATH;
import static org.apache.hudi.common.table.timeline.HoodieInstant.State.REQUESTED;
import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
@@ -872,9 +871,9 @@ private static <T> HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(
final FileSlice fileSlice = partitionAndFileSlice.getValue();
final String fileId = fileSlice.getFileId();
HoodieReaderContext<T> readerContext = readerContextFactory.getContext();
Schema dataSchema = AvroSchemaCache.intern(HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField()));
Schema requestedSchema = metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema()
: HoodieAvroUtils.projectSchema(dataSchema, Arrays.asList(metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0])));
HoodieSchema dataSchema = HoodieSchemaCache.intern(HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(dataWriteConfig.getWriteSchema()), dataWriteConfig.allowOperationMetadataField()));
HoodieSchema requestedSchema = metaClient.getTableConfig().populateMetaFields() ? getRecordKeySchema()
: HoodieSchemaUtils.projectSchema(dataSchema, Arrays.asList(metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0])));
Comment on lines +874 to +876
Contributor:
To follow up in a separate PR: we should start adding more methods to HoodieSchema so we can avoid the nested utility calls that reduce readability (e.g., AUtil.method1(BUtil.method2(C.method3(x), y), z), which reads better as x.method(y, z)).

Contributor Author:
The idea was to have this be an easy replacement by copying over the methods to similarly named classes. I agree we can avoid these utils in the future though.
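
As an illustrative sketch of the suggestion above (the chained instance methods are an assumption; they do not exist on HoodieSchema in this PR), the nested utility calls from lines +874 to +876 compare to a hypothetical fluent form as follows:

    // Pattern used in this PR: static helpers mirrored from the Avro utilities
    // (HoodieAvroUtils -> HoodieSchemaUtils, AvroSchemaCache -> HoodieSchemaCache).
    HoodieSchema dataSchema = HoodieSchemaCache.intern(
        HoodieSchemaUtils.addMetadataFields(
            HoodieSchema.parse(dataWriteConfig.getWriteSchema()),
            dataWriteConfig.allowOperationMetadataField()));

    // Hypothetical fluent alternative; withMetadataFields() and an instance-level
    // intern() are assumed methods that would need to be added to HoodieSchema.
    HoodieSchema fluentSchema = HoodieSchema.parse(dataWriteConfig.getWriteSchema())
        .withMetadataFields(dataWriteConfig.allowOperationMetadataField())
        .intern();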

Option<InternalSchema> internalSchemaOption = SerDeHelper.fromJson(dataWriteConfig.getInternalSchema());
HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder()
.withReaderContext(readerContext)
@@ -890,7 +889,7 @@ private static <T> HoodieData<HoodieRecord> readRecordKeysFromFileSliceSnapshot(
.build();
String baseFileInstantTime = fileSlice.getBaseInstantTime();
return new CloseableMappingIterator<>(fileGroupReader.getClosableIterator(), record -> {
String recordKey = readerContext.getRecordContext().getRecordKey(record, HoodieSchema.fromAvroSchema(requestedSchema));
String recordKey = readerContext.getRecordContext().getRecordKey(record, requestedSchema);
return HoodieMetadataPayload.createRecordIndexUpdate(recordKey, partition, fileId,
baseFileInstantTime, 0);
});

@@ -18,7 +18,6 @@

package org.apache.hudi.metadata;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.data.HoodieData;
@@ -35,6 +34,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
@@ -288,14 +288,14 @@ private static <T> ClosableIterator<Pair<String, String>> createSecondaryIndexRe
TypedProperties props,
boolean allowInflightInstants) throws IOException {
String secondaryKeyField = indexDefinition.getSourceFieldsKey();
Schema requestedSchema = getRequestedSchemaForSecondaryIndex(metaClient, tableSchema, secondaryKeyField);
HoodieSchema requestedSchema = getRequestedSchemaForSecondaryIndex(metaClient, tableSchema, secondaryKeyField);
HoodieFileGroupReader<T> fileGroupReader = HoodieFileGroupReader.<T>newBuilder()
.withReaderContext(readerContext)
.withFileSlice(fileSlice)
.withHoodieTableMetaClient(metaClient)
.withProps(props)
.withLatestCommitTime(instantTime)
.withDataSchema(tableSchema.toAvroSchema())
.withDataSchema(tableSchema)
.withRequestedSchema(requestedSchema)
.withAllowInflightInstants(allowInflightInstants)
.build();
@@ -318,9 +318,9 @@ public boolean hasNext() {

while (recordIterator.hasNext()) {
T record = recordIterator.next();
Object secondaryKey = readerContext.getRecordContext().getValue(record, HoodieSchema.fromAvroSchema(requestedSchema), secondaryKeyField);
Object secondaryKey = readerContext.getRecordContext().getValue(record, requestedSchema, secondaryKeyField);
nextValidRecord = Pair.of(
readerContext.getRecordContext().getRecordKey(record, HoodieSchema.fromAvroSchema(requestedSchema)),
readerContext.getRecordContext().getRecordKey(record, requestedSchema),
secondaryKey == null ? null : secondaryKey.toString()
);
return true;
@@ -342,15 +342,15 @@ public Pair<String, String> next() {
};
}

private static Schema getRequestedSchemaForSecondaryIndex(HoodieTableMetaClient metaClient, HoodieSchema tableSchema, String secondaryKeyField) {
private static HoodieSchema getRequestedSchemaForSecondaryIndex(HoodieTableMetaClient metaClient, HoodieSchema tableSchema, String secondaryKeyField) {
String[] recordKeyFields;
if (tableSchema.getField(RECORD_KEY_METADATA_FIELD) != null) {
if (tableSchema.getField(RECORD_KEY_METADATA_FIELD).isPresent()) {
recordKeyFields = new String[] {RECORD_KEY_METADATA_FIELD};
} else {
recordKeyFields = metaClient.getTableConfig().getRecordKeyFields().orElse(new String[0]);
}
String[] projectionFields = Arrays.copyOf(recordKeyFields, recordKeyFields.length + 1);
projectionFields[recordKeyFields.length] = secondaryKeyField;
return HoodieAvroUtils.projectSchema(tableSchema.toAvroSchema(), Arrays.asList(projectionFields));
return HoodieSchemaUtils.projectSchema(tableSchema, Arrays.asList(projectionFields));
}
}

@@ -18,7 +18,6 @@

package org.apache.hudi.table.action.cluster.strategy;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -31,6 +30,8 @@
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.HoodieFileGroupReader;
import org.apache.hudi.common.util.Option;
@@ -67,14 +68,14 @@ public abstract class ClusteringExecutionStrategy<T, I, K, O> implements Seriali
private final transient HoodieEngineContext engineContext;
protected HoodieWriteConfig writeConfig;
protected final HoodieRecordType recordType;
protected final Schema readerSchemaWithMetaFields;
protected final HoodieSchema readerSchemaWithMetaFields;

public ClusteringExecutionStrategy(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig writeConfig) {
this.writeConfig = writeConfig;
this.hoodieTable = table;
this.engineContext = engineContext;
this.recordType = table.getConfig().getRecordMerger().getRecordType();
this.readerSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(writeConfig.getSchema()));
this.readerSchemaWithMetaFields = HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(writeConfig.getSchema()));
}

/**
@@ -146,7 +147,7 @@ protected FileSlice clusteringOperationToFileSlice(String basePath, ClusteringOp
return fileSlice;
}

protected static <R> HoodieFileGroupReader<R> getFileGroupReader(HoodieTableMetaClient metaClient, FileSlice fileSlice, Schema readerSchema, Option<InternalSchema> internalSchemaOption,
protected static <R> HoodieFileGroupReader<R> getFileGroupReader(HoodieTableMetaClient metaClient, FileSlice fileSlice, HoodieSchema readerSchema, Option<InternalSchema> internalSchemaOption,
ReaderContextFactory<R> readerContextFactory, String instantTime,
TypedProperties properties, boolean usePosition, boolean enableLogBlocksScan) {
HoodieReaderContext<R> readerContext = readerContextFactory.getContext();

@@ -18,7 +18,6 @@

package org.apache.hudi.table.action.commit;

import org.apache.hudi.avro.AvroSchemaCache;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.engine.HoodieReaderContext;
@@ -28,6 +27,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecordMerger;
@@ -44,8 +44,6 @@
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;

import org.apache.avro.Schema;

import java.io.IOException;
import java.util.List;

@@ -109,13 +107,13 @@ public I deduplicateRecords(I records, HoodieTable<T, I, K, O> table, int parall
HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
readerContext.initRecordMergerForIngestion(table.getConfig().getProps());
List<String> orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), table.getMetaClient());
Schema recordSchema;
HoodieSchema recordSchema;
if (StringUtils.nonEmpty(table.getConfig().getPartialUpdateSchema())) {
recordSchema = new Schema.Parser().parse(table.getConfig().getPartialUpdateSchema());
recordSchema = HoodieSchema.parse(table.getConfig().getPartialUpdateSchema());
} else {
recordSchema = new Schema.Parser().parse(table.getConfig().getWriteSchema());
recordSchema = HoodieSchema.parse(table.getConfig().getWriteSchema());
}
recordSchema = AvroSchemaCache.intern(recordSchema);
recordSchema = HoodieSchemaCache.intern(recordSchema);
TypedProperties mergedProperties = readerContext.getMergeProps(table.getConfig().getProps());
BufferedRecordMerger<T> bufferedRecordMerger = BufferedRecordMergerFactory.create(
readerContext,

@@ -46,6 +46,7 @@
import org.apache.hudi.common.model.IOType;
import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
@@ -94,7 +95,6 @@
import org.apache.hudi.table.upgrade.UpgradeDowngrade;
import org.apache.hudi.testutils.MetadataMergeWriteStatus;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.jetbrains.annotations.NotNull;
@@ -573,7 +573,7 @@ protected List<HoodieRecord<IndexedRecord>> dedupForCopyOnWriteStorage(HoodieWri
false,
Option.ofNullable(writeClient.getConfig().getRecordMerger()),
Option.ofNullable(writeClient.getConfig().getPayloadClass()),
new Schema.Parser().parse(writeClient.getConfig().getSchema()),
HoodieSchema.parse(writeClient.getConfig().getSchema()),
writeClient.getConfig().getProps(),
metaClient.getTableConfig().getPartialUpdateMode());
HoodieData<HoodieRecord<IndexedRecord>> dedupedRecsRdd =

@@ -203,7 +203,7 @@ public UnaryOperator<RowData> projectRecord(HoodieSchema from, HoodieSchema to,
return rowProjection::project;
}

public void initOrderingValueConverter(Schema dataSchema, List<String> orderingFieldNames) {
public void initOrderingValueConverter(HoodieSchema dataSchema, List<String> orderingFieldNames) {
this.orderingValueConverter = OrderingValueEngineTypeConverter.create(dataSchema, orderingFieldNames, utcTimezone);
}
}