@@ -156,7 +156,7 @@ public static ClosableIterator<Row> getRowsWithExpressionIndexMetadata(ClosableI
   @SuppressWarnings("checkstyle:LineLength")
   public static ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingColumnStats(Dataset<Row> dataset, HoodieExpressionIndex<Column, Column> expressionIndex, String columnToIndex,
       Option<Function<HoodiePairData<String, HoodieColumnRangeMetadata<Comparable>>, HoodieData<HoodieRecord>>> partitionRecordsFunctionOpt,
-      HoodieIndexVersion indexVersion) {
+      HoodieIndexVersion indexVersion, String expressionIndexRangeMetadataStorageLevel) {
     // Aggregate col stats related data for the column to index
     Dataset<Row> columnRangeMetadataDataset = dataset
         .select(columnToIndex, SparkMetadataWriterUtils.getExpressionIndexColumnNames())
@@ -199,7 +199,7 @@ public static ExpressionIndexComputationMetadata getExpressionIndexRecordsUsingC

     if (partitionRecordsFunctionOpt.isPresent()) {
       // TODO: HUDI-8848: Allow configurable storage level while computing expression index update
-      rangeMetadataHoodieJavaRDD.persist("MEMORY_AND_DISK_SER");
+      rangeMetadataHoodieJavaRDD.persist(expressionIndexRangeMetadataStorageLevel);
     }
     HoodieData<HoodieRecord> colStatRecords = rangeMetadataHoodieJavaRDD.map(pair ->
         createColumnStatsRecords(pair.getKey(), Collections.singletonList(pair.getValue()), false, expressionIndex.getIndexName(),
@@ -310,7 +310,8 @@ public static ExpressionIndexComputationMetadata getExprIndexRecords(

     // Generate expression index records
     if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_COLUMN_STATS)) {
-      return getExpressionIndexRecordsUsingColumnStats(rowDataset, expressionIndex, columnToIndex, partitionRecordsFunctionOpt, indexDefinition.getVersion());
+      return getExpressionIndexRecordsUsingColumnStats(rowDataset, expressionIndex, columnToIndex, partitionRecordsFunctionOpt, indexDefinition.getVersion(),
+          dataWriteConfig.getIndexingConfig().getExpressionIndexRangeMetadataStorageLevel());
     } else if (indexDefinition.getIndexType().equalsIgnoreCase(PARTITION_NAME_BLOOM_FILTERS)) {
       return getExpressionIndexRecordsUsingBloomFilter(
           rowDataset, columnToIndex, dataWriteConfig.getStorageConfig(), instantTime, indexDefinition);
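With this change the persist level for the range metadata RDD comes from configuration instead of the hardcoded "MEMORY_AND_DISK_SER" literal. Below is a minimal, self-contained sketch (not Hudi's internal code; the class and variable names are illustrative) of how such a storage-level string maps onto a Spark StorageLevel and drives RDD caching:

    import java.util.Arrays;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.storage.StorageLevel;

    public class StorageLevelPersistSketch {
      public static void main(String[] args) {
        try (JavaSparkContext jsc = new JavaSparkContext("local[2]", "storage-level-sketch")) {
          // Stand-in for the configured value of
          // hoodie.expression.index.range.metadata.storage.level.
          String configuredLevel = "MEMORY_AND_DISK_SER";

          // Spark resolves the name to a StorageLevel; an unknown name throws
          // IllegalArgumentException, so a misconfigured value fails fast.
          StorageLevel level = StorageLevel.fromString(configuredLevel);

          JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
          // Cache with the configured level instead of a hardcoded one.
          rdd.persist(level);
          System.out.println("count = " + rdd.count());
          rdd.unpersist();
        }
      }
    }

In the diff above, rangeMetadataHoodieJavaRDD.persist(...) takes the raw string, so the equivalent name-to-level resolution presumably happens inside Hudi's HoodieData implementation; the sketch only illustrates which values are accepted and what they mean to Spark.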
@@ -35,6 +35,7 @@
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.time.Instant;
+import java.util.Locale;
 import java.util.Properties;
 import java.util.Set;
 import java.util.function.BiConsumer;
@@ -79,6 +80,14 @@ public class HoodieIndexingConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Function to be used for building the expression index.");

+  public static final ConfigProperty<String> EXPRESSION_INDEX_RANGE_METADATA_STORAGE_LEVEL_VALUE = ConfigProperty
+      .key("hoodie.expression.index.range.metadata.storage.level")
+      .defaultValue("MEMORY_AND_DISK_SER")
+      .markAdvanced()
+      .sinceVersion("1.2.0")
+      .withDocumentation("Determines what level of persistence is used to cache range metadata RDDs created to compute the expression index. "
+          + "Refer to org.apache.spark.storage.StorageLevel for the supported values.");
+
   public static final ConfigProperty<String> INDEX_DEFINITION_CHECKSUM = ConfigProperty
       .key("hoodie.table.checksum")
       .noDefaultValue()
@@ -92,6 +101,10 @@ public HoodieIndexingConfig() {
     super();
   }

+  public String getExpressionIndexRangeMetadataStorageLevel() {
+    return getStringOrDefault(EXPRESSION_INDEX_RANGE_METADATA_STORAGE_LEVEL_VALUE).toUpperCase(Locale.ROOT);
+  }
+
   public static void update(HoodieStorage storage, StoragePath metadataFolder,
       Properties updatedProps) {
     modify(storage, metadataFolder, updatedProps, ConfigUtils::upsertProperties);
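A hedged usage sketch of the new key, assuming it is passed through like other hoodie.* options on the Spark datasource write path (table, key, and precombine options are elided; the class name is illustrative):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class ExpressionIndexStorageLevelOptionExample {
      public static void write(Dataset<Row> df, String basePath) {
        df.write()
            .format("hudi")
            // Record key, precombine, and other table options omitted for brevity.
            // Any org.apache.spark.storage.StorageLevel name is accepted here,
            // e.g. MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER (the default).
            .option("hoodie.expression.index.range.metadata.storage.level", "MEMORY_AND_DISK")
            .mode(SaveMode.Append)
            .save(basePath);
      }
    }

Note that the getter added above uppercases the configured value with Locale.ROOT, so lower-case names such as memory_and_disk are normalized before use.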
@@ -29,7 +29,7 @@ public enum RecordMergeMode {

   @EnumFieldDescription("Using event time as the ordering to merge records, i.e., the record "
       + "with the larger event time overwrites the record with the smaller event time on the "
-      + "same key, regardless of transaction time. The event time or preCombine field needs "
+      + "same key, regardless of transaction time. The event time or ordering fields need "
       + "to be specified by the user.")
   EVENT_TIME_ORDERING,
