@@ -29,6 +29,7 @@
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -116,7 +117,7 @@ public String showArchivedCommits(
 for (StoragePathInfo pathInfo : pathInfoList) {
 // read the archived file
 try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()),
-HoodieArchivedMetaEntry.getClassSchema())) {
+HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
 List<IndexedRecord> readRecords = new ArrayList<>();
 // read the avro blocks
 while (reader.hasNext()) {
@@ -190,7 +191,7 @@ public String showCommits(
 for (StoragePathInfo pathInfo : pathInfoList) {
 // read the archived file
 try (HoodieLogFormat.Reader reader = HoodieLogFormat.newReader(HoodieStorageUtils.getStorage(basePath, HoodieCLI.conf),
-new HoodieLogFile(pathInfo.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+new HoodieLogFile(pathInfo.getPath()), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {
 List<IndexedRecord> readRecords = new ArrayList<>();
 // read the avro blocks
 while (reader.hasNext()) {
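The hunks above, and the matching ones in the files that follow, all make the same call-site change: the raw Avro `Schema` that used to be handed to `HoodieLogFormat.newReader` is now wrapped in the new `HoodieSchema` type first. A minimal sketch of the pattern, assuming only the `HoodieSchema.fromAvroSchema` factory shown in this diff; the class and method names below are illustrative, and the storage/generated-model import locations are inferred from the surrounding code rather than confirmed by it:

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;

class ArchivedLogReadSketch {
  // Before: newReader(storage, logFile, HoodieArchivedMetaEntry.getClassSchema())
  // After:  the Avro schema is wrapped in HoodieSchema at the call site.
  static void readArchivedFile(HoodieStorage storage, StoragePath path) throws IOException {
    Schema avroSchema = HoodieArchivedMetaEntry.getClassSchema();        // Avro-generated schema, unchanged
    HoodieSchema readerSchema = HoodieSchema.fromAvroSchema(avroSchema); // wrapper introduced by this change
    try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), readerSchema)) {
      while (reader.hasNext()) {
        reader.next(); // consume archived log blocks exactly as before
      }
    }
  }
}
```

The same wrapping appears wherever a reader schema comes from `HoodieArchivedMetaEntry.getClassSchema()` or `TableSchemaResolver.readSchemaFromLogFile(...)` in the files below.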
==== next file ====

@@ -27,6 +27,7 @@
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Reader;
@@ -130,7 +131,7 @@ private int copyArchivedInstants(List<StoragePathInfo> pathInfoList,

 for (StoragePathInfo pathInfo : pathInfoList) {
 // read the archived file
-try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()), HoodieArchivedMetaEntry.getClassSchema())) {
+try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(pathInfo.getPath()), HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()))) {

 // read the avro blocks
 while (reader.hasNext() && copyCount++ < limit) {
==== next file ====

@@ -116,7 +116,7 @@ storage, new StoragePath(logFilePathPattern)).stream()
 } else {
 fileName = path.getName();
 }
-Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
+HoodieSchema writerSchema = HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, path));
 try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), writerSchema)) {

 // read the avro blocks
@@ -265,7 +265,7 @@ storage, new StoragePath(logFilePathPattern)).stream()
 Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
 client.getStorage(), new StoragePath(logFile));
 try (HoodieLogFormat.Reader reader =
-HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) {
+HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), HoodieSchema.fromAvroSchema(writerSchema))) {
 // read the avro blocks
 while (reader.hasNext()) {
 HoodieLogBlock n = reader.next();
==== next file ====

@@ -238,7 +238,7 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
 .withStorage(storage)
 .withBasePath(tablePath)
 .withLogFilePaths(logFilePaths)
-.withReaderSchema(schema.toAvroSchema())
+.withReaderSchema(schema)
 .withLatestInstantTime(INSTANT_TIME)
 .withMaxMemorySizeInBytes(
 HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
==== next file ====

@@ -23,6 +23,7 @@
 import org.apache.hudi.client.timeline.ActiveActionWithDetails;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
@@ -268,7 +269,7 @@ public boolean hasNext() {
 reader = HoodieLogFormat.newReader(
 metaClient.getStorage(),
 new HoodieLogFile(pathInfo.getPath()),
-HoodieArchivedMetaEntry.getClassSchema());
+HoodieSchema.fromAvroSchema(HoodieArchivedMetaEntry.getClassSchema()));
 } catch (IOException ioe) {
 throw new HoodieIOException(
 "Error initializing the reader for archived log: " + pathInfo.getPath(), ioe);
==== next file ====

@@ -902,7 +902,7 @@ private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> log
 }

 try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
 while (logFileReader.hasNext()) {
 HoodieLogBlock logBlock = logFileReader.next();
 if (logBlock instanceof HoodieDataBlock) {
@@ -2886,7 +2886,7 @@ private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) thro
 }

 try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
 while (logFileReader.hasNext()) {
 HoodieLogBlock logBlock = logFileReader.next();
 if (logBlock instanceof HoodieDataBlock) {
==== next file ====

@@ -179,12 +179,12 @@ public abstract HoodieData<WriteStatus> performClusteringWithRecordsRDD(final Ho
 final Map<String, String> extraMetadata);

 protected BulkInsertPartitioner<Dataset<Row>> getRowPartitioner(Map<String, String> strategyParams,
-Schema schema) {
+HoodieSchema schema) {
 return getPartitioner(strategyParams, schema, true);
 }

 protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<String, String> strategyParams,
-Schema schema) {
+HoodieSchema schema) {
 return getPartitioner(strategyParams, schema, false);
 }

@@ -195,7 +195,7 @@ protected BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> getRDDPartitioner(Map<
 * @param schema Schema of the data including metadata fields.
 */
 private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategyParams,
-Schema schema,
+HoodieSchema schema,
 boolean isRowPartitioner) {
 Option<String[]> orderByColumnsOpt =
 Option.ofNullable(strategyParams.get(PLAN_STRATEGY_SORT_COLUMNS.key()))
@@ -209,11 +209,11 @@ private <I> BulkInsertPartitioner<I> getPartitioner(Map<String, String> strategy
 return isRowPartitioner
 ? new RowSpatialCurveSortPartitioner(getWriteConfig())
 : new RDDSpatialCurveSortPartitioner((HoodieSparkEngineContext) getEngineContext(), orderByColumns, layoutOptStrategy,
-getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieAvroUtils.addMetadataFields(schema), recordType);
+getWriteConfig().getLayoutOptimizationCurveBuildMethod(), HoodieSchemaUtils.addMetadataFields(schema), recordType);
 case LINEAR:
 return isRowPartitioner
 ? new RowCustomColumnsSortPartitioner(orderByColumns, getWriteConfig())
-: new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieAvroUtils.addMetadataFields(schema), getWriteConfig());
+: new RDDCustomColumnsSortPartitioner(orderByColumns, HoodieSchemaUtils.addMetadataFields(schema), getWriteConfig());
 default:
 throw new UnsupportedOperationException(String.format("Layout optimization strategy '%s' is not supported", layoutOptStrategy));
 }
==== next file ====

@@ -79,7 +79,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> in
 // Since clustering will write to single file group using HoodieUnboundedCreateHandle, set max file size to a large value.
 newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));

-BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema.toAvroSchema());
+BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
 Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);

 return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
@@ -107,6 +107,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(HoodieData<Hoodie
 newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(Long.MAX_VALUE));

 return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(), newConfig,
-false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
+false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups,
+new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), shouldPreserveHoodieMetadata));
 }
 }
==== next file ====

@@ -71,7 +71,7 @@ public HoodieData<WriteStatus> performClusteringWithRecordsAsRow(Dataset<Row> in

 newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));

-BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema.toAvroSchema());
+BulkInsertPartitioner<Dataset<Row>> partitioner = getRowPartitioner(strategyParams, schema);
 Dataset<Row> repartitionedRecords = partitioner.repartitionRecords(inputRecords, numOutputGroups);

 return HoodieDatasetBulkInsertHelper.bulkInsert(repartitionedRecords, instantTime, getHoodieTable(), newConfig,
@@ -96,6 +96,6 @@ public HoodieData<WriteStatus> performClusteringWithRecordsRDD(final HoodieData<
 newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));

 return (HoodieData<WriteStatus>) SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, getHoodieTable(),
-newConfig, false, getRDDPartitioner(strategyParams, schema), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
+newConfig, false, getRDDPartitioner(strategyParams, HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
 }
 }
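The three strategy files above move the Avro-to-`HoodieSchema` conversion to the caller side: the base-class partitioner helpers now take `HoodieSchema`, the row-based paths already hold one and pass it through, and the RDD-based paths wrap the Avro `Schema` they still carry. A simplified, hypothetical mirror of that plumbing (class and method names here are illustrative, not the real strategy classes):

```java
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.hudi.common.schema.HoodieSchema;

class ClusteringSchemaPlumbingSketch {

  interface Partitioner { }

  // Row writer path: the strategy already holds a HoodieSchema, so no conversion remains.
  static Partitioner rowPartitioner(Map<String, String> strategyParams, HoodieSchema schema) {
    return getPartitioner(strategyParams, schema, true);
  }

  // RDD path: the strategy still holds a raw Avro Schema and wraps it at the boundary.
  static Partitioner rddPartitioner(Map<String, String> strategyParams, Schema avroSchema) {
    return getPartitioner(strategyParams, HoodieSchema.fromAvroSchema(avroSchema), false);
  }

  private static Partitioner getPartitioner(Map<String, String> strategyParams, HoodieSchema schema, boolean isRowPartitioner) {
    // Stand-in for the real selection logic, which picks spatial-curve vs. linear sort
    // partitioners and now adds metadata fields via HoodieSchemaUtils.addMetadataFields(schema).
    return new Partitioner() { };
  }
}
```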
==== next file ====

@@ -19,13 +19,12 @@
 package org.apache.hudi.execution.bulkinsert;

 import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.SortUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.BulkInsertPartitioner;

-import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.HoodieUTF8StringFactory;

@@ -43,22 +42,22 @@ public class RDDCustomColumnsSortPartitioner<T>
 implements BulkInsertPartitioner<JavaRDD<HoodieRecord<T>>> {

 private final String[] sortColumnNames;
-private final SerializableSchema serializableSchema;
+private final HoodieSchema schema;
 private final boolean consistentLogicalTimestampEnabled;
 private final boolean suffixRecordKey;
 private final HoodieUTF8StringFactory utf8StringFactory =
 SparkAdapterSupport$.MODULE$.sparkAdapter().getUTF8StringFactory();

 public RDDCustomColumnsSortPartitioner(HoodieWriteConfig config) {
-this.serializableSchema = new SerializableSchema(new Schema.Parser().parse(config.getSchema()));
+this.schema = HoodieSchema.parse(config.getSchema());
 this.sortColumnNames = getSortColumnName(config);
 this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
 this.suffixRecordKey = config.getBoolean(BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS);
 }

-public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, HoodieWriteConfig config) {
+public RDDCustomColumnsSortPartitioner(String[] columnNames, HoodieSchema schema, HoodieWriteConfig config) {
 this.sortColumnNames = columnNames;
-this.serializableSchema = new SerializableSchema(schema);
+this.schema = schema;
 this.consistentLogicalTimestampEnabled = config.isConsistentLogicalTimestampEnabled();
 this.suffixRecordKey = config.getBoolean(BULKINSERT_SUFFIX_RECORD_KEY_SORT_COLUMNS);
 }
@@ -67,7 +66,7 @@ public RDDCustomColumnsSortPartitioner(String[] columnNames, Schema schema, Hood
 public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records,
 int outputSparkPartitions) {
 return records
-.sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, serializableSchema.get(), suffixRecordKey, consistentLogicalTimestampEnabled,
+.sortBy(record -> SortUtils.getComparableSortColumns(record, sortColumnNames, schema, suffixRecordKey, consistentLogicalTimestampEnabled,
 utf8StringFactory::wrapArrayOfObjects), true, outputSparkPartitions);
 }
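In `RDDCustomColumnsSortPartitioner` the schema is no longer carried through Spark closures behind `SerializableSchema`; the partitioner now holds a `HoodieSchema` directly and parses it from the write config with `HoodieSchema.parse(...)`. A small sketch of the resulting shape, assuming `HoodieSchema` is itself Java-serializable (which this change implies but the diff does not show); the holder class and accessor names are hypothetical:

```java
import java.io.Serializable;

import org.apache.hudi.common.schema.HoodieSchema;

// Hypothetical, stripped-down holder mirroring the partitioner's new field layout.
class SortColumnsSchemaHolder implements Serializable {

  private final String[] sortColumnNames;
  private final HoodieSchema schema; // formerly: SerializableSchema serializableSchema

  // Formerly: new SerializableSchema(new Schema.Parser().parse(writerSchemaJson))
  SortColumnsSchemaHolder(String[] sortColumnNames, String writerSchemaJson) {
    this.sortColumnNames = sortColumnNames;
    this.schema = HoodieSchema.parse(writerSchemaJson);
  }

  HoodieSchema getSchema() {
    return schema; // handed to SortUtils.getComparableSortColumns(...) without a .get() unwrap
  }

  String[] getSortColumnNames() {
    return sortColumnNames;
  }
}
```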
==== next file ====

@@ -22,17 +22,16 @@
 import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkConversionUtils;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieMetadataField;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieSparkRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieClusteringConfig;

-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.sql.Dataset;
@@ -51,26 +50,26 @@ public class RDDSpatialCurveSortPartitioner<T>
 extends SpatialCurveSortPartitionerBase<JavaRDD<HoodieRecord<T>>> {

 private final transient HoodieSparkEngineContext sparkEngineContext;
-private final SerializableSchema schema;
+private final HoodieSchema schema;
 private final HoodieRecordType recordType;

 public RDDSpatialCurveSortPartitioner(HoodieSparkEngineContext sparkEngineContext,
 String[] orderByColumns,
 HoodieClusteringConfig.LayoutOptimizationStrategy layoutOptStrategy,
 HoodieClusteringConfig.SpatialCurveCompositionStrategyType curveCompositionStrategyType,
-Schema schema,
+HoodieSchema schema,
 HoodieRecordType recordType) {
 super(orderByColumns, layoutOptStrategy, curveCompositionStrategyType);
 this.sparkEngineContext = sparkEngineContext;
-this.schema = new SerializableSchema(schema);
+this.schema = schema;
 this.recordType = recordType;
 }

 @Override
 public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> records, int outputSparkPartitions) {
 if (recordType == HoodieRecordType.AVRO) {
 JavaRDD<GenericRecord> genericRecordsRDD =
-records.map(f -> (GenericRecord) f.toIndexedRecord(schema.get(), new Properties()).get().getData());
+records.map(f -> (GenericRecord) f.toIndexedRecord(schema.toAvroSchema(), new Properties()).get().getData());

 Dataset<Row> sourceDataset =
 AvroConversionUtils.createDataFrame(
@@ -80,7 +79,7 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco
 );
 Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);

-return HoodieSparkUtils.createRdd(sortedDataset, schema.get().getName(), schema.get().getNamespace(), false, Option.empty())
+return HoodieSparkUtils.createRdd(sortedDataset, schema.getName(), schema.getNamespace().orElse(null), false, Option.empty())
 .toJavaRDD()
 .map(record -> {
 String key = record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
@@ -90,7 +89,7 @@ public JavaRDD<HoodieRecord<T>> repartitionRecords(JavaRDD<HoodieRecord<T>> reco
 return hoodieRecord;
 });
 } else if (recordType == HoodieRecordType.SPARK) {
-StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.get());
+StructType structType = HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
 Dataset<Row> sourceDataset = SparkConversionUtils.createDataFrame(records.rdd(),
 sparkEngineContext.getSqlContext().sparkSession(), structType);
 Dataset<Row> sortedDataset = reorder(sourceDataset, outputSparkPartitions);
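With the field typed as `HoodieSchema`, `RDDSpatialCurveSortPartitioner` swaps the old `serializableSchema.get()` unwrapping for explicit accessors: `toAvroSchema()` where a raw Avro schema is still required, plus `getName()` and `getNamespace()` on the wrapper itself. A short sketch of that mapping, assuming `getNamespace()` returns an optional-style `String` (hence the `.orElse(null)` in the hunk above); the helper class and method names are hypothetical:

```java
import org.apache.avro.Schema;
import org.apache.hudi.common.schema.HoodieSchema;

class SpatialCurveSchemaAccessSketch {
  // Only the HoodieSchema accessors here are taken from the diff.
  static void describe(HoodieSchema schema) {
    Schema avroView = schema.toAvroSchema();                // formerly schema.get()
    String recordName = schema.getName();                   // formerly schema.get().getName()
    String namespace = schema.getNamespace().orElse(null);  // formerly schema.get().getNamespace()
    System.out.println(namespace + "." + recordName + " -> " + avroView.getType());
  }
}
```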
==== next file ====

@@ -530,7 +530,7 @@ private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> log
 }

 try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
-new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
+new HoodieLogFile(pathInfoList.get(0).getPath()), HoodieSchema.fromAvroSchema(writerSchema))) {
 while (logFileReader.hasNext()) {
 HoodieLogBlock logBlock = logFileReader.next();
 if (logBlock instanceof HoodieDataBlock) {