diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java index 03b5506d40d7c..3ba880438b512 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestCleansCommand.java @@ -108,7 +108,7 @@ public void init() throws Exception { INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf); Map extraCommitMetadata = - Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); + Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.TEST_TABLE_SCHEMA); HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2, Option.empty(), Option.empty(), extraCommitMetadata); } diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestDiffCommand.java b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestDiffCommand.java index 8804d973573ef..bc0ca312f6558 100644 --- a/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestDiffCommand.java +++ b/hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestDiffCommand.java @@ -110,7 +110,7 @@ public void testDiffFile() throws Exception { INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, timestamp), conf); Map extraCommitMetadata = - Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); + Collections.singletonMap(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.TEST_TABLE_SCHEMA); HoodieTestCommitMetadataGenerator.createCommitFileWithMetadata(tablePath, timestamp, conf, fileId1, fileId2, Option.empty(), Option.empty(), extraCommitMetadata, false); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 047aa169a6440..98d80ac1494b6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -2144,7 +2144,10 @@ public boolean isMetadataColumnStatsIndexEnabled() { * @return {@code true} if the partition stats index is enabled, {@code false} otherwise. 
*/ public boolean isPartitionStatsIndexEnabled() { - return isMetadataColumnStatsIndexEnabled(); + if (isMetadataColumnStatsIndexEnabled()) { + return isMetadataTableEnabled() && getMetadataConfig().isPartitionStatsIndexEnabled(); + } + return false; } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index dd19afd9aa36d..5683751139414 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -440,7 +440,8 @@ private boolean initializeFromFilesystem(String dataTableInstantTime, List>> getLazyLatestMergedPartitionFileSlic }); } - void initializeFilegroupsAndCommit(MetadataPartitionType partitionType, - String relativePartitionPath, - Pair> fileGroupCountAndRecordsPair, - String instantTimeForPartition) throws IOException { + public void initializeFilegroupsAndCommit(MetadataPartitionType partitionType, + String relativePartitionPath, + Pair> fileGroupCountAndRecordsPair, + String instantTimeForPartition) throws IOException { initializeFilegroupsAndCommit(partitionType, relativePartitionPath, fileGroupCountAndRecordsPair, instantTimeForPartition, Collections.emptyList()); } - void initializeFilegroupsAndCommit(MetadataPartitionType partitionType, - String relativePartitionPath, - Pair> fileGroupCountAndRecordsPair, - String instantTimeForPartition, - List columnsToIndex) throws IOException { + public void initializeFilegroupsAndCommit(MetadataPartitionType partitionType, + String relativePartitionPath, + Pair> fileGroupCountAndRecordsPair, + String instantTimeForPartition, + List columnsToIndex) throws IOException { String partitionTypeName = partitionType.name(); LOG.info("Initializing {} index with {} mappings", partitionTypeName, fileGroupCountAndRecordsPair.getKey()); HoodieTimer partitionInitTimer = HoodieTimer.start(); @@ -903,6 +898,9 @@ private Pair> initializeFilesPartition(Map allPartitionsRecord = engineContext.parallelize(Collections.singletonList(record), 1); @@ -938,7 +936,7 @@ String getTimelineHistoryPath() { return TIMELINE_HISTORY_PATH.defaultValue(); } - private HoodieTableMetaClient initializeMetaClient() throws IOException { + protected HoodieTableMetaClient initializeMetaClient() throws IOException { HoodieTableMetaClient.newTableBuilder() .setTableType(HoodieTableType.MERGE_ON_READ) .setTableName(dataWriteConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX) @@ -1047,7 +1045,7 @@ private List listAllPartitionsFromMDT(String initializationTime, * File groups will be named as : * record-index-bucket-0000, .... -> ..., record-index-bucket-0009 */ - private void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, + public void initializeFileGroups(HoodieTableMetaClient dataMetaClient, MetadataPartitionType metadataPartition, String instantTime, int fileGroupCount, String relativePartitionPath, Option dataPartitionName) throws IOException { // Archival of data table has a dependency on compaction(base files) in metadata table. 
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java index c9092464558d8..2889265fb45dd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java @@ -390,9 +390,8 @@ public static Map> convertMetadataToRecords(Hoo dataMetaClient, metadataConfig, recordTypeOpt); partitionToRecordsMap.put(MetadataPartitionType.COLUMN_STATS.getPartitionPath(), metadataColumnStatsRDD); } + if (enabledPartitionTypes.contains(MetadataPartitionType.PARTITION_STATS.getPartitionPath())) { - checkState(MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient), - "Column stats partition must be enabled to generate partition stats. Please enable: " + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key()); // Generate Hoodie Pair data of partition name and list of column range metadata for all the files in that partition boolean isDeletePartition = commitMetadata.getOperationType().equals(WriteOperationType.DELETE_PARTITION); final HoodieData partitionStatsRDD = convertMetadataToPartitionStatRecords( diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataTableFileGroupIndexParser.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataTableFileGroupIndexParser.java index 2e40e2bfce368..fae490e159d14 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataTableFileGroupIndexParser.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/MetadataTableFileGroupIndexParser.java @@ -29,5 +29,9 @@ public interface MetadataTableFileGroupIndexParser extends Serializable { int getFileGroupIndex(String fileID); + default int getFileGroupIndex(String partitionPath, int fileGroupIndexInPartition) { + return fileGroupIndexInPartition; + } + int getNumberOfFileGroups(); }
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestCommitMetadataResolver.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestCommitMetadataResolver.java index 2f217953e69ac..4bc46defc2ff9 100644 --- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestCommitMetadataResolver.java +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestCommitMetadataResolver.java @@ -153,7 +153,7 @@ public void testReconcileMetadataForMissingFiles() throws IOException { private static Pair> generateCommitMetadata(String instantTime, Map> partitionToFilePaths, HoodieTableMetaClient metaClient, int...
versions) { HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); + metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.TEST_TABLE_SCHEMA); List allLogFiles = new ArrayList<>(); partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> { HoodieDeltaWriteStat writeStat = new HoodieDeltaWriteStat(); diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/MetadataWriterTestUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/MetadataWriterTestUtils.java new file mode 100644 index 0000000000000..683ec39496050 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/MetadataWriterTestUtils.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metadata; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.collection.Pair; + +import java.util.List; +import java.util.Map; + +/** + * Test utility class to access protected methods from HoodieBackedTableMetadataWriter. + * This class is in the same package to access protected methods without duplication. + */ +public class MetadataWriterTestUtils { + + /** + * Tag records with location using the metadata writer's tagRecordsWithLocation method. + * This is a wrapper around the protected method to make it accessible from tests. 
+ * + * @param metadataWriter The metadata writer instance + * @param partitionRecordsMap Map of partition path to records + * @param isInitializing Whether this is during initialization + * @return Pair of tagged records and file group IDs + */ + @SuppressWarnings("rawtypes") + public static Pair, List> tagRecordsWithLocation( + HoodieBackedTableMetadataWriter metadataWriter, + Map> partitionRecordsMap, + boolean isInitializing) { + // Access the protected method - this works because we're in the same package + return metadataWriter.tagRecordsWithLocation(partitionRecordsMap, isInitializing); + } +} + diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index dfb295d5d16a3..205b346d0b858 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -118,12 +118,12 @@ public static HoodieTableMetadataWriter create(StorageConfiguration conf, Hoo this(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp, false); } - SparkHoodieBackedTableMetadataWriter(StorageConfiguration hadoopConf, - HoodieWriteConfig writeConfig, - HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, - HoodieEngineContext engineContext, - Option inflightInstantTimestamp, - boolean streamingWrites) { + protected SparkHoodieBackedTableMetadataWriter(StorageConfiguration hadoopConf, + HoodieWriteConfig writeConfig, + HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, + HoodieEngineContext engineContext, + Option inflightInstantTimestamp, + boolean streamingWrites) { super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp, streamingWrites); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index a6c25b7e9096d..e3059da0b3f3c 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -460,7 +460,7 @@ private HoodieInstant commitWithMdt(String instantTime, Map protected static HoodieCommitMetadata generateCommitMetadata( String instantTime, Map> partitionToFilePaths) { HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); + metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.TEST_TABLE_SCHEMA); partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> { HoodieWriteStat writeStat = new HoodieWriteStat(); writeStat.setPartitionPath(partitionPath); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java index e572fef122604..169c95e5911ba 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestCleanPlanExecutor.java @@ 
-341,7 +341,8 @@ public void testKeepLatestFileVersions() throws Exception { public void testKeepLatestFileVersionsWithBootstrapFileClean() throws Exception { HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath(basePath) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().withMetadataIndexColumnStats(false) + .withMetadataIndexPartitionStats(false).build()) .withCleanConfig(HoodieCleanConfig.newBuilder() .withCleanBootstrapBaseFileEnabled(true) .withCleanerParallelism(1) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java index aa1a5ee6a9697..e7ed7cd694823 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieCleanerTestBase.java @@ -61,7 +61,7 @@ public class HoodieCleanerTestBase extends HoodieClientTestBase { protected static HoodieCommitMetadata generateCommitMetadata( String instantTime, Map> partitionToFilePaths) { HoodieCommitMetadata metadata = new HoodieCommitMetadata(); - metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); + metadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.TEST_TABLE_SCHEMA); partitionToFilePaths.forEach((partitionPath, fileList) -> fileList.forEach(f -> { HoodieWriteStat writeStat = new HoodieWriteStat(); writeStat.setPartitionPath(partitionPath); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index ec688f7768d68..5ae532548447e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -445,6 +445,21 @@ public final class HoodieMetadataConfig extends HoodieConfig { .sinceVersion("1.0.1") .withDocumentation("Options for the expression index, e.g. \"expr='from_unixtime', format='yyyy-MM-dd'\""); + public static final ConfigProperty ENABLE_METADATA_INDEX_PARTITION_STATS = ConfigProperty + .key(METADATA_PREFIX + ".index.partition.stats.enable") + // The defaultValue(false) here is the initial default, but it's overridden later based on + // column stats setting. + .defaultValue(false) + .sinceVersion("1.0.0") + .withDocumentation("Enable aggregating stats for each column at the storage partition level. " + + "Enabling this can improve query performance by leveraging partition and column stats " + + "for (partition) filtering. " + + "Important: The default value for this configuration is dynamically set based on the " + + "effective value of " + ENABLE_METADATA_INDEX_COLUMN_STATS.key() + ". If column stats " + + "index is enabled (default for Spark engine), partition stats indexing will also be " + + "enabled by default. 
Conversely, if column stats indexing is disabled (default for " + + "Flink and Java engines), partition stats indexing will also be disabled by default."); + public static final ConfigProperty METADATA_INDEX_PARTITION_STATS_FILE_GROUP_COUNT = ConfigProperty .key(METADATA_PREFIX + ".index.partition.stats.file.group.count") .defaultValue(1) @@ -821,7 +836,7 @@ private Map getExpressionIndexOptions(String configValue) { } public boolean isPartitionStatsIndexEnabled() { - return getBooleanOrDefault(ENABLE_METADATA_INDEX_COLUMN_STATS); + return getBooleanOrDefault(ENABLE_METADATA_INDEX_PARTITION_STATS); } public int getPartitionStatsIndexFileGroupCount() { @@ -1124,6 +1139,11 @@ public Builder withExpressionIndexOptions(Map options) { return this; } + public Builder withMetadataIndexPartitionStats(boolean enable) { + metadataConfig.setValue(ENABLE_METADATA_INDEX_PARTITION_STATS, String.valueOf(enable)); + return this; + } + public Builder withMetadataIndexPartitionStatsFileGroupCount(int fileGroupCount) { metadataConfig.setValue(METADATA_INDEX_PARTITION_STATS_FILE_GROUP_COUNT, String.valueOf(fileGroupCount)); return this; @@ -1192,6 +1212,7 @@ public Builder withRepartitionDefaultPartitions(int defaultPartitions) { public HoodieMetadataConfig build() { metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType)); metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, getDefaultColStatsEnable(engineType)); + metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS, metadataConfig.isColumnStatsIndexEnabled()); metadataConfig.setDefaultValue(SECONDARY_INDEX_ENABLE_PROP, getDefaultSecondaryIndexEnable(engineType)); metadataConfig.setDefaultValue(STREAMING_WRITE_ENABLED, getDefaultForStreamingWriteEnabled(engineType)); // fix me: disable when schema on read is enabled. @@ -1248,13 +1269,6 @@ private boolean getDefaultSecondaryIndexEnable(EngineType engineType) { } } - /** - * The config is now deprecated. Partition stats are configured using the column stats config itself. - */ - @Deprecated - public static final String ENABLE_METADATA_INDEX_PARTITION_STATS = - METADATA_PREFIX + ".index.partition.stats.enable"; - /** * @deprecated Use {@link #ENABLE} and its methods. 
*/ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index eebcdd8c5c97f..99ead4577cbc1 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -19,6 +19,7 @@ package org.apache.hudi.sink; import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.WriteConcurrencyMode; @@ -753,6 +754,7 @@ public void testWriteMultiWriterPartialOverlapping(WriteConcurrencyMode writeCon public void testReuseEmbeddedServer() throws IOException { conf.setString("hoodie.filesystem.view.remote.timeout.secs", "500"); conf.setString("hoodie.metadata.enable","true"); + conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false"); // HUDI-8814 HoodieFlinkWriteClient writeClient = null; HoodieFlinkWriteClient writeClient2 = null; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java index 7515f89043272..ee92a46fe681c 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java @@ -19,6 +19,7 @@ package org.apache.hudi.sink; import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.PartialUpdateAvroPayload; import org.apache.hudi.common.model.WriteConcurrencyMode; @@ -96,6 +97,7 @@ public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws E // disable schedule compaction in writers conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false); conf.set(FlinkOptions.PRE_COMBINE, true); + conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false"); // HUDI-8814 // start pipeline1 and insert record: [id1,Danny,null,1,par1], suspend the tx commit List dataset1 = Collections.singletonList( @@ -290,6 +292,7 @@ public void testBulkInsertInSequenceWithNonBlockingConcurrencyControl() throws E // disable schedule compaction in writers conf.set(FlinkOptions.COMPACTION_SCHEDULE_ENABLED, false); conf.set(FlinkOptions.PRE_COMBINE, true); + conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false"); Configuration conf1 = conf.clone(); conf1.set(FlinkOptions.OPERATION, "BULK_INSERT"); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java index ae7aaa022df35..83966a408735c 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java @@ -186,6 +186,7 @@ void testFileListingWithPartitionStatsPruning(HoodieTableType tableType) throws conf.set(READ_DATA_SKIPPING_ENABLED, true); 
conf.set(METADATA_ENABLED, true); conf.set(TABLE_TYPE, tableType.name()); + conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true"); conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true"); if (tableType == HoodieTableType.MERGE_ON_READ) { // enable CSI for MOR table to collect col stats for delta write stats, diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java index 391daaf1e9e96..beb522116a1eb 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java @@ -362,6 +362,7 @@ void testInputSplitsWithPartitionStatsPruner(HoodieTableType tableType) throws E conf.set(FlinkOptions.READ_AS_STREAMING, true); conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true); conf.set(FlinkOptions.TABLE_TYPE, tableType.name()); + conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true"); conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true"); if (tableType == HoodieTableType.MERGE_ON_READ) { // enable CSI for MOR table to collect col stats for delta write stats, diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 0c7bbc46ad0bb..21ceaae856812 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -563,6 +563,7 @@ void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean hiveSt .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) .option(FlinkOptions.METADATA_ENABLED, true) .option(FlinkOptions.READ_AS_STREAMING, true) + .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), true) .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), false) .option(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true) .option(FlinkOptions.TABLE_TYPE, tableType) @@ -570,6 +571,7 @@ void testReadWithPartitionStatsPruning(HoodieTableType tableType, boolean hiveSt .end(); streamTableEnv.executeSql(hoodieTableDDL); Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true"); conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true"); conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true); conf.set(FlinkOptions.TABLE_TYPE, tableType.name()); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java index cebc24651ec24..c2af715bb256d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java @@ -393,7 +393,8 @@ private TableOptions defaultTableOptions(String tablePath) { FlinkOptions.COMPACTION_SCHEDULE_ENABLED.key(), false, HoodieWriteConfig.EMBEDDED_TIMELINE_SERVER_REUSE_ENABLED.key(), 
false, HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true, - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true"); + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false"); } private void checkAnswerEvolved(String... expectedResult) throws Exception { diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java index 4e8713b53bb16..9d0c9803358eb 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableSource.java @@ -179,6 +179,7 @@ void testDataSkippingFilterShouldBeNotNullWhenTableSourceIsCopied() { void testDataSkippingWithPartitionStatsPruning(List filters, List expectedPartitions) throws Exception { final String path = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(path); + conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true"); conf.setString(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key(), "true"); conf.set(FlinkOptions.READ_DATA_SKIPPING_ENABLED, true); TestData.writeData(TestData.DATA_SET_INSERT, conf); diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java index b81585275ac8d..e2c0828e992cc 100644 --- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java +++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java @@ -149,7 +149,7 @@ @Slf4j public class HoodieTestTable implements AutoCloseable { - public static final String PHONY_TABLE_SCHEMA = + public static final String TEST_TABLE_SCHEMA = "{\"namespace\": \"org.apache.hudi.avro.model\", \"type\": \"record\", \"name\": \"PhonyRecord\", \"fields\": []}"; private static final Random RANDOM = new Random(); @@ -286,7 +286,7 @@ public HoodieCommitMetadata createCommitMetadata(WriteOperationType operationTyp writeStats.addAll(generateHoodieWriteStatForPartitionLogFiles(testTableState.getPartitionToLogFileInfoMap(commitTime), commitTime, bootstrap)); } Map extraMetadata = singletonMap("test", "test"); - return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, PHONY_TABLE_SCHEMA, action); + return buildMetadata(writeStats, partitionToReplaceFileIds, Option.of(extraMetadata), operationType, TEST_TABLE_SCHEMA, action); } public HoodieTestTable moveInflightCommitToComplete(String instantTime, HoodieCommitMetadata metadata) throws IOException { @@ -657,7 +657,7 @@ private Pair genera .setInputGroups(clusteringGroups).build()); HoodieReplaceCommitMetadata replaceMetadata = new HoodieReplaceCommitMetadata(); - replaceMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA); + replaceMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.TEST_TABLE_SCHEMA); replacedFileIds.forEach(replacedFileId -> replaceMetadata.addReplaceFileId(partition, replacedFileId)); replaceMetadata.setOperationType(operationType); if (newFileId.isPresent() && !StringUtils.isNullOrEmpty(newFileId.get())) { @@ -1141,7 +1141,7 @@ public HoodieReplaceCommitMetadata doCluster(String commitTime, Map inserts =
makeInsertDf("000", 100); Dataset batch1 = inserts.where(matchCond); Dataset batch2 = inserts.where(nonMatchCond); @@ -189,6 +190,7 @@ public void testBaseFileAndLogFileUpdateMatchesDeleteBlock() { */ @Test public void testBaseFileAndLogFileUpdateMatchesDeleteBlockCompact() { + options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false"); testBaseFileAndLogFileUpdateMatchesHelper(false, true,true, false); } @@ -202,6 +204,7 @@ public void testBaseFileAndLogFileUpdateMatchesDeleteBlockCompact() { */ @Test public void testBaseFileAndLogFileUpdateMatchesAndRollBack() { + options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false"); testBaseFileAndLogFileUpdateMatchesHelper(false, false,false, true); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java index a35348f6e3aa7..5526db0fe1932 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java @@ -347,6 +347,7 @@ public void testTurnOffMetadataIndexAfterEnable() throws Exception { .withMetadataConfig(HoodieMetadataConfig.newBuilder() .withProperties(cfg.getMetadataConfig().getProps()) .withMetadataIndexColumnStats(false) + .withMetadataIndexPartitionStats(false) .build()) .build(); @@ -1150,6 +1151,7 @@ public void testMetadataRollbackDuringInit() throws Exception { // Disable the other two default index for this test because the test orchestrates // the rollback with the assumption of init commits being in certain order .withMetadataIndexColumnStats(false) + .withMetadataIndexPartitionStats(false) .build()) .build(); diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMDTStats.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMDTStats.java new file mode 100644 index 0000000000000..cabedaa64c05c --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestMDTStats.java @@ -0,0 +1,765 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.functional; + +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteClientTestUtils; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.testutils.InProcessTimeGenerator; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieMetadataWriteUtils; +import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.metadata.MetadataPartitionType; +import org.apache.hudi.metadata.MetadataWriterTestUtils; +import org.apache.hudi.metadata.SparkMetadataWriterFactory; +import org.apache.hudi.stats.HoodieColumnRangeMetadata; +import org.apache.hudi.stats.ValueMetadata; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.testutils.HoodieSparkClientTestHarness; + +import org.apache.hudi.common.model.FileSlice; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.execution.datasources.NoopCache$; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.stream.Collectors; + +import scala.collection.JavaConverters; + +/** + * Test for Hudi Metadata Table (MDT) column stats index. + * + * This test follows a 5-step process: + * STEP 1: Create a sample table with two columns. Insert 50 rows to initialize + * table schema, metadata table partition, etc. + * STEP 2: Use HoodieTestTable.createCommitMetadata to create commit metadata + * without writing data files. Files are spread evenly across partitions and + * file counts. + * STEP 3: Write the /files partition of metadata table using the same file + * structure from STEP 2. + * STEP 4: Write column stats using existing logic, ensuring file names match + * STEP 3. + * STEP 5: Use HoodieFileIndex.filterFileSlices to query column stats index and + * verify correct file slices are pruned. 
+ */ +public class TestMDTStats extends HoodieSparkClientTestHarness { + + private static final Logger LOG = LoggerFactory.getLogger(TestMDTStats.class); + + // Configuration constants + private static final int NUM_COLUMNS = 2; + private static final int FILE_GROUP_COUNT = 10; + private static final int DEFAULT_NUM_FILES = 1000; // Configurable via system property + private static final int INITIAL_ROW_COUNT = 50; // Rows to insert in STEP 1 + + @BeforeEach + public void setUp() throws Exception { + initSparkContexts("TestMDTStats"); + initPath(); + initHoodieStorage(); + initTestDataGenerator(); + initMetaClient(); + initTimelineService(); + } + + @AfterEach + public void tearDown() throws Exception { + cleanupResources(); + } + + /** + * Main test that follows the 5-step process. + */ + @Test + public void testMDTStatsWithFileSlices() throws Exception { + // Get number of files from system property or use default + int numFiles = Integer.getInteger("hudi.mdt.stats.num.files", DEFAULT_NUM_FILES); + int numPartitions = 1; // Start with 1 partition + + LOG.info("Starting MDT stats test with {} files, {} partitions, {} columns, {} file groups", + numFiles, numPartitions, NUM_COLUMNS, FILE_GROUP_COUNT); + LOG.info("Data table base path: {}", basePath); + String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); + LOG.info("Metadata table base path: {}", metadataTableBasePath); + + // Create data table config with metadata enabled + HoodieWriteConfig dataWriteConfig = getConfig(); + HoodieMetadataConfig metadataConfig = dataWriteConfig.getMetadataConfig(); + HoodieWriteConfig dataConfig = HoodieWriteConfig.newBuilder() + .withProperties(dataWriteConfig.getProps()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .fromProperties(metadataConfig.getProps()) + .enable(true) + .withMetadataIndexColumnStats(true) + .withMetadataIndexColumnStatsFileGroupCount(FILE_GROUP_COUNT) + .withMetadataIndexPartitionStats(false) + .build()) + .build(); + + HoodieTableMetaClient dataMetaClient = HoodieTableMetaClient.builder() + .setBasePath(dataConfig.getBasePath()) + .setConf(context.getStorageConf().newInstance()) + .build(); + + // STEP 1: Insert 50 rows with age and salary columns to initialize table schema + // and metadata table + String tableName = initializeTableWithSampleData(); + dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient); + + // STEP 2: Create commit metadata using HoodieTestTable without writing data + // files + String dataCommitTime = InProcessTimeGenerator.createNewInstantTime(); + List partitions = Collections.singletonList("frisco"); + int filesPerPartition = numFiles / numPartitions; // Evenly distribute files + HoodieTestTable testTable = HoodieTestTable.of(dataMetaClient); + HoodieCommitMetadata commitMetadata = testTable.createCommitMetadata( + dataCommitTime, + WriteOperationType.INSERT, + partitions, + filesPerPartition, + false); // bootstrap + + // Add commit to timeline using HoodieTestTable + testTable.addCommit(dataCommitTime, Option.of(commitMetadata)); + LOG.info("Created commit metadata with {} files per partition at instant {}", filesPerPartition, dataCommitTime); + // Create actual empty parquet files on disk so filesystem listing finds them + createEmptyParquetFiles(dataMetaClient, commitMetadata); + LOG.info("Created {} empty parquet files on disk", numFiles); + dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient); + + HoodieWriteConfig mdtConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + 
dataConfig, + HoodieFailedWritesCleaningPolicy.EAGER, + HoodieTableVersion.NINE); + + // STEP 3: Write both /files and /column_stats partitions to metadata table in a single commit + @SuppressWarnings("rawtypes") + Map>> expectedStats = + writeFilesAndColumnStatsToMetadataTable(dataConfig, dataMetaClient, commitMetadata, dataCommitTime, mdtConfig, NUM_COLUMNS); + + // STEP 4: Print column stats for verification (up to 10 files per partition) + printColumnStatsForVerification(commitMetadata, expectedStats); + + // STEP 5: Use HoodieFileIndex.filterFileSlices to query and verify + queryAndVerifyColumnStats(dataConfig, dataMetaClient, expectedStats, numFiles); + } + + /** + * Print column stats for verification - shows min/max values for up to 10 files per partition. + * This helps verify that column stats were constructed properly before querying. + * + * @param commitMetadata The commit metadata containing partition and file information + * @param expectedStats The expected column stats map (file name -> column name -> stats) + */ + @SuppressWarnings("rawtypes") + private void printColumnStatsForVerification( + HoodieCommitMetadata commitMetadata, + Map>> expectedStats) { + + LOG.info("=== STEP 4: Verifying column stats construction (max 10 files per partition) ==="); + + Map> partitionToWriteStats = commitMetadata.getPartitionToWriteStats(); + + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + String partitionPath = entry.getKey(); + List writeStats = entry.getValue(); + + LOG.info(""); + LOG.info("Partition: {} ({} files total)", partitionPath, writeStats.size()); + LOG.info(String.format("%-50s %-15s %-15s %-15s %-15s", + "FileName", "age_min", "age_max", "salary_min", "salary_max")); + LOG.info(String.join("", Collections.nCopies(110, "-"))); + + int filesDisplayed = 0; + for (HoodieWriteStat writeStat : writeStats) { + if (filesDisplayed >= 10) { + LOG.info("... and {} more files", writeStats.size() - 10); + break; + } + + String filePath = writeStat.getPath(); + String fileName = new StoragePath(filePath).getName(); + + Map> fileStats = expectedStats.get(fileName); + if (fileStats != null) { + HoodieColumnRangeMetadata ageStats = fileStats.get("age"); + HoodieColumnRangeMetadata salaryStats = fileStats.get("salary"); + + String ageMin = (ageStats != null) ? String.valueOf(ageStats.getMinValue()) : "N/A"; + String ageMax = (ageStats != null) ? String.valueOf(ageStats.getMaxValue()) : "N/A"; + String salaryMin = (salaryStats != null) ? String.valueOf(salaryStats.getMinValue()) : "N/A"; + String salaryMax = (salaryStats != null) ? String.valueOf(salaryStats.getMaxValue()) : "N/A"; + + LOG.info(String.format("%-50s %-15s %-15s %-15s %-15s", + fileName.length() > 48 ? fileName.substring(0, 48) + ".." : fileName, + ageMin, ageMax, salaryMin, salaryMax)); + } else { + LOG.info(String.format("%-50s %-15s", fileName, "NO STATS FOUND")); + } + + filesDisplayed++; + } + } + + LOG.info(""); + LOG.info("Total files with stats: {}", expectedStats.size()); + } + + /** + * Query the column stats index using HoodieFileIndex.filterFileSlices and verify results. 
+ * + * @param dataConfig The write config for the data table + * @param dataMetaClient The meta client for the data table + * @param expectedStats The expected column stats for verification + * @param numFiles The total number of files in the commit + */ + @SuppressWarnings("rawtypes") + private void queryAndVerifyColumnStats( + HoodieWriteConfig dataConfig, + HoodieTableMetaClient dataMetaClient, + Map>> expectedStats, + int numFiles) throws Exception { + + LOG.info("=== STEP 5: Querying column stats index using HoodieFileIndex ==="); + dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient); + + // Create HoodieFileIndex + Map options = new HashMap<>(); + options.put("path", dataConfig.getBasePath()); + options.put("hoodie.datasource.read.data.skipping.enable", "true"); + options.put("hoodie.metadata.enable", "true"); + options.put("hoodie.metadata.index.column.stats.enable", "true"); + options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false"); + // Also ensure the columns are specified for column stats + options.put("hoodie.metadata.index.column.stats.column.list", "age,salary"); + sparkSession.sqlContext().conf().setConfString("hoodie.fileIndex.dataSkippingFailureMode", "strict"); + + // Create schema with the columns used for data skipping + StructType dataSchema = new StructType() + .add("id", "string") + .add("name", "string") + .add("city", "string") + .add("age", "int") + .add("salary", "long"); + scala.Option schemaOption = scala.Option.apply(dataSchema); + + @SuppressWarnings("deprecation") + scala.collection.immutable.Map scalaOptions = JavaConverters.mapAsScalaMap(options) + .toMap(scala.Predef$.MODULE$.>conforms()); + + org.apache.hudi.HoodieFileIndex fileIndex = new org.apache.hudi.HoodieFileIndex( + sparkSession, + dataMetaClient, + schemaOption, + scalaOptions, + NoopCache$.MODULE$, + false, + false); + + // Create data filters for age and salary columns + // Unresolved expressions cause translateIntoColumnStatsIndexFilterExpr to return TrueLiteral (no filtering). 
+ List dataFilters = new ArrayList<>(); + String filterString = "age > 90"; + Expression filter1 = org.apache.spark.sql.HoodieCatalystExpressionUtils$.MODULE$ + .resolveExpr(sparkSession, filterString, dataSchema); + LOG.info("DEBUG: Resolved filter expression: {}", filter1); + LOG.info("DEBUG: Resolved filter resolved: {}", filter1.resolved()); + LOG.info("DEBUG: Resolved filter tree:\n{}", filter1.treeString()); + + dataFilters.add(filter1); + // Expression filter2 = org.apache.spark.sql.HoodieCatalystExpressionUtils.resolveExpr( + // sparkSession, "salary > 100000", dataSchema); + // dataFilters.add(filter2); + + List partitionFilters = new ArrayList<>(); // Empty partition filters + + // Convert to Scala Seq + scala.collection.immutable.List dataFiltersList = JavaConverters.asScalaBuffer(dataFilters) + .toList(); + scala.collection.Seq dataFiltersSeq = dataFiltersList; + scala.collection.immutable.List partitionFiltersList = JavaConverters + .asScalaBuffer(partitionFilters).toList(); + scala.collection.Seq partitionFiltersSeq = partitionFiltersList; + + // Call filterFileSlices + scala.collection.Seq, + scala.collection.Seq>> filteredSlices = fileIndex + .filterFileSlices( + dataFiltersSeq, + partitionFiltersSeq, + false); + + // Print results + LOG.info(""); + LOG.info("Filtered File Slices Min/Max Values:"); + LOG.info(String.format("%-30s %-20s %-20s %-20s %-20s", + "FileName", "age_min", "age_max", "salary_min", "salary_max")); + LOG.info(String.join("", Collections.nCopies(100, "-"))); + + int totalFileSlices = 0; + for (int j = 0; j < filteredSlices.size(); j++) { + scala.Tuple2, + scala.collection.Seq> tuple = filteredSlices.apply(j); + scala.collection.Seq fileSliceSeq = tuple._2(); + totalFileSlices += fileSliceSeq.size(); + + for (int k = 0; k < fileSliceSeq.size(); k++) { + FileSlice fileSlice = fileSliceSeq.apply(k); + String fileName = fileSlice.getBaseFile().get().getFileName(); + + Map> fileExpectedStats = expectedStats.get(fileName); + if (fileExpectedStats != null) { + HoodieColumnRangeMetadata ageStats = fileExpectedStats.get("age"); + HoodieColumnRangeMetadata salaryStats = fileExpectedStats.get("salary"); + + Object ageMin = (ageStats != null) ? ageStats.getMinValue() : "null"; + Object ageMax = (ageStats != null) ? ageStats.getMaxValue() : "null"; + Object salaryMin = (salaryStats != null) ? salaryStats.getMinValue() : "null"; + Object salaryMax = (salaryStats != null) ? salaryStats.getMaxValue() : "null"; + + LOG.info(String.format("%-30s %-20s %-20s %-20s %-20s", + fileName, ageMin.toString(), ageMax.toString(), salaryMin.toString(), + salaryMax.toString())); + } + } + } + + LOG.info(String.join("", Collections.nCopies(100, "-"))); + LOG.info("Total file slices returned: {}", totalFileSlices); + LOG.info("Total files in commit: {}", numFiles); + + if (numFiles > 0) { + double skippingRatio = ((double) (numFiles - totalFileSlices) / numFiles) * 100.0; + LOG.info(String.format("Data skipping ratio: %.2f%%", skippingRatio)); + } + } + + /** + * Write both /files and /column_stats partitions to metadata table in a single commit. + * This method handles initialization of partitions if needed, tags records with location, + * and writes them together to simulate how actual code writes metadata. 
+ * + * @param dataConfig The write config for the data table + * @param dataMetaClient The meta client for the data table + * @param commitMetadata The commit metadata containing file information + * @param dataCommitTime The commit time for the data table commit + * @param mdtWriteConfig The write config for the metadata table + * @param numColumns The number of columns to generate stats for + * @return Map of file names to their column stats metadata for verification + * @throws Exception if there's an error writing to the metadata table + */ + @SuppressWarnings("rawtypes") + private Map>> writeFilesAndColumnStatsToMetadataTable( + HoodieWriteConfig dataConfig, + HoodieTableMetaClient dataMetaClient, + HoodieCommitMetadata commitMetadata, + String dataCommitTime, + HoodieWriteConfig mdtWriteConfig, + int numColumns) throws Exception { + + try (HoodieTableMetadataWriter metadataWriter = SparkMetadataWriterFactory.create( + context.getStorageConf(), + dataConfig, + context, + Option.empty(), + dataMetaClient.getTableConfig())) { + + // STEP 3a: Check if /files partition exists and initialize if needed + String metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath); + HoodieTableMetaClient metadataMetaClient = HoodieTableMetaClient.builder() + .setBasePath(metadataBasePath) + .setConf(context.getStorageConf().newInstance()) + .build(); + + boolean filesPartitionExists = dataMetaClient.getTableConfig() + .isMetadataPartitionAvailable(MetadataPartitionType.FILES); + + LOG.info("BEFORE initialization - Metadata table exists: {}, partitions: {}", + filesPartitionExists, + metadataMetaClient.getTableConfig().getMetadataPartitions()); + + if (!filesPartitionExists) { + // Mark partition as inflight in table config - this is required for tagRecordsWithLocation + // to work with isInitializing=true + dataMetaClient.getTableConfig().setMetadataPartitionsInflight( + dataMetaClient, MetadataPartitionType.FILES); + LOG.info("Marked /files partition as inflight for initialization"); + } + + // Explicitly disable partition_stats if it exists in table config + boolean partitionStatsExists = dataMetaClient.getTableConfig() + .isMetadataPartitionAvailable(MetadataPartitionType.PARTITION_STATS); + if (partitionStatsExists) { + dataMetaClient.getTableConfig().setMetadataPartitionState( + dataMetaClient, MetadataPartitionType.PARTITION_STATS.getPartitionPath(), false); + LOG.info("Disabled /partition_stats partition in table config"); + } + + // Also mark column_stats partition as inflight for initialization + boolean colStatsPartitionExists = dataMetaClient.getTableConfig() + .isMetadataPartitionAvailable(MetadataPartitionType.COLUMN_STATS); + if (!colStatsPartitionExists) { + dataMetaClient.getTableConfig().setMetadataPartitionsInflight( + dataMetaClient, MetadataPartitionType.COLUMN_STATS); + LOG.info("Marked /column_stats partition as inflight for initialization"); + + // Create index definition for column stats - this tells data skipping which columns are indexed + org.apache.hudi.common.model.HoodieIndexDefinition colStatsIndexDef = + new org.apache.hudi.common.model.HoodieIndexDefinition.Builder() + .withIndexName(MetadataPartitionType.COLUMN_STATS.getPartitionPath()) + .withIndexType("column_stats") + .withSourceFields(java.util.Arrays.asList("age", "salary")) + .build(); + dataMetaClient.buildIndexDefinition(colStatsIndexDef); + LOG.info("Created column stats index definition for columns: age, salary"); + } + + // Convert commit metadata to files partition records + 
@SuppressWarnings("unchecked") + List> filesRecords = (List>) (List) + HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords(commitMetadata, dataCommitTime); + + // Generate column stats records + @SuppressWarnings("rawtypes") + Map>> expectedStats = new HashMap<>(); + List> columnStatsRecords = generateColumnStatsRecordsForCommitMetadata( + commitMetadata, + numColumns, + expectedStats, + dataCommitTime); + + LOG.info("Generated {} files records and {} column stats records", filesRecords.size(), columnStatsRecords.size()); + + // STEP 3b: Tag records with location for both partitions and write them together + // IMPORTANT: Use dataCommitTime for the metadata table commit to ensure timeline sync. + // HoodieFileIndex expects metadata table commits to match data table commit times. + String mdtCommitTime = dataCommitTime; + try (SparkRDDWriteClient mdtWriteClient = new SparkRDDWriteClient<>(context, mdtWriteConfig)) { + + WriteClientTestUtils.startCommitWithTime(mdtWriteClient, mdtCommitTime); + JavaRDD> filesRDD = jsc.parallelize(filesRecords, 1); + JavaRDD> colStatsRDD = jsc.parallelize(columnStatsRecords, 1); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + org.apache.hudi.metadata.HoodieBackedTableMetadataWriter, JavaRDD> + sparkMetadataWriter + = (org.apache.hudi.metadata.HoodieBackedTableMetadataWriter) metadataWriter; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + Map> partitionRecordsMap = new HashMap<>(); + partitionRecordsMap.put( + HoodieTableMetadataUtil.PARTITION_NAME_FILES, + (org.apache.hudi.common.data.HoodieData) (org.apache.hudi.common.data.HoodieData) HoodieJavaRDD + .of(filesRDD)); + partitionRecordsMap.put( + HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, + (org.apache.hudi.common.data.HoodieData) (org.apache.hudi.common.data.HoodieData) HoodieJavaRDD + .of(colStatsRDD)); + + // Tag records for all partitions together - use isInitializing=true if /files partition was just marked as inflight + @SuppressWarnings("rawtypes") + Pair, List> taggedResult = + MetadataWriterTestUtils.tagRecordsWithLocation( + sparkMetadataWriter, + partitionRecordsMap, + !filesPartitionExists // isInitializing = true if we just marked /files as inflight + ); + + // Check metadata table state after tagging + metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient); + LOG.info("AFTER tagging - Metadata table exists: {}, partitions: {}", + metadataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES), + metadataMetaClient.getTableConfig().getMetadataPartitions()); + + // Convert back to JavaRDD - taggedResult contains all records from all partitions + @SuppressWarnings("unchecked") + JavaRDD> allTaggedRecords = + (JavaRDD>) (JavaRDD) HoodieJavaRDD + .getJavaRDD(taggedResult.getKey()); + + allTaggedRecords.take(3).forEach(r -> + LOG.info("DEBUG: Record key={}, location={}", r.getRecordKey(), r.getCurrentLocation())); + + JavaRDD writeStatuses = mdtWriteClient.upsertPreppedRecords(allTaggedRecords, mdtCommitTime); + List statusList = writeStatuses.collect(); + mdtWriteClient.commit(mdtCommitTime, jsc.parallelize(statusList, 1), Option.empty(), HoodieTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap()); + + // Mark partition as completed if we initialized it + if (!filesPartitionExists) { + dataMetaClient.getTableConfig().setMetadataPartitionState( + dataMetaClient, MetadataPartitionType.FILES.getPartitionPath(), true); + LOG.info("Marked /files partition as completed"); + } + + if (!colStatsPartitionExists) { + 
dataMetaClient.getTableConfig().setMetadataPartitionState( + dataMetaClient, MetadataPartitionType.COLUMN_STATS.getPartitionPath(), true); + LOG.info("Marked /column_stats partition as completed"); + } + + // Verify final state + dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient); + LOG.info("AFTER commit - Metadata table exists: {}, partitions: {}", + dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES), + dataMetaClient.getTableConfig().getMetadataPartitions()); + + LOG.info("Wrote {} files partition records and {} column stats records to metadata table in a single commit", + filesRecords.size(), columnStatsRecords.size()); + } + return expectedStats; + } + } + + /** + * Initialize Hudi table with sample data to set up the table schema and + * metadata table. + * + * @return the name of the created table + */ + private String initializeTableWithSampleData() { + // Define a schema with 'id', 'age', 'salary' columns + String tableName = "test_mdt_stats_tbl"; + StructType schema = new StructType() + .add("id", "string") + .add("name", "string") + .add("city", "string") + .add("age", "int") + .add("salary", "double"); + + // Generate 50 rows of sample data + List rows = new ArrayList<>(); + for (int i = 0; i < INITIAL_ROW_COUNT; i++) { + rows.add(org.apache.spark.sql.RowFactory.create( + UUID.randomUUID().toString(), + "user_" + i, + "frisco", + 20 + (i % 30), // age: 20..49 + 50000.0 + (i * 1000) // salary varies + )); + } + Dataset df = sparkSession.createDataFrame(rows, schema); + + // Write the data to the Hudi table using spark sql + df.write() + .format("hudi") + .option("hoodie.table.name", tableName) + .option("hoodie.datasource.write.recordkey.field", "id") + .option("hoodie.datasource.write.partitionpath.field", "city") + .option("hoodie.datasource.write.table.name", tableName) + .option("hoodie.datasource.write.operation", "insert") + .option("hoodie.datasource.write.precombine.field", "id") + .option("hoodie.metadata.enabled", "true") + .option("path", basePath) + .mode("overwrite") + .save(); + + // Refresh table metadata in Spark + sparkSession.catalog().clearCache(); + sparkSession.read().format("hudi").load(basePath).createOrReplaceTempView(tableName); + + // print total rows in table + long totalRows = sparkSession.read().format("hudi").load(basePath).count(); + LOG.info("Total rows in table: {}", totalRows); + // print the table first few rows + Dataset tableDF = sparkSession.read().format("hudi").load(basePath); + tableDF.show(10, false); + + return tableName; + } + + /** + * Creates empty parquet files on disk for all files in the commit metadata. + * This ensures that filesystem listing will find these files even if metadata table + * lookup falls back to filesystem. 
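+ * The files are zero-byte placeholders; only their paths need to exist for the listing fallback to resolve them.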
+ * + * @param metaClient The meta client for the data table + * @param commitMetadata The commit metadata containing file information + */ + private void createEmptyParquetFiles(HoodieTableMetaClient metaClient, + HoodieCommitMetadata commitMetadata) throws Exception { + org.apache.hudi.storage.HoodieStorage storage = metaClient.getStorage(); + StoragePath basePath = metaClient.getBasePath(); + + for (Map.Entry> entry : + commitMetadata.getPartitionToWriteStats().entrySet()) { + String partitionPath = entry.getKey(); + StoragePath partitionDir = new StoragePath(basePath, partitionPath); + if (!storage.exists(partitionDir)) { + storage.createDirectory(partitionDir); + } + for (HoodieWriteStat stat : entry.getValue()) { + String relativePath = stat.getPath(); + StoragePath filePath = new StoragePath(basePath, relativePath); + if (!storage.exists(filePath)) { + storage.create(filePath).close(); + } + } + } + } + + /** + * Generates column stats records based on commit metadata file structure. + * Ensures file names match those in the commit metadata. + */ + @SuppressWarnings("rawtypes") + private List> generateColumnStatsRecordsForCommitMetadata( + HoodieCommitMetadata commitMetadata, + int numColumns, + Map>> expectedStatsMap, + String commitTime) { + + Random random = new Random(42); + List> allRecords = new ArrayList<>(); + + // Extract file information from commit metadata + Map> partitionToWriteStats = commitMetadata.getPartitionToWriteStats(); + + for (Map.Entry> entry : partitionToWriteStats.entrySet()) { + String partitionPath = entry.getKey(); + List writeStats = entry.getValue(); + + LOG.info("Processing partition {} with {} write stats", partitionPath, writeStats.size()); + + for (HoodieWriteStat writeStat : writeStats) { + String fileId = writeStat.getFileId(); + String filePath = writeStat.getPath(); + String fileName = new StoragePath(filePath).getName(); + + if (allRecords.size() < 5) { + LOG.debug("Processing file: {} (fileId: {}, path: {})", fileName, fileId, filePath); + } + + List> columnRangeMetadata = new ArrayList<>(); + + // Generate stats for age (int) and salary (long) columns + String[] columnNames = { "age", "salary" }; + for (int colIdx = 0; colIdx < numColumns; colIdx++) { + String colName = columnNames[colIdx]; + + Comparable minValue; + Comparable maxValue; + + if (colIdx == 0) { + // age column: values between 20-100 + int minAge = 20 + random.nextInt(30); + int maxAge = minAge + random.nextInt(50); + minValue = minAge; + maxValue = maxAge; + } else { + // salary column: values between 50000-250000 + long minSalary = 50000L + random.nextInt(100000); + long maxSalary = minSalary + random.nextInt(100000); + minValue = minSalary; + maxValue = maxSalary; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + int compareResult = minValue.compareTo(maxValue); + if (compareResult > 0) { + Comparable temp = minValue; + minValue = maxValue; + maxValue = temp; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + HoodieColumnRangeMetadata colStats = HoodieColumnRangeMetadata.create( + fileName, + colName, + minValue, + maxValue, + 0, + 1000, + 123456, + 123456, + ValueMetadata.V1EmptyMetadata.get()); + + columnRangeMetadata.add(colStats); + expectedStatsMap.computeIfAbsent(fileName, k -> new HashMap<>()).put(colName, colStats); + } + + @SuppressWarnings("unchecked") + List> fileRecords = HoodieMetadataPayload + .createColumnStatsRecords(partitionPath, columnRangeMetadata, false) + .map(record -> (HoodieRecord) record) + .collect(Collectors.toList()); + + 
allRecords.addAll(fileRecords); + } + } + + LOG.info("Generated {} column stats records total for {} unique files", + allRecords.size(), expectedStatsMap.size()); + if (expectedStatsMap.size() <= 10) { + LOG.info("File names in expectedStatsMap: {}", expectedStatsMap.keySet()); + } else { + LOG.info("First 10 file names: {}", + expectedStatsMap.keySet().stream().limit(10).collect(Collectors.toList())); + } + + // Log sample record keys and file names to verify they're unique + if (allRecords.size() > 0) { + LOG.info("Sample record keys (first 5):"); + for (int i = 0; i < Math.min(5, allRecords.size()); i++) { + HoodieRecord record = allRecords.get(i); + HoodieMetadataPayload payload = record.getData(); + if (payload.getColumnStatMetadata().isPresent()) { + String fileName = payload.getColumnStatMetadata().get().getFileName(); + String columnName = payload.getColumnStatMetadata().get().getColumnName(); + LOG.info(" Record {}: key={}, fileName={}, columnName={}", + i, record.getRecordKey(), fileName, columnName); + } + } + } + + return allRecords; + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java index 9dae84c22ff4f..12ca0b8ade8e0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java +++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/TestMetadataWriterCommit.java @@ -129,7 +129,7 @@ public void testCreateHandleRLIStats() throws IOException { assertEquals(10, mdtCommitMetadata.getPartitionToWriteStats().get(RECORD_INDEX.getPartitionPath()).size()); assertFalse(mdtCommitMetadata.getPartitionToWriteStats().containsKey(COLUMN_STATS.getPartitionPath())); - // Create commit in MDT with col stats enabled (partition stats is enabled with column stats) + // Create commit in MDT with col stats enabled config.getMetadataConfig().setValue(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS, "true"); instantTime = InProcessTimeGenerator.createNewInstantTime(); mdtWriter = (HoodieBackedTableMetadataWriter) SparkMetadataWriterFactory.createWithStreamingWrites(storageConf, config, @@ -138,8 +138,8 @@ public void testCreateHandleRLIStats() throws IOException { mdtWriteStatus = mdtWriter.streamWriteToMetadataPartitions(HoodieJavaRDD.of(Collections.singletonList(writeStatus), context, 1), instantTime); mdtWriteStats = mdtWriteStatus.collectAsList().stream().map(WriteStatus::getStat).collect(Collectors.toList()); mdtWriter.completeStreamingCommit(instantTime, context, mdtWriteStats, commitMetadata); - // 3 bootstrap commits for 4 enabled partitions, 2 commits due to update - assertEquals(6, mdtMetaClient.reloadActiveTimeline().filterCompletedInstants().countInstants()); + // 3 bootstrap commits for 3 enabled partitions, 2 commits due to update + assertEquals(5, mdtMetaClient.reloadActiveTimeline().filterCompletedInstants().countInstants()); // Verify commit metadata mdtCommitMetadata = mdtMetaClient.getActiveTimeline().readCommitMetadata(mdtMetaClient.getActiveTimeline().lastInstant().get()); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala index 561e829dbe35b..cc4ecd1f08b92 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestHoodieSparkSqlWriter.scala @@ -896,7 +896,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = { spark, inputDf.schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(inputDf.schema, "record", ""), - HoodieMetadataConfig.newBuilder().enable(true).build(), + HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(), metaClient) val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(List("partition", "ts"), shouldReadInMemory = true).collectAsList() partitionStats.forEach(stat => { diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala index 81fd89ec6d41a..f763f69ce54ff 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/PartitionStatsIndexTestBase.scala @@ -43,6 +43,7 @@ class PartitionStatsIndexTestBase extends HoodieStatsIndexTestBase { val targetColumnsToIndex: Seq[String] = Seq("rider", "driver") val metadataOpts: Map[String, String] = Map( HoodieMetadataConfig.ENABLE.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> targetColumnsToIndex.mkString(",") ) @@ -114,7 +115,7 @@ class PartitionStatsIndexTestBase extends HoodieStatsIndexTestBase { spark, inputDf.schema, HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(inputDf.schema, "record", ""), - HoodieMetadataConfig.newBuilder().enable(true).build(), + HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build(), metaClient) val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords(List("partition", "trip_type"), shouldReadInMemory = true).collectAsList() assertEquals(0, partitionStats.size()) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala index 34963c7b4e225..9e4aa43ced96d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestBasicSchemaEvolution.scala @@ -19,6 +19,7 @@ package org.apache.hudi.functional import org.apache.hudi.{DataSourceWriteOptions, HoodieSchemaConversionUtils, ScalaAssertionSupport, SparkAdapterSupport} import org.apache.hudi.HoodieConversionUtils.toJavaOption +import org.apache.hudi.common.config.HoodieMetadataConfig import org.apache.hudi.common.config.RecordMergeMode import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType} import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver} @@ -55,7 +56,8 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition", HoodieTableConfig.ORDERING_FIELDS.key -> "timestamp", - HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test", + 
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true" ) val verificationCol: String = "driver" diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala index 85f20ad3050fb..c0c5910da9eb1 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala @@ -34,7 +34,7 @@ import org.apache.hudi.common.table.timeline.TimelineMetadataUtils.deserializeAv import org.apache.hudi.common.table.view.HoodieTableFileSystemView import org.apache.hudi.common.testutils.HoodieTestDataGenerator.recordsToStrings import org.apache.hudi.config.{HoodieCleanConfig, HoodieClusteringConfig, HoodieCompactionConfig, HoodieLockConfig, HoodieWriteConfig} -import org.apache.hudi.exception.HoodieWriteConflictException +import org.apache.hudi.exception.{HoodieException, HoodieWriteConflictException} import org.apache.hudi.keygen.constant.KeyGeneratorOptions import org.apache.hudi.metadata.{HoodieBackedTableMetadata, MetadataPartitionType} import org.apache.hudi.util.{JavaConversions, JFunction} @@ -45,7 +45,7 @@ import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, Bitwi import org.apache.spark.sql.hudi.DataSkippingUtils import org.apache.spark.sql.types.StringType import org.junit.jupiter.api.{Tag, Test} -import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} +import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue, fail} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.{Arguments, EnumSource, MethodSource} @@ -65,6 +65,24 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { val sqlTempTable = "hudi_tbl" + /** + * Test case to validate partition stats cannot be created without column stats. + */ + @Test + def testPartitionStatsWithoutColumnStats(): Unit = { + // remove column stats enable key from commonOpts + val hudiOpts = commonOpts + (HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false") + // should throw an exception as column stats is required for partition stats + try { + doWriteAndValidateDataAndPartitionStats(hudiOpts, + operation = DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, + saveMode = SaveMode.Overwrite) + fail("Should have thrown exception") + } catch { + case e: HoodieException => assertTrue(e.getCause.getMessage.startsWith("Column stats partition must be enabled to generate partition stats.")) + } + } + /** * Test case to validate partition stats for a logical type column */ @@ -393,7 +411,8 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(), HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", - HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key -> "true") + HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true") // First ingest. 
doWriteAndValidateDataAndPartitionStats( @@ -467,6 +486,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(latestDf.schema, "record", ""), HoodieMetadataConfig.newBuilder() .enable(true) + .withMetadataIndexPartitionStats(true) .build(), metaClient) val partitionStats = partitionStatsIndex.loadColumnStatsIndexRecords( @@ -481,6 +501,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { hoodieSchema, HoodieMetadataConfig.newBuilder() .enable(true) + .withMetadataIndexPartitionStats(true) .build(), metaClient) val columnStats = columnStatsIndex @@ -544,6 +565,7 @@ class TestPartitionStatsIndex extends PartitionStatsIndexTestBase { DataSourceWriteOptions.TABLE_TYPE.key -> HoodieTableType.MERGE_ON_READ.name(), HoodieMetadataConfig.ENABLE.key() -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key() -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key() -> "true", DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true") doWriteAndValidateDataAndPartitionStats( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala index 1e10ce1c8618b..738ab169f0856 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala @@ -705,7 +705,7 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase { } private def getTableFileSystemView(metaClient: HoodieTableMetaClient): HoodieTableFileSystemView = { - val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build() + val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build() val metadataTable = new HoodieBackedTableMetadata(new HoodieSparkEngineContext(spark.sparkContext), metaClient.getStorage, metadataConfig, metaClient.getBasePath.toString) new HoodieTableFileSystemView( diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala index 3b5d3eb0320aa..2e07b702592da 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsPruning.scala @@ -47,7 +47,8 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { def testMetadataPSISimple(testCase: ColumnStatsTestCase): Unit = { val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true" ) val commonOpts = Map( @@ -76,7 +77,8 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { def testMetadataColumnStatsIndex(testCase: ColumnStatsTestCase): Unit = { val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true" + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + 
HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true" ) val commonOpts = Map( @@ -123,6 +125,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { val metadataOpts1 = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true", HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c6,c7,c8" // ignore c4 ) @@ -142,6 +145,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { val metadataOpts2 = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true", HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c5,c7,c8" // ignore c4,c6 ) @@ -159,7 +163,8 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { // disable cols stats val metadataOpts3 = Map( HoodieMetadataConfig.ENABLE.key -> "true", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false" + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false" ) // disable col stats @@ -181,7 +186,8 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { val partitionCol : String = "c8" val metadataOpts = Map( HoodieMetadataConfig.ENABLE.key -> "true", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false" + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false" ) val commonOpts = Map( @@ -210,7 +216,8 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { val metadataOpts0 = Map( HoodieMetadataConfig.ENABLE.key -> "true", - HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false" + HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "false", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "false" ) // updates @@ -238,6 +245,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { val metadataOpts1 = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true", HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c4,c5,c6,c7,c8" ) @@ -334,6 +342,7 @@ class TestPartitionStatsPruning extends ColumnStatIndexTestBase { val metadataOpts1 = Map( HoodieMetadataConfig.ENABLE.key -> "true", HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key -> "true", + HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key -> "true", HoodieMetadataConfig.COLUMN_STATS_INDEX_FOR_COLUMNS.key -> "c1,c2,c3,c4,c5,c6,c7,c8" ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala index 31e57d7448459..7153c4eb8c3b0 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/feature/index/TestExpressionIndex.scala @@ -2130,7 +2130,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase with SparkAdapterSuppor val 
lastCompletedInstant = metaClient.reloadActiveTimeline().getCommitsTimeline.filterCompletedInstants().lastInstant() val configBuilder = getWriteConfigBuilder(Map.empty, metaClient.getBasePath.toString) configBuilder.withMetadataConfig(HoodieMetadataConfig.newBuilder() - .withMetadataIndexColumnStats(false).build()) + .withMetadataIndexColumnStats(false).withMetadataIndexPartitionStats(false).build()) val writeClient = new SparkRDDWriteClient(new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)), configBuilder.build()) writeClient.rollback(lastCompletedInstant.get().requestedTime) writeClient.close() @@ -2452,7 +2452,7 @@ class TestExpressionIndex extends HoodieSparkSqlTestBase with SparkAdapterSuppor private def getTableFileSystemView(metaClient: HoodieTableMetaClient): HoodieTableFileSystemView = { val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) - val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).build() + val metadataConfig = HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexPartitionStats(true).build() val metadataTable = new HoodieBackedTableMetadata(engineContext, metaClient.getStorage, metadataConfig, metaClient.getBasePath.toString) new HoodieTableFileSystemView( metadataTable, diff --git a/hudi-utilities/HOODIE_MDT_STATS_USAGE.md b/hudi-utilities/HOODIE_MDT_STATS_USAGE.md new file mode 100644 index 0000000000000..51fb25be76ff4 --- /dev/null +++ b/hudi-utilities/HOODIE_MDT_STATS_USAGE.md @@ -0,0 +1,194 @@ + + + +# MetadataBenchmarkingTool Configuration Guide + +## Overview + +`MetadataBenchmarkingTool` is a utility for testing Hudi Metadata Table column statistics functionality. It creates a Hudi table, generates column statistics, and demonstrates data skipping capabilities. + +## Configuration Parameters + +### Required Parameters + +| Parameter | Short Flag | Description | Example Value | +|-----------|------------|-------------|---------------| +| `--table-base-path` | `-tbp` | Base path where the Hudi table will be created | `/tmp/hudi_table` | +| `--num-cols-to-index` | `-num-cols` | Number of columns to index (1 for tenantID, 2 for tenantID & age) | `2` | +| `--col-stats-file-group-count` | `-col-fg-count` | Number of file groups for the column stats partition in metadata table | `10` | +| `--num-files-to-bootstrap` | `-nfb` | Number of files to create during bootstrap phase | `1000` | +| `--num-partitions` | `-np` | Number of date partitions to generate (starting from 2025-01-01) | `5` | + +### Optional Parameters + +| Parameter | Short Flag | Description | Usage | Default Value | +|-----------|------------|-------------|-------|---------------| +| `--mode` | `-m` | Benchmark mode: `BOOTSTRAP` (write only), `QUERY` (read only), `BOOTSTRAP_AND_QUERY` (both) | `--mode BOOTSTRAP` | `BOOTSTRAP_AND_QUERY` | +| `--num-files-for-incremental` | `-nfi` | Total number of files to create across incremental commits | `--num-files-for-incremental 200` | `0` | +| `--num-commits-for-incremental` | `-nci` | Number of incremental commits to distribute files across | `--num-commits-for-incremental 2` | `0` | +| `--partition-filter` | `-pf` | Partition filter predicate for querying (e.g., `"dt > '2025-01-01'"`) | `--partition-filter "dt > '2025-01-01'"` | `"dt = '2025-01-01'"` | +| `--data-filter` | `-df` | Data filter predicate for querying (e.g., `"age > 70"`). 
Can specify multiple filters separated by commas | `--data-filter "tenantID > 50000"` | `""` (uses default filters based on indexed columns) | +| `--props` | | Path to properties file containing Hudi configurations | `--props /path/to/config.properties` | None | +| `--hoodie-conf` | | Individual Hudi configuration (can be specified multiple times) | `--hoodie-conf hoodie.metadata.enable=true` | None | +| `--help` | `-h` | Display help message | `--help` | - | + +## Configuration Details + +### `--table-base-path` +The location where the test Hudi table will be created. Can be a local filesystem path or HDFS path. + +### `--mode` +Benchmark execution mode: +- `BOOTSTRAP`: Only creates the table and bootstraps the metadata table (write-only) +- `QUERY`: Only runs data skipping benchmark queries (read-only, requires existing table) +- `BOOTSTRAP_AND_QUERY`: Runs both bootstrap and query phases (default) + +### `--num-cols-to-index` +Number of columns to index for column statistics. Accepts values 1 or 2. + +**Supported columns:** +- `tenantID` - Long type, values range 30000-60000 (30k-60k), always indexed +- `age` - Integer type, values range 20-100, indexed only when `--num-cols-to-index 2` + +**Examples:** +- Single column (tenantID only): `--num-cols-to-index 1` +- Two columns (tenantID & age): `--num-cols-to-index 2` + +### `--col-stats-file-group-count` +Determines the parallelism and file distribution in the metadata table's column stats partition. Higher values mean: +- More parallel write/read operations +- More files in the column stats partition +- Better distribution of column statistics records + +### `--num-files-to-bootstrap` +Number of data files to create during the bootstrap phase. Files are evenly distributed across partitions. +- Example: 1000 files with 5 partitions = 200 files per partition + +### `--num-partitions` +Number of date-based partitions to create. Partitions are generated sequentially starting from `2025-01-01`: +- `1` → `["2025-01-01"]` +- `3` → `["2025-01-01", "2025-01-02", "2025-01-03"]` +- `10` → `["2025-01-01" through "2025-01-10"]` + +### `--num-files-for-incremental` and `--num-commits-for-incremental` +These parameters enable incremental ingestion after bootstrap: +- `--num-files-for-incremental`: Total number of files to create across all incremental commits +- `--num-commits-for-incremental`: Number of incremental commits to distribute files across + +Files are evenly distributed across commits. Both parameters must be set to a value > 0 to enable incremental ingestion. + +**Example:** `--num-files-for-incremental 200 --num-commits-for-incremental 2` creates 100 files per commit across 2 commits. + +### `--partition-filter` +Partition filter predicate used during query benchmarking. Must match the partition format (`dt = 'yyyy-MM-dd'` or `dt > 'yyyy-MM-dd'`). + +**Default:** `"dt = '2025-01-01'"` (matches first partition) + +**Examples:** +- `--partition-filter "dt = '2025-01-01'"` - Query only first partition +- `--partition-filter "dt > '2025-01-03'"` - Query partitions after the third one + +### `--data-filter` +Data filter predicate(s) used during query benchmarking. Multiple filters can be specified separated by commas. 
+ +**Default behavior:** If not specified, uses default filters based on indexed columns: +- 1 column indexed: `age > 70` (note: age is not indexed, so this won't skip files) +- 2 columns indexed: `age > 70` and `tenantID > 50000` + +**Examples:** +- `--data-filter "tenantID > 50000"` - Filter on tenantID +- `--data-filter "age > 70, tenantID > 50000"` - Multiple filters + + +## Example Usage + +### Example 1: Index tenantID only (Bootstrap and Query) +```bash +spark-submit \ + --class org.apache.hudi.utilities.benchmarking.MetadataBenchmarkingTool \ + --master "local[*]" \ + packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-1.2.0-SNAPSHOT.jar \ + --table-base-path /tmp/hudi_test \ + --num-cols-to-index 1 \ + --col-stats-file-group-count 10 \ + --num-files-to-bootstrap 1000 \ + --num-partitions 5 +``` + +### Example 2: Index tenantID & age +```bash +spark-submit \ + --class org.apache.hudi.utilities.benchmarking.MetadataBenchmarkingTool \ + --master "local[*]" \ + packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-1.2.0-SNAPSHOT.jar \ + --table-base-path /tmp/hudi_test \ + --num-cols-to-index 2 \ + --col-stats-file-group-count 10 \ + --num-files-to-bootstrap 1000 \ + --num-partitions 5 +``` + +### Example 3: Bootstrap only (no query) +```bash +spark-submit \ + --class org.apache.hudi.utilities.benchmarking.MetadataBenchmarkingTool \ + --master "local[*]" \ + packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-1.2.0-SNAPSHOT.jar \ + --mode BOOTSTRAP \ + --table-base-path /tmp/hudi_test \ + --num-cols-to-index 2 \ + --col-stats-file-group-count 10 \ + --num-files-to-bootstrap 1000 \ + --num-partitions 5 +``` + +### Example 4: Query only (table must already exist) +```bash +spark-submit \ + --class org.apache.hudi.utilities.benchmarking.MetadataBenchmarkingTool \ + --master "local[*]" \ + packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-1.2.0-SNAPSHOT.jar \ + --mode QUERY \ + --table-base-path /tmp/hudi_test \ + --num-cols-to-index 2 \ + --col-stats-file-group-count 10 \ + --num-files-to-bootstrap 1000 \ + --num-partitions 5 \ + --partition-filter "dt > '2025-01-02'" \ + --data-filter "tenantID > 50000" +``` + +### Example 5: Bootstrap with incremental commits +```bash +spark-submit \ + --class org.apache.hudi.utilities.benchmarking.MetadataBenchmarkingTool \ + --master "local[*]" \ + packaging/hudi-utilities-bundle/target/hudi-utilities-bundle_2.12-1.2.0-SNAPSHOT.jar \ + --table-base-path /tmp/hudi_test \ + --num-cols-to-index 2 \ + --col-stats-file-group-count 10 \ + --num-files-to-bootstrap 100 \ + --num-partitions 3 \ + --num-files-for-incremental 200 \ + --num-commits-for-incremental 2 +``` + +This example: +- Bootstraps with 100 files across 3 partitions +- Then performs 2 incremental commits with 100 files each (200 total) +- Runs query benchmarking at the end \ No newline at end of file diff --git a/hudi-utilities/pom.xml b/hudi-utilities/pom.xml index 121e20552cd0c..1d23127e6b1f4 100644 --- a/hudi-utilities/pom.xml +++ b/hudi-utilities/pom.xml @@ -451,7 +451,6 @@ ${project.version} tests test-jar - test org.apache.hudi @@ -467,7 +466,6 @@ ${project.version} tests test-jar - test org.apache.hudi @@ -483,7 +481,6 @@ ${project.version} tests test-jar - test org.apache.hudi diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/benchmarking/MetadataBenchmarkingTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/benchmarking/MetadataBenchmarkingTool.java new file mode 100644 index 
0000000000000..27ace78cd49d6 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/benchmarking/MetadataBenchmarkingTool.java @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.benchmarking; + +import org.apache.hudi.BaseHoodieTableFileIndex; +import org.apache.hudi.HoodieFileIndex; +import org.apache.hudi.HoodieSchemaConversionUtils; +import org.apache.hudi.client.SparkRDDWriteClient; +import org.apache.hudi.client.WriteClientTestUtils; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.common.HoodieSparkEngineContext; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; +import org.apache.hudi.common.table.timeline.versioning.v1.InstantComparatorV1; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.testutils.InProcessTimeGenerator; +import org.apache.hudi.common.util.HoodieTimer; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter; +import org.apache.hudi.metadata.HoodieMetadataPayload; +import org.apache.hudi.metadata.HoodieMetadataWriteUtils; +import org.apache.hudi.metadata.HoodieTableMetadataUtil; +import org.apache.hudi.metadata.MetadataWriterTestUtils; +import org.apache.hudi.stats.HoodieColumnRangeMetadata; +import org.apache.hudi.stats.ValueMetadata; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.utilities.IdentitySplitter; +import 
org.apache.hudi.utilities.UtilHelpers; + +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import org.apache.hadoop.fs.Path; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.sql.HoodieCatalystExpressionUtils$; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.execution.datasources.NoopCache$; +import org.apache.spark.sql.types.StructType; +import org.bouncycastle.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.stream.Collectors; + +import scala.Tuple2; +import scala.collection.JavaConverters; +import scala.collection.Seq; + +import static org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP; +import static org.apache.hudi.metadata.MetadataPartitionType.COLUMN_STATS; +import static org.apache.hudi.metadata.MetadataPartitionType.FILES; + +public class MetadataBenchmarkingTool implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(MetadataBenchmarkingTool.class); + + // Table and column constants + private static final String TABLE_NAME = "test_mdt_stats_tbl"; + private static final String COL_TENANT_ID = "tenantID"; + private static final String COL_AGE = "age"; + + // Partition generation constants + private static final LocalDate PARTITION_START_DATE = LocalDate.of(2025, 1, 1); + private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + + // TenantID column stats range: 30000-60000 + private static final long TENANT_ID_MIN_BASE = 30000L; + private static final int TENANT_ID_RANGE = 30000; + private static final long TENANT_ID_MAX = 60000L; + + // Age column stats range: 20-99 + private static final int AGE_MIN_BASE = 20; + private static final int AGE_MIN_RANGE = 30; + private static final int AGE_MAX_RANGE = 50; + + // Column stats metadata defaults + private static final int COL_STATS_NULL_COUNT = 0; + private static final int COL_STATS_VALUE_COUNT = 1000; + private static final long COL_STATS_TOTAL_SIZE = 123456L; + private static final long COL_STATS_TOTAL_UNCOMPRESSED_SIZE = 123456L; + + private final Config cfg; + // Properties with source, hoodie client, key generator etc. + private TypedProperties props; + + private final SparkSession spark; + + private final JavaSparkContext jsc; + + private final HoodieEngineContext engineContext; + + /** + * Returns the AVRO schema string for the table. 
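+ * Field names and types mirror {@link #getDataSchema()} so the write-side Avro schema and the query-side StructType stay consistent.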
+ * Schema includes: id, name, city, age, tenantID, dt + */ + private static String getAvroSchema() { + return "{\n" + + " \"type\": \"record\",\n" + + " \"name\": \"Employee\",\n" + + " \"namespace\": \"com.example.avro\",\n" + + " \"fields\": [\n" + + " { \"name\": \"id\", \"type\": \"string\" },\n" + + " { \"name\": \"name\", \"type\": \"string\" },\n" + + " { \"name\": \"city\", \"type\": \"string\" },\n" + + " { \"name\": \"age\", \"type\": \"int\" },\n" + + " { \"name\": \"tenantID\", \"type\": \"long\" },\n" + + " { \"name\": \"dt\", \"type\": \"string\" }\n" + + " ]\n" + + "}\n"; + } + + private static final String RECORD_ID = "id"; + + private static final String PARTITION_FIELDS = "dt"; + /** + * Returns the Spark StructType schema for data skipping queries. + * Reused across multiple places to avoid duplication. + */ + private static StructType getDataSchema() { + return new StructType() + .add("id", "string") + .add("name", "string") + .add("city", "string") + .add("age", "int") + .add("tenantID", "long") + .add("dt", "string"); + } + + /** + * Returns the list of columns to index based on numColumnsToIndex config. + * @param numColumnsToIndex 1 for tenantID only, 2 for tenantID & age + * @return List of column names to index + */ + private List getColumnsToIndex(int numColumnsToIndex) { + return numColumnsToIndex == 2 ? Arrays.asList(COL_TENANT_ID, COL_AGE) + : Collections.singletonList(COL_TENANT_ID); + } + + private String getColumnsToIndexString(int numColumnsToIndex) { + return String.join(",", getColumnsToIndex(numColumnsToIndex)); + } + + public MetadataBenchmarkingTool(SparkSession spark, Config cfg) { + this.spark = spark; + this.jsc = new JavaSparkContext(spark.sparkContext()); + this.engineContext = new HoodieSparkEngineContext(jsc); + this.cfg = cfg; + this.props = StringUtils.isNullOrEmpty(cfg.propsFilePath) + ? UtilHelpers.buildProperties(cfg.configs) + : readConfigFromFileSystem(jsc, cfg); + } + + private TypedProperties readConfigFromFileSystem(JavaSparkContext jsc, Config cfg) { + return UtilHelpers.readConfig(jsc.hadoopConfiguration(), new Path(cfg.propsFilePath), cfg.configs) + .getProps(true); + } + + public static class Config implements Serializable { + + /** + * Benchmark mode options. 
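+ * QUERY mode requires a table that was already bootstrapped; BOOTSTRAP and BOOTSTRAP_AND_QUERY create it first.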
+ */ + public enum BenchmarkMode { + BOOTSTRAP, // Only bootstrap metadata table + QUERY, // Only run data skipping benchmark + BOOTSTRAP_AND_QUERY // Run both (default) + } + + @Parameter(names = {"--mode", "-m"}, description = "Benchmark mode: BOOTSTRAP (write only), QUERY (read only), BOOTSTRAP_AND_QUERY (default)") + public BenchmarkMode mode = BenchmarkMode.BOOTSTRAP_AND_QUERY; + + @Parameter(names = {"--table-base-path", "-tbp"}, description = "Base path for the Hudi table", required = true) + public String tableBasePath = null; + + @Parameter(names = {"--num-cols-to-index", "-num-cols"}, description = "Number of columns to index (1 for tenantID, 2 for tenantID & age)", required = true) + public Integer numColumnsToIndex = 1; + + @Parameter(names = {"--col-stats-file-group-count", "-col-fg-count"}, description = "Number of file groups for column stats partition in metadata table", required = true) + public Integer colStatsFileGroupCount = 10; + + @Parameter(names = {"--num-partitions", "-np"}, description = "Number of partitions to create in the table", required = true) + public Integer numPartitions = 1; + + @Parameter(names = {"--num-files-to-bootstrap", "-nfb"}, description = "Number of files to create during bootstrap", required = true) + public Integer numFilesToBootstrap = 1000; + + @Parameter(names = {"--num-files-for-incremental", "-nfi"}, description = "Total number of files to create across incremental commits") + public Integer numFilesForIncrementalIngestion = 0; + + @Parameter(names = {"--num-commits-for-incremental", "-nci"}, description = "Number of incremental commits to distribute files across") + public Integer numOfcommitForIncrementalIngestion = 0; + + @Parameter(names = {"--partition-filter", "-pf"}, description = "Partition filter predicate for querying (e.g., \"dt > '2025-01-01'\")") + public String partitionFilter = "dt = '2025-01-01'"; + + @Parameter(names = {"--data-filter", "-df"}, description = "data filter predicate for querying (e.g., \"age > 70\")") + public String dataFilters = ""; + + @Parameter(names = {"--hoodie-conf"}, description = "Any configuration that can be set in the properties file " + + "(using the CLI parameter \"--props\") can also be passed command line using this parameter. 
This can be repeated", + splitter = IdentitySplitter.class) + public List configs = new ArrayList<>(); + + @Parameter(names = {"--props"}, description = "Path to properties file on localfs or dfs, with configurations for " + + "Hoodie client") + public String propsFilePath = null; + + @Parameter(names = {"--help", "-h"}, help = true) + public Boolean help = false; + + @Override + public String toString() { + return "MetadataBenchmarkingTool {\n" + + " --mode " + mode + ",\n" + + " --table-base-path " + tableBasePath + ",\n" + + " --num-partitions " + numPartitions + ",\n" + + " --num-cols-to-index " + numColumnsToIndex + ",\n" + + " --col-stats-file-group-count " + colStatsFileGroupCount + ",\n" + + " --num-files-to-bootstrap " + numFilesToBootstrap + ",\n" + + " --num-files-for-incremental " + numFilesForIncrementalIngestion + ",\n" + + " --num-commits-for-incremental " + numOfcommitForIncrementalIngestion + ",\n" + + " --partition-filter " + partitionFilter + "\n" + + "}"; + } + } + + public static void main(String[] args) { + final Config cfg = new Config(); + JCommander cmd = new JCommander(cfg, null, args); + + if (cfg.help || args.length == 0) { + cmd.usage(); + System.exit(1); + } + + final LocalDateTime now = LocalDateTime.now(); + final String currentHour = now.format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH")); + String jobName = "metadata-table-stats-analyzer"; + String sparkAppName = jobName + "-" + currentHour; + SparkSession spark = SparkSession.builder() + .appName(sparkAppName) + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate(); + + + try (MetadataBenchmarkingTool metadataBenchmarkingTool = new MetadataBenchmarkingTool(spark, cfg)) { + metadataBenchmarkingTool.run(); + } catch (Throwable throwable) { + LOG.error("Failed to get table size stats for " + cfg, throwable); + } finally { + spark.stop(); + } + } + + public void run() throws Exception { + int numPartitions = cfg.numPartitions; + List colsToIndex = getColumnsToIndex(cfg.numColumnsToIndex); + LOG.info("Data table base path: {}", cfg.tableBasePath); + LOG.info("Benchmark mode: {}", cfg.mode); + + HoodieWriteConfig dataWriteConfig = getWriteConfig(getAvroSchema(), cfg.tableBasePath, HoodieFailedWritesCleaningPolicy.EAGER); + + int totalFilesCreated = 0; + if (cfg.mode == Config.BenchmarkMode.BOOTSTRAP || cfg.mode == Config.BenchmarkMode.BOOTSTRAP_AND_QUERY) { + HoodieTableMetaClient dataMetaClient = initializeDataTableMetaClient(TABLE_NAME, dataWriteConfig); + + // Bootstrap phase + Pair> bootstrapResult = bootstrapMetadataTable( + cfg.numFilesToBootstrap, numPartitions, colsToIndex, dataWriteConfig, dataMetaClient); + totalFilesCreated = bootstrapResult.getLeft(); + List partitions = bootstrapResult.getRight(); + LOG.info("Completed bootstrapping Metadata table with {} files", totalFilesCreated); + + // Incremental ingestion phase + if (cfg.numFilesForIncrementalIngestion > 0 && cfg.numOfcommitForIncrementalIngestion > 0) { + int incrementalFiles = runIncrementalIngestion( + cfg.numFilesForIncrementalIngestion, + cfg.numOfcommitForIncrementalIngestion, + partitions, + colsToIndex, + dataWriteConfig, + dataMetaClient); + totalFilesCreated += incrementalFiles; + LOG.info("Completed incremental ingestion with {} additional files across {} commits", + incrementalFiles, cfg.numOfcommitForIncrementalIngestion); + } + } + + if (cfg.mode == Config.BenchmarkMode.QUERY || cfg.mode == Config.BenchmarkMode.BOOTSTRAP_AND_QUERY) { + HoodieTableMetaClient dataMetaClient = 
loadExistingMetaClient(dataWriteConfig); + if (totalFilesCreated == 0) { + totalFilesCreated = cfg.numFilesToBootstrap + cfg.numFilesForIncrementalIngestion; + if (totalFilesCreated == 0) { + LOG.warn("Total files count is 0. Data skipping ratio calculation may be inaccurate."); + } + } + benchmarkDataSkipping(dataWriteConfig, dataMetaClient, totalFilesCreated); + LOG.info("Completed query benchmarking"); + } + } + + private Pair> bootstrapMetadataTable( + int numFiles, int numPartitions, List colsToIndex, + HoodieWriteConfig dataWriteConfig, HoodieTableMetaClient dataTableMetaClient) throws Exception { + + LOG.info("Bootstrapping metadata table: {} files, {} partitions, columns [{}], {} col stats file groups", + numFiles, numPartitions, String.join(",", colsToIndex), cfg.colStatsFileGroupCount); + + List partitions = generatePartitions(numPartitions); + int filesPerPartition = numFiles / numPartitions; + + String dataCommitTime = InProcessTimeGenerator.createNewInstantTime(); + + // Create partition directories on the filesystem + createPartitionPaths(dataTableMetaClient, partitions, dataCommitTime); + + HoodieCommitMetadata commitMetadata = createCommitMetadataAndAddToTimeline(partitions, filesPerPartition, dataCommitTime, dataTableMetaClient); + + HoodieTimer timer = HoodieTimer.start(); + try (SparkHoodieBackedTableMetadataBenchmarkWriter metadataWriter = + new SparkHoodieBackedTableMetadataBenchmarkWriter( + engineContext.getStorageConf(), dataWriteConfig, + HoodieFailedWritesCleaningPolicy.EAGER, engineContext, Option.empty(), false)) { + + metadataWriter.initMetadataMetaClient(); + bootstrapFilesPartition(metadataWriter, commitMetadata, dataCommitTime); + bootstrapColumnStatsPartition(metadataWriter, commitMetadata, colsToIndex); + } + LOG.info("Time taken to perform bootstrapping metadata table is {}", timer.endTimer()); + + return Pair.of(filesPerPartition * numPartitions, partitions); + } + + /** + * Creates commit metadata for the test table and adds it to the timeline. 
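+ * The instant is moved through REQUESTED and INFLIGHT states and then saved as complete, so the write shows up on the data timeline as a finished commit.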
+ */ + private HoodieCommitMetadata createCommitMetadataAndAddToTimeline(List partitions, + int filesPerPartition, String dataCommitTime, HoodieTableMetaClient dataTableMetaClient) throws Exception { + HoodieTestTable testTable = HoodieTestTable.of(dataTableMetaClient); + HoodieSchema hoodieSchema = HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(getDataSchema(), "mdt_benchmarking_struct","mdt_benchmarking_namespace"); + HoodieTestTable.TEST_TABLE_SCHEMA = hoodieSchema.toString(); + + HoodieCommitMetadata commitMetadata = testTable.createCommitMetadata( + dataCommitTime, WriteOperationType.INSERT, partitions, filesPerPartition, false); + + HoodieInstant requestedInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.COMMIT_ACTION, dataCommitTime, + InstantComparatorV1.REQUESTED_TIME_BASED_COMPARATOR); + dataTableMetaClient.getActiveTimeline().createNewInstant(requestedInstant); + dataTableMetaClient.getActiveTimeline().transitionRequestedToInflight(requestedInstant, Option.empty()); + + Map extraMetadata = new HashMap<>(); + extraMetadata.put(HoodieCommitMetadata.SCHEMA_KEY, commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY)); + + dataTableMetaClient.getActiveTimeline().saveAsComplete(false, + dataTableMetaClient.createNewInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, dataCommitTime), Option.of(commitMetadata)); + LOG.info("Created commit metadata at instant {} with {} files per partition", dataCommitTime, filesPerPartition); + return commitMetadata; + } + + /** + * Bootstraps the FILES partition in the metadata table. + */ + @SuppressWarnings("unchecked") + private void bootstrapFilesPartition( + SparkHoodieBackedTableMetadataBenchmarkWriter metadataWriter, + HoodieCommitMetadata commitMetadata, + String dataCommitTime) throws IOException { + HoodieTimer timer = HoodieTimer.start(); + List filesRecords = HoodieTableMetadataUtil + .convertMetadataToFilesPartitionRecords(commitMetadata, dataCommitTime); + LOG.info("Bootstrapping FILES partition with {} records", filesRecords.size()); + + String instantTime = generateUniqueInstantTime(0); + Pair> fileGroupCountAndRecords = + Pair.of(1, engineContext.parallelize(filesRecords, 10)); + metadataWriter.initializeFilegroupsAndCommit(FILES, FILES.getPartitionPath(), fileGroupCountAndRecords, instantTime); + LOG.info("Time taken for bootstrapping files partition is {}", timer.endTimer()); + } + + /** + * Bootstraps the COLUMN_STATS partition in the metadata table. + */ + @SuppressWarnings("rawtypes") + private void bootstrapColumnStatsPartition( + SparkHoodieBackedTableMetadataBenchmarkWriter metadataWriter, + HoodieCommitMetadata commitMetadata, + List colsToIndex) throws IOException { + + HoodieTimer timer = HoodieTimer.start(); + HoodieData columnStatsRecords = generateColumnStatsRecordsForCommitMetadata(commitMetadata); + LOG.info("Bootstrapping COLUMN_STATS partition with {} file groups", cfg.colStatsFileGroupCount); + + String instantTime = generateUniqueInstantTime(1); + Pair> fileGroupCountAndRecords = + Pair.of(cfg.colStatsFileGroupCount, columnStatsRecords); + metadataWriter.initializeFilegroupsAndCommit( + COLUMN_STATS, COLUMN_STATS.getPartitionPath(), fileGroupCountAndRecords, instantTime, colsToIndex); + LOG.info("Time taken to bootstrap column stats is {}", timer.endTimer()); + } + + /** + * Runs incremental ingestion rounds after bootstrap. + * Distributes files across multiple commits and uses upsertPreppedRecords for each commit. 
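+ * Files are split evenly across commits; the first (totalFiles % numCommits) commits receive one extra file each.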
+ * Reuses the partitions created during bootstrap. + */ + private int runIncrementalIngestion( + int totalFiles, + int numCommits, + List partitions, + List colsToIndex, + HoodieWriteConfig dataWriteConfig, + HoodieTableMetaClient dataMetaClient) throws Exception { + + LOG.info("Starting incremental ingestion: {} files across {} commits using {} existing partitions", + totalFiles, numCommits, partitions.size()); + + int filesPerCommit = totalFiles / numCommits; + int remainingFiles = totalFiles % numCommits; + + HoodieWriteConfig mdtWriteConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig( + dataWriteConfig, + HoodieFailedWritesCleaningPolicy.EAGER, + HoodieTableVersion.NINE); + + int totalFilesCreated = 0; + + try (HoodieBackedTableMetadataWriter metadataWriter = + (HoodieBackedTableMetadataWriter) org.apache.hudi.metadata.SparkMetadataWriterFactory.create( + engineContext.getStorageConf(), + dataWriteConfig, + engineContext, + Option.empty(), + dataMetaClient.getTableConfig())) { + + // HoodieBackedTableMetadataWriter initializes metadata reader in constructor, no need to call initMetadataMetaClient() + + for (int commitIdx = 0; commitIdx < numCommits; commitIdx++) { + int filesThisCommit = filesPerCommit + (commitIdx < remainingFiles ? 1 : 0); + if (filesThisCommit == 0) { + continue; + } + + String dataCommitTime = InProcessTimeGenerator.createNewInstantTime(); + HoodieCommitMetadata commitMetadata = createCommitMetadataAndAddToTimeline( + partitions, filesThisCommit, dataCommitTime, dataMetaClient); + + // Partitions already exist from bootstrap, no need to create them again + // createFilesForCommit(dataMetaClient, commitMetadata); + + // Generate records for both partitions + @SuppressWarnings("unchecked") + List> filesRecords = + (List>) (List) + HoodieTableMetadataUtil.convertMetadataToFilesPartitionRecords(commitMetadata, dataCommitTime); + + HoodieData columnStatsRecords = generateColumnStatsRecordsForCommitMetadata(commitMetadata); + + // Use upsertPreppedRecords for incremental commits + performIncrementalCommit( + metadataWriter, + mdtWriteConfig, + dataCommitTime, + filesRecords, + columnStatsRecords, + colsToIndex); + + totalFilesCreated += filesThisCommit; + LOG.info("Completed incremental commit {}: {} files (total: {})", + commitIdx + 1, filesThisCommit, totalFilesCreated); + } + } + + return totalFilesCreated; + } + + /** + * Performs an incremental commit using upsertPreppedRecords. 
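+ * Records for the files and column_stats partitions are tagged with their target file group locations and then written as prepped upserts under the same instant time as the data commit.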
+ */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void performIncrementalCommit( + HoodieBackedTableMetadataWriter metadataWriter, + HoodieWriteConfig mdtWriteConfig, + String dataCommitTime, + List> filesRecords, + HoodieData columnStatsRecords, + List colsToIndex) throws Exception { + + String mdtCommitTime = dataCommitTime; + + try (SparkRDDWriteClient mdtWriteClient = + new SparkRDDWriteClient<>(engineContext, mdtWriteConfig)) { + + WriteClientTestUtils.startCommitWithTime(mdtWriteClient, mdtCommitTime); + + JavaRDD> filesRDD = jsc.parallelize(filesRecords, 1); + + org.apache.hudi.metadata.HoodieBackedTableMetadataWriter, JavaRDD> + sparkMetadataWriter = (org.apache.hudi.metadata.HoodieBackedTableMetadataWriter) metadataWriter; + + @SuppressWarnings({"rawtypes", "unchecked"}) + Map> partitionRecordsMap = new HashMap<>(); + partitionRecordsMap.put( + HoodieTableMetadataUtil.PARTITION_NAME_FILES, + (HoodieData) (HoodieData) HoodieJavaRDD.of(filesRDD)); + partitionRecordsMap.put( + HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS, + columnStatsRecords); + + // Tag records with location + Pair, List> taggedResult = + MetadataWriterTestUtils.tagRecordsWithLocation( + sparkMetadataWriter, + partitionRecordsMap, + false // isInitializing = false for incremental commits + ); + + // Convert back to JavaRDD + JavaRDD> allTaggedRecords = + (JavaRDD>) (JavaRDD) HoodieJavaRDD + .getJavaRDD(taggedResult.getKey()); + + // Use upsertPreppedRecords for incremental commits + JavaRDD writeStatuses = mdtWriteClient.upsertPreppedRecords(allTaggedRecords, mdtCommitTime); + mdtWriteClient.commit(mdtCommitTime, writeStatuses, Option.empty(), HoodieTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap()); + } + } + + String generateUniqueInstantTime(int offset) { + return HoodieInstantTimeGenerator.instantTimePlusMillis(SOLO_COMMIT_TIMESTAMP, offset); + } + + private HoodieTableMetaClient initializeDataTableMetaClient(String tableName, HoodieWriteConfig dataConfig) throws IOException { + return HoodieTableMetaClient.newTableBuilder() + .setTableVersion(HoodieTableVersion.NINE) + .setTableType(HoodieTableType.COPY_ON_WRITE) + .setTableName(tableName) + .setPartitionFields(PARTITION_FIELDS) + .setRecordKeyFields(RECORD_ID) + .initTable(engineContext.getStorageConf(), dataConfig.getBasePath()); + } + + /** + * Loads an existing HoodieTableMetaClient. Throws exception if table doesn't exist. + * Used in QUERY mode where the table must already exist. + */ + private HoodieTableMetaClient loadExistingMetaClient(HoodieWriteConfig dataConfig) { + try { + return HoodieTableMetaClient.builder() + .setConf(engineContext.getStorageConf()) + .setBasePath(dataConfig.getBasePath()) + .build(); + } catch (Exception e) { + throw new IllegalStateException( + "Table does not exist at " + dataConfig.getBasePath() + ". " + + "QUERY mode requires an existing table. Use BOOTSTRAP or BOOTSTRAP_AND_QUERY mode first.", e); + } + } + + /** + * Creates partition directories on the filesystem. 
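+ * Also writes a HoodiePartitionMetadata file in each directory so the path is recognized as a Hudi partition.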
+ */ + private void createPartitionPaths(HoodieTableMetaClient metaClient, List partitions, String instantTime) throws IOException { + StoragePath basePath = metaClient.getBasePath(); + for (String partition : partitions) { + StoragePath fullPartitionPath = new StoragePath(basePath, partition); + metaClient.getStorage().createDirectory(fullPartitionPath); + + new HoodiePartitionMetadata(metaClient.getStorage(), instantTime, + new StoragePath(metaClient.getBasePath().toString()), fullPartitionPath, + metaClient.getTableConfig().getPartitionMetafileFormat()).trySave(); + } + LOG.info("Created {} partition directories under {}", partitions.size(), basePath); + } + + /** + * Generates a list of date-based partition paths incrementing by day. + * Starting from 2020-01-01, creates partitions for consecutive days based on numPartitions. + *

+ * Example: + * numPartitions = 1 -> ["2020-01-01"] + * numPartitions = 3 -> ["2020-01-01", "2020-01-02", "2020-01-03"] + * numPartitions = 10 -> ["2020-01-01", "2020-01-02", ..., "2020-01-10"] + * + * @param numPartitions Number of partitions to generate + * @return List of partition paths in yyyy-MM-dd format + */ + private List generatePartitions(int numPartitions) { + if (numPartitions <= 0) { + throw new IllegalArgumentException("numPartitions must be greater than 0, got: " + numPartitions); + } + + List partitions = new ArrayList<>(); + for (int i = 0; i < numPartitions; i++) { + LocalDate partitionDate = PARTITION_START_DATE.plusDays(i); + partitions.add(partitionDate.format(DATE_FORMATTER)); + } + + LOG.warn("Generated {} partitions: {} to {}. Ensure --partition-filter matches these partitions.", + numPartitions, + partitions.get(0), + partitions.get(partitions.size() - 1)); + + return partitions; + } + + /** + * Benchmarks data skipping using column stats index via HoodieFileIndex.filterFileSlices. + * + * @param dataConfig The write config for the data table + * @param dataMetaClient The meta client for the data table + * @param numFiles The total number of files in the commit + */ + private void benchmarkDataSkipping( + HoodieWriteConfig dataConfig, + HoodieTableMetaClient dataMetaClient, + int numFiles) { + + LOG.info("Running data skipping benchmark"); + dataMetaClient = HoodieTableMetaClient.reload(dataMetaClient); + + HoodieFileIndex fileIndex = createHoodieFileIndex(dataConfig, dataMetaClient); + StructType dataSchema = getDataSchema(); + + Seq dataFiltersSeq = JavaConverters + .asScalaBuffer(buildDataFilters(dataSchema)).toList(); + Seq partitionFiltersSeq = JavaConverters + .asScalaBuffer(Collections.singletonList(buildPartitionFilter(dataSchema))).toList(); + + long startTime = System.currentTimeMillis(); + Seq, Seq>> filteredSlices = + fileIndex.filterFileSlices(dataFiltersSeq, partitionFiltersSeq, false); + long filterTimeMs = System.currentTimeMillis() - startTime; + + int totalFileSlices = countFileSlices(filteredSlices); + + LOG.info("filterFileSlices took {} ms", filterTimeMs); + LOG.info("File slices returned: {} / {}", totalFileSlices, numFiles); + if (numFiles > 0) { + double skippingRatio = ((double) (numFiles - totalFileSlices) / numFiles) * 100.0; + LOG.info(String.format("Data skipping ratio: %.2f%%", skippingRatio)); + } + } + + /** + * Creates a HoodieFileIndex configured for data skipping with column stats. 
+ */ + @SuppressWarnings("deprecation") + private HoodieFileIndex createHoodieFileIndex(HoodieWriteConfig dataConfig, HoodieTableMetaClient metaClient) { + Map options = new HashMap<>(); + options.put("path", dataConfig.getBasePath()); + options.put("hoodie.datasource.read.data.skipping.enable", "true"); + options.put("hoodie.metadata.enable", "true"); + options.put("hoodie.metadata.index.column.stats.enable", "true"); + options.put(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "false"); + options.put("hoodie.metadata.index.column.stats.column.list", getColumnsToIndexString(cfg.numColumnsToIndex)); + spark.sqlContext().conf().setConfString("hoodie.fileIndex.dataSkippingFailureMode", "strict"); + + scala.collection.immutable.Map scalaOptions = JavaConverters.mapAsScalaMap(options) + .toMap(scala.Predef$.MODULE$.conforms()); + + return new HoodieFileIndex( + spark, + metaClient, + scala.Option.apply(getDataSchema()), + scalaOptions, + NoopCache$.MODULE$, + false, + false); + } + + /** + * Builds a data filter expression based on the indexed columns. + */ + private List buildDataFilters(StructType dataSchema) { + final List filterStrings; + + if (StringUtils.nonEmpty(cfg.dataFilters)) { + filterStrings = Arrays.stream(Strings.split(cfg.dataFilters, ',')) + .map(String::trim) + .filter(StringUtils::nonEmpty) + .collect(Collectors.toList()); + } else { + filterStrings = getDefaultDataFilters(); + } + + LOG.info("Using data filters: {}", filterStrings); + + return filterStrings.stream() + .map(filter -> + HoodieCatalystExpressionUtils$.MODULE$.resolveExpr(spark, filter, dataSchema)) + .collect(Collectors.toList()); + } + + private List getDefaultDataFilters() { + if (cfg.numColumnsToIndex == 1) { + return Collections.singletonList(COL_AGE + " > 70"); + } + return Arrays.asList( + COL_AGE + " > 70", + COL_TENANT_ID + " > 50000" + ); + } + + /** + * Builds a partition filter expression based on the configured filter percentage. + */ + private Expression buildPartitionFilter(StructType dataSchema) { + String partitionFilter = getPartitionFilter(); + LOG.info("Using partition filter: {}", partitionFilter); + return HoodieCatalystExpressionUtils$.MODULE$.resolveExpr(spark, partitionFilter, dataSchema); + } + + /** + * Counts total file slices across all partitions. + */ + private int countFileSlices(Seq, Seq>> filteredSlices) { + int total = 0; + for (int j = 0; j < filteredSlices.size(); j++) { + total += filteredSlices.apply(j)._2().size(); + } + return total; + } + + /** + * Generates column stats records based on commit metadata file structure in a distributed manner. + * This method distributes work by table partition - each Spark partition processes + * all files within a single table partition to avoid memory issues. 
+ * + * @param commitMetadata The commit metadata containing partition and file information + * @return HoodieData of column stats records, distributed across Spark partitions + */ + @SuppressWarnings("rawtypes") + private HoodieData generateColumnStatsRecordsForCommitMetadata( + HoodieCommitMetadata commitMetadata) { + + Map> partitionToWriteStats = commitMetadata.getPartitionToWriteStats(); + List>> partitionEntries = new ArrayList<>(partitionToWriteStats.entrySet()); + + LOG.info("Processing {} partitions with {} total files for column stats generation", + partitionEntries.size(), + partitionToWriteStats.values().stream().mapToInt(List::size).sum()); + + final int numColumnsToIndex = cfg.numColumnsToIndex; + + JavaRDD recordsRDD = jsc + .parallelize(partitionEntries, partitionEntries.size()) + .flatMap(entry -> processPartitionWriteStats(entry.getKey(), entry.getValue(), numColumnsToIndex).iterator()); + + return HoodieJavaRDD.of(recordsRDD); + } + + /** + * Processes all write stats for a partition and generates column stats records. + */ + @SuppressWarnings("unchecked") + private static List processPartitionWriteStats( + String partitionPath, List writeStats, int numColumnsToIndex) { + + List partitionRecords = new ArrayList<>(); + + for (HoodieWriteStat writeStat : writeStats) { + String fileName = new StoragePath(writeStat.getPath()).getName(); + List> columnRangeMetadata = + generateColumnRangeMetadataForFile(fileName, numColumnsToIndex); + + List> fileRecords = HoodieMetadataPayload + .createColumnStatsRecords(partitionPath, columnRangeMetadata, false) + .map(record -> (HoodieRecord) record) + .collect(Collectors.toList()); + + partitionRecords.addAll(fileRecords); + } + + return partitionRecords; + } + + /** + * Generates column range metadata for a single file. + */ + private static List> generateColumnRangeMetadataForFile( + String fileName, int numColumnsToIndex) { + + Random fileRandom = new Random(fileName.hashCode()); + List> columnRangeMetadata = new ArrayList<>(); + + columnRangeMetadata.add(createTenantIDStats(fileName, fileRandom)); + + if (numColumnsToIndex == 2) { + columnRangeMetadata.add(createAgeStats(fileName, fileRandom)); + } + + return columnRangeMetadata; + } + + /** + * Creates column stats for tenantID column with random values in range 30000-60000. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static HoodieColumnRangeMetadata createTenantIDStats(String fileName, Random random) { + long minTenantID = TENANT_ID_MIN_BASE + random.nextInt(TENANT_ID_RANGE); + long maxTenantID = minTenantID + random.nextInt((int) (TENANT_ID_MAX - minTenantID + 1)); + + return (HoodieColumnRangeMetadata) (HoodieColumnRangeMetadata) + HoodieColumnRangeMetadata.create( + fileName, COL_TENANT_ID, minTenantID, maxTenantID, + COL_STATS_NULL_COUNT, COL_STATS_VALUE_COUNT, + COL_STATS_TOTAL_SIZE, COL_STATS_TOTAL_UNCOMPRESSED_SIZE, + ValueMetadata.V1EmptyMetadata.get()); + } + + /** + * Creates column stats for age column with random values in range 20-99. 
+ */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static HoodieColumnRangeMetadata createAgeStats(String fileName, Random random) { + int minAge = AGE_MIN_BASE + random.nextInt(AGE_MIN_RANGE); + int maxAge = minAge + random.nextInt(AGE_MAX_RANGE); + + return (HoodieColumnRangeMetadata) (HoodieColumnRangeMetadata) + HoodieColumnRangeMetadata.create( + fileName, COL_AGE, minAge, maxAge, + COL_STATS_NULL_COUNT, COL_STATS_VALUE_COUNT, + COL_STATS_TOTAL_SIZE, COL_STATS_TOTAL_UNCOMPRESSED_SIZE, + ValueMetadata.V1EmptyMetadata.get()); + } + + private HoodieWriteConfig getWriteConfig(String schemaStr, String basePath, HoodieFailedWritesCleaningPolicy cleaningPolicy) { + HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder().withPath(basePath) + .withProperties(props) + .forTable(TABLE_NAME) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .withMetadataIndexColumnStats(true) + .withMetadataIndexColumnStatsFileGroupCount(cfg.colStatsFileGroupCount) + .withMetadataIndexPartitionStats(false) + .build()); + if (StringUtils.nonEmpty(schemaStr)) { + builder.withSchema(schemaStr); + } + builder.withEngineType(EngineType.SPARK); + return builder.build(); + } + + /** + * Returns the partition filter predicate from configuration. + * + * @return Partition filter expression string + */ + private String getPartitionFilter() { + return cfg.partitionFilter; + } + + public void close() { + engineContext.cancelAllJobs(); + } +} diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/benchmarking/SparkHoodieBackedTableMetadataBenchmarkWriter.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/benchmarking/SparkHoodieBackedTableMetadataBenchmarkWriter.java new file mode 100644 index 0000000000000..27b17360d8980 --- /dev/null +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/benchmarking/SparkHoodieBackedTableMetadataBenchmarkWriter.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.utilities.benchmarking; + +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter; +import org.apache.hudi.storage.StorageConfiguration; + +import java.io.IOException; + +public class SparkHoodieBackedTableMetadataBenchmarkWriter extends SparkHoodieBackedTableMetadataWriter { + + SparkHoodieBackedTableMetadataBenchmarkWriter(StorageConfiguration hadoopConf, + HoodieWriteConfig writeConfig, + HoodieFailedWritesCleaningPolicy failedWritesCleaningPolicy, + HoodieEngineContext engineContext, + Option inflightInstantTimestamp, + boolean streamingWrites) { + super(hadoopConf, writeConfig, failedWritesCleaningPolicy, engineContext, inflightInstantTimestamp, streamingWrites); + } + + protected boolean initializeIfNeeded(HoodieTableMetaClient dataMetaClient, + Option inflightInstantTimestamp) throws IOException { + // no op. + return false; + } + + protected void initMetadataMetaClient() throws IOException { + metadataMetaClient = initializeMetaClient(); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java index 9aebbfd4dc3f1..e81af6a89b802 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java @@ -471,6 +471,7 @@ public void testPartitionStatsValidation(String tableType) throws Exception { Dataset inserts = makeInsertDf("000", 5); inserts.write().format("hudi").options(writeOptions) .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value()) + .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true") .mode(SaveMode.Overwrite) .save(basePath); // validate MDT partition stats @@ -479,6 +480,7 @@ public void testPartitionStatsValidation(String tableType) throws Exception { Dataset updates = makeUpdateDf("001", 5); updates.write().format("hudi").options(writeOptions) .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value()) + .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_PARTITION_STATS.key(), "true") .mode(SaveMode.Append) .save(basePath); diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/benchmarking/TestMetadataBenchmarkingTool.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/benchmarking/TestMetadataBenchmarkingTool.java new file mode 100644 index 0000000000000..e29705afa27b7 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/benchmarking/TestMetadataBenchmarkingTool.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.benchmarking;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test cases for {@link MetadataBenchmarkingTool}.
+ */
+public class TestMetadataBenchmarkingTool {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestMetadataBenchmarkingTool.class);
+ private static SparkSession sparkSession;
+
+ @BeforeAll
+ public static void setUpClass() {
+ // Initialize SparkSession for tests
+ sparkSession = SparkSession.builder()
+ .appName("TestMetadataBenchmarkingTool")
+ .master("local[2]")
+ .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
+ .getOrCreate();
+
+ LOG.info("SparkSession initialized for tests");
+ }
+
+ @AfterAll
+ public static void tearDownClass() {
+ if (sparkSession != null) {
+ sparkSession.stop();
+ sparkSession = null;
+ LOG.info("SparkSession stopped");
+ }
+ }
+
+ @Test
+ public void testMetadataBenchmarkingToolRunWithTwoColumns(@TempDir Path tempDir) {
+ LOG.info("Running MetadataBenchmarkingTool test with temp directory: {}", tempDir);
+
+ // Create config for MetadataBenchmarkingTool with 2 columns (tenantID & age)
+ MetadataBenchmarkingTool.Config config = new MetadataBenchmarkingTool.Config();
+ config.tableBasePath = tempDir.resolve("test_table_2cols").toString();
+ config.numColumnsToIndex = 2; // tenantID & age
+ config.colStatsFileGroupCount = 10;
+ config.numFilesToBootstrap = 100;
+ config.numPartitions = 3;
+
+ LOG.info("Test config: tableBasePath={}, numFilesToBootstrap={}, numPartitions={}, numColumnsToIndex={}, colStatsFileGroupCount={}",
+ config.tableBasePath, config.numFilesToBootstrap, config.numPartitions, config.numColumnsToIndex, config.colStatsFileGroupCount);
+
+ // Run MetadataBenchmarkingTool
+ assertDoesNotThrow(() -> {
+ try (MetadataBenchmarkingTool metadataBenchmarkingTool = new MetadataBenchmarkingTool(sparkSession, config)) {
+ metadataBenchmarkingTool.run();
+ }
+ }, "MetadataBenchmarkingTool.run() should complete without throwing exceptions");
+
+ LOG.info("MetadataBenchmarkingTool test with 2 columns completed successfully");
+ }
+
+ @Test
+ public void testMetadataBenchmarkingToolRunWithOneColumn(@TempDir Path tempDir) {
+ LOG.info("Running MetadataBenchmarkingTool test with temp directory: {}", tempDir);
+
+ // Create config for MetadataBenchmarkingTool with 1 column (tenantID only)
+ MetadataBenchmarkingTool.Config config = new MetadataBenchmarkingTool.Config();
+ config.tableBasePath = tempDir.resolve("test_table_1col").toString();
+ config.numColumnsToIndex = 1; // tenantID only
+ config.colStatsFileGroupCount = 10;
+ config.numFilesToBootstrap = 100;
+ config.numPartitions = 3;
+
+ LOG.info("Test config: tableBasePath={}, numFilesToBootstrap={}, numPartitions={}, numColumnsToIndex={}, colStatsFileGroupCount={}",
+ config.tableBasePath, config.numFilesToBootstrap, config.numPartitions, config.numColumnsToIndex, config.colStatsFileGroupCount);
+
+ // Run MetadataBenchmarkingTool
+ assertDoesNotThrow(() -> {
+ try (MetadataBenchmarkingTool metadataBenchmarkingTool = new MetadataBenchmarkingTool(sparkSession, config)) {
+ metadataBenchmarkingTool.run();
+ }
+ }, "MetadataBenchmarkingTool.run() should complete without throwing exceptions");
+
+ LOG.info("MetadataBenchmarkingTool test with 1 column completed successfully");
+ }
+
+ @Test
+ public void testMetadataBenchmarkingToolRunWithIncrementalCommits(@TempDir Path tempDir) {
+ LOG.info("Running MetadataBenchmarkingTool test with incremental commits, temp directory: {}", tempDir);
+
+ // Create config for MetadataBenchmarkingTool with incremental commits
+ MetadataBenchmarkingTool.Config config = new MetadataBenchmarkingTool.Config();
+ config.tableBasePath = tempDir.resolve("test_table_incremental").toString();
+ config.numColumnsToIndex = 2; // tenantID & age
+ config.colStatsFileGroupCount = 10;
+ config.numFilesToBootstrap = 100; // Bootstrap with 100 files
+ config.numPartitions = 3;
+ config.numFilesForIncrementalIngestion = 200; // 2 commits * 100 files each
+ config.numOfcommitForIncrementalIngestion = 2; // 2 incremental commits
+
+ LOG.info("Test config: tableBasePath={}, numFilesToBootstrap={}, numFilesForIncremental={}, numCommitsForIncremental={}, numPartitions={}, numColumnsToIndex={}, colStatsFileGroupCount={}",
+ config.tableBasePath, config.numFilesToBootstrap, config.numFilesForIncrementalIngestion,
+ config.numOfcommitForIncrementalIngestion, config.numPartitions, config.numColumnsToIndex, config.colStatsFileGroupCount);
+
+ // Run MetadataBenchmarkingTool
+ assertDoesNotThrow(() -> {
+ try (MetadataBenchmarkingTool metadataBenchmarkingTool = new MetadataBenchmarkingTool(sparkSession, config)) {
+ metadataBenchmarkingTool.run();
+ }
+ }, "MetadataBenchmarkingTool.run() with incremental commits should complete without throwing exceptions");
+
+ LOG.info("MetadataBenchmarkingTool test with incremental commits completed successfully");
+ }
+
+ /**
+ * Tests the getPartitionFilter method with various numPartitions values.
+ * Validates that the filter correctly excludes ~25% of partitions.
+ */ + @ParameterizedTest + @CsvSource({ + "1, ''", // Single partition - no filtering possible + "2, dt > '2020-01-01'", // 2 partitions: exclude 1 (50%), keeps 1 + "3, dt > '2020-01-01'", // 3 partitions: ceil(0.75)=1 excluded, keeps 2 (66%) + "4, dt > '2020-01-01'", // 4 partitions: ceil(1)=1 excluded, keeps 3 (75%) + "10, dt > '2020-01-03'", // 10 partitions: ceil(2.5)=3 excluded, keeps 7 (70%) + "100, dt > '2020-01-25'" // 100 partitions: ceil(25)=25 excluded, keeps 75 (75%) + }) + public void testGetPartitionFilter(int numPartitions, String expectedFilter) throws Exception { + LOG.info("Testing getPartitionFilter with numPartitions={}", numPartitions); + + // Create config with specified numPartitions + MetadataBenchmarkingTool.Config config = new MetadataBenchmarkingTool.Config(); + config.tableBasePath = "/tmp/test_partition_filter"; + config.numPartitions = numPartitions; + config.numColumnsToIndex = 1; + config.colStatsFileGroupCount = 1; + config.numFilesToBootstrap = 10; + + // Use reflection to call private getPartitionFilter method + MetadataBenchmarkingTool tool = new MetadataBenchmarkingTool(sparkSession, config); + Method method = MetadataBenchmarkingTool.class.getDeclaredMethod("getPartitionFilter"); + method.setAccessible(true); + String actualFilter = (String) method.invoke(tool); + + LOG.info("numPartitions={}, expectedFilter='{}', actualFilter='{}'", numPartitions, expectedFilter, actualFilter); + + assertEquals(expectedFilter, actualFilter, + String.format("Partition filter mismatch for numPartitions=%d", numPartitions)); + + // Calculate and log the expected data retention percentage + if (numPartitions > 1) { + int numPartitionsToExclude = (int) Math.ceil(numPartitions * 0.25); + int partitionsKept = numPartitions - numPartitionsToExclude; + double retentionPercentage = (partitionsKept * 100.0) / numPartitions; + LOG.info("numPartitions={}: excluding {} partitions, keeping {} partitions ({}%)", + numPartitions, numPartitionsToExclude, partitionsKept, String.format("%.1f", retentionPercentage)); + } + + tool.close(); + } +} +