Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
5dbb271
MDT Test framework without writing data files
Dec 23, 2025
677aa96
MDT Test framework - using filterFileSlices for colstats
Jan 2, 2026
7294abc
MDT Test framework - using createCommitMetadata, tagLocation and file…
Jan 2, 2026
c688bf1
MDT Test framework - using createCommitMetadata, tagLocation and file…
Jan 2, 2026
5ed79d6
MDT Test framework - initializing files partition before commit
Jan 2, 2026
68a9fab
MDT Test framework - bug fixes with files partition
Jan 3, 2026
a8f01d3
MDT Test framework - Added writing colStats to same upsertPreppedReco…
Jan 3, 2026
328065f
MDT Test framework - Added read path changes using filterFileSlices
Jan 5, 2026
3fc0d7c
MDT Test framework - create empty parquet data files from commit meta…
Jan 6, 2026
718056e
MDT Test framework - disable partition stats and reconcile markers
Jan 7, 2026
a49beee
Add spark context to the HoodieMDTStats class
vamsikarnika Jan 7, 2026
8ebdb0a
Modify colsToIndex config to take column names
vamsikarnika Jan 7, 2026
4c2a7e6
Fix Partition field config
vamsikarnika Jan 7, 2026
ef17392
Add md file on how to use the tool
vamsikarnika Jan 7, 2026
24f41a6
Fix usage file
vamsikarnika Jan 7, 2026
6807f9c
Add config for enabling partition stats
vamsikarnika Jan 7, 2026
d68824f
Creating files using engine context
vamsikarnika Jan 7, 2026
fe41155
parallelize the empty parquet file creation based on no of partitions
vamsikarnika Jan 7, 2026
5e05e80
Writes files through multiple commits
vamsikarnika Jan 7, 2026
0bd440f
Revert "fix: Partition stats should be controlled using column stats …
Jan 8, 2026
7f7f8e6
MDT Test framework without writing data files
Dec 23, 2025
0bef6ae
MDT Test framework - using filterFileSlices for colstats
Jan 2, 2026
cf39439
MDT Test framework - using createCommitMetadata, tagLocation and file…
Jan 2, 2026
3569b72
MDT Test framework - using createCommitMetadata, tagLocation and file…
Jan 2, 2026
077667e
MDT Test framework - initializing files partition before commit
Jan 2, 2026
f0c619f
MDT Test framework - bug fixes with files partition
Jan 3, 2026
a87ddce
MDT Test framework - Added writing colStats to same upsertPreppedReco…
Jan 3, 2026
3c37ff2
MDT Test framework - Added read path changes using filterFileSlices
Jan 5, 2026
0a8ace9
MDT Test framework - create empty parquet data files from commit meta…
Jan 6, 2026
145ea3b
MDT Test framework - disable partition stats and reconcile markers
Jan 7, 2026
ed53d79
Add spark context to the HoodieMDTStats class
vamsikarnika Jan 7, 2026
f8556a2
Modify colsToIndex config to take column names
vamsikarnika Jan 7, 2026
3909f3a
Fix Partition field config
vamsikarnika Jan 7, 2026
d6cdd75
Add md file on how to use the tool
vamsikarnika Jan 7, 2026
e4ea970
Fix usage file
vamsikarnika Jan 7, 2026
698e51c
Add config for enabling partition stats
vamsikarnika Jan 7, 2026
9d705bf
Creating files using engine context
vamsikarnika Jan 7, 2026
ac0ee85
parallelize the empty parquet file creation based on no of partitions
vamsikarnika Jan 7, 2026
e711058
Writes files through multiple commits
vamsikarnika Jan 7, 2026
c1ef9d2
MDT Test framework - rename files and disabling partition stats
Jan 8, 2026
6b2a071
MDT Test framework - rename files and disabling partition stats
Jan 8, 2026
8abc18d
Bulk Insert Files & Column stats
vamsikarnika Jan 8, 2026
6295a96
MDT Test framework - removing disabling partition stats code
Jan 8, 2026
3f59e97
MDT Test framework - Setting partition stats as false in table config
Jan 8, 2026
71c8b36
Merge remote-tracking branch 'vamsi/mdt_stats_tool' into ENG-35484
Jan 8, 2026
46270b9
MDT Test framework - delete
Jan 8, 2026
01a919e
MDT Test framework - rename
Jan 8, 2026
7702c4a
MDT Test framework - rebased vamsi changes and bug fixes
Jan 9, 2026
5bd0a84
Fixing write side benchmarking
nsivabalan Jan 9, 2026
45c6d48
MDT Test framework - Refactored input params and added tenantId columns
Jan 9, 2026
19074bc
Add Partition filters to the query
vamsikarnika Jan 9, 2026
83a4344
refactor code
vamsikarnika Jan 9, 2026
7d83cdc
Add benchmarking mode config
vamsikarnika Jan 9, 2026
48845ed
fix default partition filter config value
vamsikarnika Jan 9, 2026
65b2771
Add configs for data filters
vamsikarnika Jan 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2144,7 +2144,10 @@ public boolean isMetadataColumnStatsIndexEnabled() {
* @return {@code true} if the partition stats index is enabled, {@code false} otherwise.
*/
public boolean isPartitionStatsIndexEnabled() {
  // All three switches must be on: the column stats index gates the check, then the metadata
  // table itself, then the partition stats flag held by the metadata config.
  // A bare "return isMetadataColumnStatsIndexEnabled()" ahead of this logic would make the
  // remaining checks unreachable, so the whole decision is expressed as one expression.
  return isMetadataColumnStatsIndexEnabled()
      && isMetadataTableEnabled()
      && getMetadataConfig().isPartitionStatsIndexEnabled();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,8 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad
while (iterator.hasNext()) {
MetadataPartitionType partitionType = iterator.next();
if (partitionType == PARTITION_STATS && !dataMetaClient.getTableConfig().isTablePartitioned()) {
// Partition stats index cannot be enabled for a non-partitioned table
LOG.debug("Partition stats index cannot be enabled for a non-partitioned table. Removing from initialization list. Please disable {}",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key());
iterator.remove();
this.enabledPartitionTypes.remove(partitionType);
}
Expand Down Expand Up @@ -487,12 +488,6 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List<Metad
initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition);
break;
case PARTITION_STATS:
// For PARTITION_STATS, COLUMN_STATS should also be enabled
if (!dataWriteConfig.isMetadataColumnStatsIndexEnabled()) {
LOG.debug("Skipping partition stats initialization as column stats index is not enabled. Please enable {}",
HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
continue;
}
fileGroupCountAndRecordsPair = initializePartitionStatsIndex(lazyLatestMergedPartitionFileSliceList, tableSchema);
initializeFilegroupsAndCommit(partitionType, PARTITION_STATS.getPartitionPath(), fileGroupCountAndRecordsPair, instantTimeForPartition);
break;
Expand Down Expand Up @@ -697,19 +692,19 @@ private Lazy<List<Pair<String, FileSlice>>> getLazyLatestMergedPartitionFileSlic
});
}

