From 99c76dca105f2b5a5fb59a16486cefa706f42516 Mon Sep 17 00:00:00 2001
From: pawel
Date: Fri, 15 Sep 2023 13:50:59 -0700
Subject: [PATCH] prune files and partitions based on hoodie.datasource.read.incr.path.glob

---
 .../common/model/HoodieCommitMetadata.java    | 18 +++++++++++-------
 .../partitioner/profile/WriteProfiles.java    |  2 +-
 .../HoodieMergeOnReadTableInputFormat.java    |  2 +-
 .../hadoop/utils/HoodieInputFormatUtils.java  |  4 ++--
 .../org/apache/hudi/DataSourceOptions.scala   |  6 ++++++
 .../hudi/MergeOnReadIncrementalRelation.scala | 11 +++++++----
 6 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
index b1014885c815..bdb376b232a9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCommitMetadata.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.common.util.Option;
@@ -175,18 +176,21 @@ public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath
    * @param basePath The base path
    * @return the file full path to file status mapping
    */
-  public Map<String, FileStatus> getFullPathToFileStatus(Configuration hadoopConf, String basePath) {
+  public Map<String, FileStatus> getFullPathToFileStatus(Configuration hadoopConf, String basePath, String fileNamePattern) {
     Map<String, FileStatus> fullPathToFileStatus = new HashMap<>();
+    GlobPattern globMatcher = new GlobPattern(fileNamePattern);
     for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
       // Iterate through all the written files.
       for (HoodieWriteStat stat : stats) {
         String relativeFilePath = stat.getPath();
-        Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
-        if (fullPath != null) {
-          long blockSize = FSUtils.getFs(fullPath.toString(), hadoopConf).getDefaultBlockSize(fullPath);
-          FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, blockSize,
-              0, fullPath);
-          fullPathToFileStatus.put(fullPath.getName(), fileStatus);
+        if (fileNamePattern.isEmpty() || globMatcher.matches(relativeFilePath)) {
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            long blockSize = FSUtils.getFs(fullPath.toString(), hadoopConf).getDefaultBlockSize(fullPath);
+            FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, blockSize,
+                0, fullPath);
+            fullPathToFileStatus.put(fullPath.getName(), fileStatus);
+          }
         }
       }
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
index eaaa1a5a082d..c902a62282d5 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfiles.java
@@ -184,7 +184,7 @@ private static Map<String, FileStatus> getFilesToRead(
       case COPY_ON_WRITE:
         return metadata.getFileIdToFileStatus(hadoopConf, basePath);
       case MERGE_ON_READ:
-        return metadata.getFullPathToFileStatus(hadoopConf, basePath);
+        return metadata.getFullPathToFileStatus(hadoopConf, basePath, "");
       default:
         throw new AssertionError();
     }
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
index 014548401719..ea618adcc41d 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieMergeOnReadTableInputFormat.java
@@ -170,7 +170,7 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
 
     // build fileGroup from fsView
     List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
-        .listAffectedFilesForCommits(job, new Path(tableMetaClient.getBasePath()), metadataList));
+        .listAffectedFilesForCommits(job, new Path(tableMetaClient.getBasePath()), metadataList, ""));
     // step3
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
     // build fileGroup from fsView
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index f9c2c9ca29be..f5d33e3d05e0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -474,12 +474,12 @@ private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFi
    *
    * @return the affected file status array
    */
-  public static FileStatus[] listAffectedFilesForCommits(Configuration hadoopConf, Path basePath, List<HoodieCommitMetadata> metadataList) {
+  public static FileStatus[] listAffectedFilesForCommits(Configuration hadoopConf, Path basePath, List<HoodieCommitMetadata> metadataList, String fileNamePattern) {
     // TODO: Use HoodieMetaTable to extract affected file directly.
     HashMap<String, FileStatus> fullPathToFileStatus = new HashMap<>();
     // Iterate through the given commits.
     for (HoodieCommitMetadata metadata: metadataList) {
-      fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(hadoopConf, basePath.toString()));
+      fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(hadoopConf, basePath.toString(), fileNamePattern));
     }
     return fullPathToFileStatus.values().toArray(new FileStatus[0]);
   }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index d2c8629df98e..b29b9ce44071 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -134,6 +134,12 @@ object DataSourceReadOptions {
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " +
       "instead of the full table. This option allows using glob pattern to directly filter on path.")
 
+  val INCR_PARTITION_GLOB: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.incr.partition.glob")
+    .defaultValue("")
+    .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions " +
+      "instead of the full table. This option allows using glob pattern to directly filter on partition.")
+
   val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF
 
   val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index 93bf730a56d9..69cc71515e0f 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -102,13 +102,14 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
         val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)
 
         val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
+        val globMatcher = new GlobPattern("*" + globPartitionPattern)
 
-        modifiedPartitions.asScala.flatMap { relativePartitionPath =>
+        modifiedPartitions.asScala.filter(p => globMatcher.matches(p)).flatMap { relativePartitionPath =>
           fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
         }.toSeq
       }
 
-      buildSplits(filterFileSlices(fileSlices, globPattern))
+      buildSplits(fileSlices)
     }
   }
 
@@ -163,7 +164,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
   protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
 
   protected lazy val affectedFilesInCommits: Array[FileStatus] = {
-    listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
+    listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata, "*" + globPathPattern)
   }
 
   // Record filters making sure that only records w/in the requested bounds are being fetched as part of the
@@ -201,8 +202,10 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     }
   }
 
-  protected def globPattern: String =
+  protected def globPathPattern: String =
     optParams.getOrElse(DataSourceReadOptions.INCR_PATH_GLOB.key, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
 
+  protected def globPartitionPattern: String =
+    optParams.getOrElse(DataSourceReadOptions.INCR_PARTITION_GLOB.key, DataSourceReadOptions.INCR_PARTITION_GLOB.defaultValue)
 }
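
Usage note (editorial addition, not part of the patch): a minimal sketch of how the existing hoodie.datasource.read.incr.path.glob option and the new hoodie.datasource.read.incr.partition.glob option introduced above would be passed to a Spark incremental query. The table path, begin instant time, and glob values are illustrative assumptions, and a SparkSession named spark (e.g. spark-shell) is assumed.

    // Incremental read that first prunes partitions with the partition glob,
    // then prunes individual files within the surviving partitions with the
    // path glob. Both options default to "" (no pruning).
    val incrementalDf = spark.read
      .format("hudi")
      .option("hoodie.datasource.query.type", "incremental")
      .option("hoodie.datasource.read.begin.instanttime", "20230915000000000")
      // new option added by this patch: glob on the relative partition path
      .option("hoodie.datasource.read.incr.partition.glob", "region=emea/*")
      // existing option: glob on the relative file path
      .option("hoodie.datasource.read.incr.path.glob", "*.parquet")
      .load("/data/hudi/orders")

As implemented above, the incremental relation prepends "*" to each configured pattern before matching (GlobPattern("*" + globPartitionPattern) and "*" + globPathPattern), so the patterns effectively match suffixes of the relative partition and file paths.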