void initializeFilegroupsAndCommit(MetadataPartitionType partitionType,
String relativePartitionPath,
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair,
String instantTimeForPartition) throws IOException {
public void initializeFilegroupsAndCommit(MetadataPartitionType partitionType,
String relativePartitionPath,
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair,
String instantTimeForPartition) throws IOException {
initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair,
instantTimeForPartition, Collections.emptyList());
}

void initializeFilegroupsAndCommit(MetadataPartitionType partitionType,
String relativePartitionPath,
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair,
String instantTimeForPartition,
List<String> columnsToIndex) throws IOException {
public void initializeFilegroupsAndCommit(MetadataPartitionType partitionType,
String relativePartitionPath,
Pair<Integer, HoodieData<HoodieRecord>> fileGroupCountAndRecordsPair,
String instantTimeForPartition,
List<String> columnsToIndex) throws IOException {
String partitionTypeName = partitionType.name();
LOG.info("Initializing {} index with {} mappings", partitionTypeName, fileGroupCountAndRecordsPair.getKey());
HoodieTimer partitionInitTimer = HoodieTimer.start();
Expand Down Expand Up @@ -903,6 +898,9 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeFilesPartition(Map<Str
final int totalDataFilesCount = partitionIdToAllFilesMap.values().stream().mapToInt(Map::size).sum();
LOG.info("Committing total {} partitions and {} files to metadata", partitions.size(), totalDataFilesCount);

if (partitions.isEmpty()) {
return Pair.of(fileGroupCount, engineContext.emptyHoodieData());
}
// Record which saves the list of all partitions
HoodieRecord record = HoodieMetadataPayload.createPartitionListRecord(partitions);
HoodieData<HoodieRecord> allPartitionsRecord = engineContext.parallelize(Collections.singletonList(record), 1);
Expand Down Expand Up @@ -938,7 +936,7 @@ String getTimelineHistoryPath() {
return TIMELINE_HISTORY_PATH.defaultValue();
}

private HoodieTableMetaClient initializeMetaClient() throws IOException {
protected HoodieTableMetaClient initializeMetaClient() throws IOException {
HoodieTableMetaClient.newTableBuilder()
.setTableType(HoodieTableType.MERGE_ON_READ)
.setTableName(dataWriteConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX)
Expand Down Expand Up @@ -1047,7 +1045,7 @@ private List<DirectoryInfo> listAllPartitionsFromMDT(String initializationTime,
* File groups will be named as :
* record-index-bucket-0000, .... -> ..., record-index-bucket-0009
*/
private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime,
public void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime,
int fileGroupCount, String relativePartitionPath, Option<String> dataPartitionName) throws IOException {

// Archival of data table has a dependency on compaction(base files) in metadata table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
// import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaUtils;
Expand Down Expand Up @@ -390,9 +391,8 @@ public static Map<String, HoodieData<HoodieRecord>> convertMetadataToRecords(Hoo
dataMetaClient, metadataConfig, recordTypeOpt);
partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD);
}

if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath())) {
checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient),
"Column stats partition must be enabled to generate partition stats. Please enable: " + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key());
// Generate Hoodie Pair data of partition name and list of column range metadata for all the files in that partition
boolean isDeletePartition = commitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION);
final HoodieData<HoodieRecord> partitionStatsRDD = convertMetadataToPartitionStatRecords(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,9 @@ public interface MetadataTableFileGroupIndexParser extends Serializable {

int getFileGroupIndex(String fileID);

/**
 * Variant of {@link #getFileGroupIndex(String)} that takes a partition path and an index
 * within that partition.
 *
 * <p>This default implementation ignores {@code partitionPath} and returns
 * {@code fileGroupIndexInPartition} unchanged.
 *
 * @param partitionPath partition path; not read by the default implementation
 * @param fileGroupIndexInPartition file group index within the partition
 * @return {@code fileGroupIndexInPartition}, exactly as passed in
 */
default int getFileGroupIndex(String partitionPath, int fileGroupIndexInPartition) {
return fileGroupIndexInPartition;
}

int getNumberOfFileGroups();
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieDuplicateDataFileDetectedException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieInsertException;
Expand Down Expand Up @@ -110,7 +109,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand Down Expand Up @@ -797,64 +795,8 @@ void reconcileAgainstMarkers(HoodieEngineContext context,
boolean consistencyCheckEnabled,
boolean shouldFailOnDuplicateDataFileDetection,
WriteMarkers markers) throws HoodieIOException {
try {
// Reconcile marker and data files with WriteStats so that partially written data-files due to failed
// (but succeeded on retry) tasks are removed.
String basePath = getMetaClient().getBasePath().toString();

if (!markers.doesMarkerDirExist()) {
// can happen if it was an empty write say.
return;
}

// Ignores log file appended for update, since they are already fail-safe.
// but new created log files should be included.
Set<String> invalidDataPaths = getInvalidDataPaths(markers);
Set<String> validDataPaths = stats.stream()
.map(HoodieWriteStat::getPath)
.collect(Collectors.toSet());
Set<String> validCdcDataPaths = stats.stream()
.map(HoodieWriteStat::getCdcStats)
.filter(Objects::nonNull)
.flatMap(cdcStat -> cdcStat.keySet().stream())
.collect(Collectors.toSet());

// Contains list of partially created files. These needs to be cleaned up.
invalidDataPaths.removeAll(validDataPaths);
invalidDataPaths.removeAll(validCdcDataPaths);

if (!invalidDataPaths.isEmpty()) {
if (shouldFailOnDuplicateDataFileDetection) {
throw new HoodieDuplicateDataFileDetectedException("Duplicate data files detected " + invalidDataPaths);
}

log.info("Removing duplicate files created due to task retries before committing. Paths=" + invalidDataPaths);
Map<String, List<Pair<String, String>>> invalidPathsByPartition = invalidDataPaths.stream()
.map(dp ->
Pair.of(new StoragePath(basePath, dp).getParent().toString(),
new StoragePath(basePath, dp).toString()))
.collect(Collectors.groupingBy(Pair::getKey));

// Ensure all files in delete list is actually present. This is mandatory for an eventually consistent FS.
// Otherwise, we may miss deleting such files. If files are not found even after retries, fail the commit
if (consistencyCheckEnabled) {
// This will either ensure all files to be deleted are present.
waitForAllFiles(context, invalidPathsByPartition, FileVisibility.APPEAR);
}

// Now delete partially written files
context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
deleteInvalidFilesByPartitions(context, invalidPathsByPartition);

// Now ensure the deleted files disappear
if (consistencyCheckEnabled) {
// This will either ensure all files to be deleted are absent.
waitForAllFiles(context, invalidPathsByPartition, FileVisibility.DISAPPEAR);
}
}
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
log.warn("Skipping reconcile markers for instant: {}", instantTs);
return;
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.metadata;

import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.collection.Pair;

import java.util.List;
import java.util.Map;

/**
* Test utility class to access protected methods from HoodieBackedTableMetadataWriter.
* This class is in the same package to access protected methods without duplication.
*/
public class MetadataWriterTestUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@PavithranRick : do we still need this? or can we remove

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be required now.


/**
 * Forwards to {@code metadataWriter.tagRecordsWithLocation(partitionRecordsMap, isInitializing)}
 * and hands back its result untouched, so callers outside the writer can reach that method
 * through this same-package helper.
 *
 * @param metadataWriter writer instance the call is delegated to
 * @param partitionRecordsMap records keyed by partition path
 * @param isInitializing initialization flag, passed through as-is
 * @return the pair of tagged records and file group ids produced by the writer
 */
@SuppressWarnings("rawtypes")
public static <I, O> Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> tagRecordsWithLocation(
    HoodieBackedTableMetadataWriter<I, O> metadataWriter,
    Map<String, HoodieData<HoodieRecord>> partitionRecordsMap,
    boolean isInitializing) {
  // Pure delegation: no argument is transformed and the result is not post-processed.
  final Pair<HoodieData<HoodieRecord>, List<HoodieFileGroupId>> taggedRecordsAndFileGroups =
      metadataWriter.tagRecordsWithLocation(partitionRecordsMap, isInitializing);
  return taggedRecordsAndFileGroups;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ public static HoodieTableMetadataWriter create(StorageConfiguration<?> conf, Hoo
this(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp, false);
}

/**
 * Creates the writer by passing every argument, unchanged and in the same order, to the
 * superclass constructor.
 *
 * <p>Declared {@code protected} so that subclasses in other packages can invoke it; callers in
 * this package keep working exactly as with package-private access.
 *
 * @param hadoopConf storage configuration
 * @param writeConfig write config
 * @param failedWritesCleaningPolicy failed-writes cleaning policy
 * @param engineContext engine context
 * @param inflightInstantTimestamp optional inflight instant timestamp
 * @param streamingWrites streaming-writes flag handed to the superclass
 */
protected SparkHoodieBackedTableMetadataWriter(StorageConfiguration<?> hadoopConf,
                                               HoodieWriteConfig writeConfig,
                                               HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy,
                                               HoodieEngineContext engineContext,
                                               Option<String> inflightInstantTimestamp,
                                               boolean streamingWrites) {
  super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp, streamingWrites);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,8 @@ public void testKeepLatestFileVersions() throws Exception {
public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception {
HoodieWriteConfig config =
HoodieWriteConfig.newBuilder().withPath(basePath)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false)
.withMetadataIndexPartitionStats(false).build())
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanBootstrapBaseFileEnabled(true)
.withCleanerParallelism(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,21 @@ public final class HoodieMetadataConfig extends HoodieConfig {
.sinceVersion("1.0.1")
.withDocumentation("Options for the expression index, e.g. \"expr='from_unixtime', format='yyyy-MM-dd'\"");

/**
 * Boolean config property for the partition stats index, keyed by
 * {@code METADATA_PREFIX + ".index.partition.stats.enable"}. The documentation string attached
 * below is the user-facing description of the flag.
 */
public static final ConfigProperty<Boolean> ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty
.key(METADATA_PREFIX + ".index.partition.stats.enable")
// false is only the statically declared default. The builder's build() calls setDefaultValue
// for this key with metadataConfig.isColumnStatsIndexEnabled(), so the default seen on a built
// config follows the column stats setting.
.defaultValue(false)
.sinceVersion("1.0.0")
.withDocumentation("Enable aggregating stats for each column at the storage partition level. "
+ "Enabling this can improve query performance by leveraging partition and column stats "
+ "for (partition) filtering. "
+ "Important: The default value for this configuration is dynamically set based on the "
+ "effective value of " + ENABLE_METADATA_INDEX_COLUMN_STATS.key() + ". If column stats "
+ "index is enabled (default for Spark engine), partition stats indexing will also be "
+ "enabled by default. Conversely, if column stats indexing is disabled (default for "
+ "Flink and Java engines), partition stats indexing will also be disabled by default.");

public static final ConfigProperty<Integer> METADATA_INDEX_PARTITION_STATS_FILE_GROUP_COUNT = ConfigProperty
.key(METADATA_PREFIX + ".index.partition.stats.file.group.count")
.defaultValue(1)
Expand Down Expand Up @@ -821,7 +836,7 @@ private Map<String, String> getExpressionIndexOptions(String configValue) {
}

/**
 * Reports whether the partition stats index is enabled in this config.
 *
 * <p>Reads {@link #ENABLE_METADATA_INDEX_PARTITION_STATS} through {@code getBooleanOrDefault}.
 * The column stats key is deliberately not consulted here; returning it first would leave the
 * partition stats read unreachable.
 *
 * @return the boolean obtained for {@link #ENABLE_METADATA_INDEX_PARTITION_STATS}
 */
public boolean isPartitionStatsIndexEnabled() {
  return getBooleanOrDefault(ENABLE_METADATA_INDEX_PARTITION_STATS);
}

public int getPartitionStatsIndexFileGroupCount() {
Expand Down Expand Up @@ -1124,6 +1139,11 @@ public Builder withExpressionIndexOptions(Map<String, String> options) {
return this;
}

/**
 * Stores the partition stats index flag on the config under construction.
 *
 * @param enable value to record for {@link #ENABLE_METADATA_INDEX_PARTITION_STATS}
 * @return this builder, for chaining
 */
public Builder withMetadataIndexPartitionStats(boolean enable) {
  // The config stores values as strings, so the boolean is rendered as "true" / "false".
  final String flagValue = Boolean.toString(enable);
  metadataConfig.setValue(ENABLE_METADATA_INDEX_PARTITION_STATS, flagValue);
  return this;
}

public Builder withMetadataIndexPartitionStatsFileGroupCount(int fileGroupCount) {
metadataConfig.setValue(METADATA_INDEX_PARTITION_STATS_FILE_GROUP_COUNT, String.valueOf(fileGroupCount));
return this;
Expand Down Expand Up @@ -1192,6 +1212,7 @@ public Builder withRepartitionDefaultPartitions(int defaultPartitions) {
public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS, metadataConfig.isColumnStatsIndexEnabled());
metadataConfig.setDefaultValue(SECONDARY_INDEX_ENABLE_PROP, getDefaultSecondaryIndexEnable(engineType));
metadataConfig.setDefaultValue(STREAMING_WRITE_ENABLED, getDefaultForStreamingWriteEnabled(engineType));
// fix me: disable when schema on read is enabled.
Expand Down Expand Up @@ -1248,13 +1269,6 @@ private boolean getDefaultSecondaryIndexEnable(EngineType engineType) {
}
}

/**
 * String form of the partition stats enable key: {@code METADATA_PREFIX + ".index.partition.stats.enable"}.
 *
 * <p>NOTE(review): the enclosing hunk header reports 13 old lines against 6 new ones, so this
 * declaration appears to be the removed side of the diff; the same page shows a
 * {@code ConfigProperty<Boolean>} with the same name being added. Confirm before relying on it.
 *
 * @deprecated The config is now deprecated. Partition stats are configured using the column stats config itself.
 */
@Deprecated
public static final String ENABLE_METADATA_INDEX_PARTITION_STATS =
METADATA_PREFIX + ".index.partition.stats.enable";

/**
* @deprecated Use {@link #ENABLE} and its methods.
*/
Expand Down
Loading
Loading