
Commit

prune files and partitions based on hoodie.datasource.read.incr.path.glob

psendyk committed Sep 15, 2023
1 parent a3f0615 commit 9c4dd0f
Showing 6 changed files with 28 additions and 15 deletions.
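As a rough usage sketch of the feature this commit completes: an incremental Spark read can now prune on both file paths and partition paths via glob options. The table path, instant time, and glob values below are hypothetical, and an active SparkSession is assumed.

// Hypothetical incremental read using both glob options.
val df = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20230901000000")
  // Pre-existing option: prune files whose relative path misses this glob.
  .option("hoodie.datasource.read.incr.path.glob", "*/2023/09/*")
  // Added in this commit: prune partitions that miss this glob.
  .option("hoodie.datasource.read.incr.partition.glob", "2023/09/*")
  .load("/tmp/hudi/trips")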
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.model;
 
+import org.apache.hadoop.fs.GlobPattern;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.JsonUtils;
 import org.apache.hudi.common.util.Option;
@@ -175,18 +176,21 @@ public Map<HoodieFileGroupId, String> getFileGroupIdAndFullPaths(String basePath
    * @param basePath The base path
    * @return the file full path to file status mapping
    */
-  public Map<String, FileStatus> getFullPathToFileStatus(Configuration hadoopConf, String basePath) {
+  public Map<String, FileStatus> getFullPathToFileStatus(Configuration hadoopConf, String basePath, String fileNamePattern) {
     Map<String, FileStatus> fullPathToFileStatus = new HashMap<>();
+    GlobPattern globMatcher = new GlobPattern(fileNamePattern);
     for (List<HoodieWriteStat> stats : getPartitionToWriteStats().values()) {
       // Iterate through all the written files.
       for (HoodieWriteStat stat : stats) {
         String relativeFilePath = stat.getPath();
-        Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
-        if (fullPath != null) {
-          long blockSize = FSUtils.getFs(fullPath.toString(), hadoopConf).getDefaultBlockSize(fullPath);
-          FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, blockSize,
-              0, fullPath);
-          fullPathToFileStatus.put(fullPath.getName(), fileStatus);
+        if (globMatcher.matches(relativeFilePath)) {
+          Path fullPath = relativeFilePath != null ? FSUtils.getPartitionPath(basePath, relativeFilePath) : null;
+          if (fullPath != null) {
+            long blockSize = FSUtils.getFs(fullPath.toString(), hadoopConf).getDefaultBlockSize(fullPath);
+            FileStatus fileStatus = new FileStatus(stat.getFileSizeInBytes(), false, 0, blockSize,
+                0, fullPath);
+            fullPathToFileStatus.put(fullPath.getName(), fileStatus);
+          }
         }
       }
     }
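For context on the matching semantics assumed above: org.apache.hadoop.fs.GlobPattern performs a full-string glob match, and its `*` compiles to `.*`, so it also crosses `/` separators. That is why call sites elsewhere in this commit can pass a literal "*" to keep the old match-everything behavior. A minimal sketch with illustrative paths:

import org.apache.hadoop.fs.GlobPattern

// Relative file paths as a HoodieWriteStat would store them (made-up values).
val paths = Seq(
  "2023/09/14/abc123_1-0-1_20230914.parquet",
  "2023/09/15/def456_1-0-1_20230915.parquet")

val matchAll = new GlobPattern("*")           // default: every path matches
val oneDay = new GlobPattern("*2023/09/15*")  // prune to a single day

paths.filter(p => matchAll.matches(p))  // keeps both files
paths.filter(p => oneDay.matches(p))    // keeps only the 2023/09/15 file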
@@ -184,7 +184,7 @@ private static Map<String, FileStatus> getFilesToRead(
       case COPY_ON_WRITE:
         return metadata.getFileIdToFileStatus(hadoopConf, basePath);
       case MERGE_ON_READ:
-        return metadata.getFullPathToFileStatus(hadoopConf, basePath);
+        return metadata.getFullPathToFileStatus(hadoopConf, basePath, "*");
       default:
         throw new AssertionError();
     }
@@ -170,7 +170,7 @@ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
 
     // build fileGroup from fsView
     List<FileStatus> affectedFileStatus = Arrays.asList(HoodieInputFormatUtils
-        .listAffectedFilesForCommits(job, new Path(tableMetaClient.getBasePath()), metadataList));
+        .listAffectedFilesForCommits(job, new Path(tableMetaClient.getBasePath()), metadataList, "*"));
     // step3
     HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(tableMetaClient, commitsTimelineToReturn, affectedFileStatus.toArray(new FileStatus[0]));
     // build fileGroup from fsView
@@ -474,12 +474,12 @@ private static HoodieBaseFile refreshFileStatus(Configuration conf, HoodieBaseFi
    *
    * @return the affected file status array
    */
-  public static FileStatus[] listAffectedFilesForCommits(Configuration hadoopConf, Path basePath, List<HoodieCommitMetadata> metadataList) {
+  public static FileStatus[] listAffectedFilesForCommits(Configuration hadoopConf, Path basePath, List<HoodieCommitMetadata> metadataList, String fileNamePattern) {
     // TODO: Use HoodieMetaTable to extract affected file directly.
     HashMap<String, FileStatus> fullPathToFileStatus = new HashMap<>();
     // Iterate through the given commits.
     for (HoodieCommitMetadata metadata: metadataList) {
-      fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(hadoopConf, basePath.toString()));
+      fullPathToFileStatus.putAll(metadata.getFullPathToFileStatus(hadoopConf, basePath.toString(), fileNamePattern));
     }
     return fullPathToFileStatus.values().toArray(new FileStatus[0]);
   }
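A side note on the putAll merge above: entries are keyed by file name, so if two commits in metadataList report the same file (e.g. a MOR log file appended to by consecutive deltacommits), the FileStatus from the later commit silently replaces the earlier one. A toy sketch of that last-write-wins behavior with plain Scala maps and made-up sizes:

// Each map stands in for one commit's getFullPathToFileStatus result (name -> size).
val commit1 = Map(".def456_20230914.log.1" -> 512L)
val commit2 = Map(".def456_20230914.log.1" -> 1024L) // same log file, grown later

// HashMap.putAll applied left to right behaves like map concatenation:
val merged = commit1 ++ commit2
// merged(".def456_20230914.log.1") == 1024L -- the later commit wins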
@@ -134,6 +134,12 @@ object DataSourceReadOptions {
     .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
       + "instead of the full table. This option allows using glob pattern to directly filter on path.")
 
+  val INCR_PARTITION_GLOB: ConfigProperty[String] = ConfigProperty
+    .key("hoodie.datasource.read.incr.partition.glob")
+    .defaultValue("")
+    .withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
+      + "instead of the full table. This option allows using glob pattern to directly filter on partition.")
+
   val TIME_TRAVEL_AS_OF_INSTANT: ConfigProperty[String] = HoodieCommonConfig.TIMESTAMP_AS_OF
 
   val ENABLE_DATA_SKIPPING: ConfigProperty[Boolean] = ConfigProperty
@@ -102,13 +102,14 @@ case class MergeOnReadIncrementalRelation(override val sqlContext: SQLContext,
       val fsView = new HoodieTableFileSystemView(metaClient, timeline, affectedFilesInCommits)
 
       val modifiedPartitions = getWritePartitionPaths(commitsMetadata)
+      val globMatcher = new GlobPattern("*" + globPartitionPattern)
 
-      modifiedPartitions.asScala.flatMap { relativePartitionPath =>
+      modifiedPartitions.asScala.filter(p => globMatcher.matches(p)).flatMap { relativePartitionPath =>
         fsView.getLatestMergedFileSlicesBeforeOrOn(relativePartitionPath, latestCommit).iterator().asScala
       }.toSeq
     }
 
-    buildSplits(filterFileSlices(fileSlices, globPattern))
+    buildSplits(fileSlices)
   }
 }
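The `"*" + globPartitionPattern` prefix above is doing quiet work: with the empty default the effective pattern is plain "*", so no partition is pruned, while a user-supplied value is matched as a suffix of the relative partition path. A small sketch of that assumed behavior, with illustrative partition paths:

import org.apache.hadoop.fs.GlobPattern

val partitions = Seq("americas/brazil/2023/09", "asia/india/2023/08")

// Default (empty) option value degrades to "*": nothing is pruned.
val noOp = new GlobPattern("*" + "")
partitions.filter(p => noOp.matches(p))   // keeps both partitions

// A non-empty glob matches as a suffix thanks to the "*" prefix.
val sept = new GlobPattern("*" + "2023/09")
partitions.filter(p => sept.matches(p))   // keeps only .../2023/09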

@@ -163,7 +164,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
   protected lazy val commitsMetadata = includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
 
   protected lazy val affectedFilesInCommits: Array[FileStatus] = {
-    listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata)
+    listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), commitsMetadata, "*" + globPathPattern)
   }
 
   // Record filters making sure that only records w/in the requested bounds are being fetched as part of the
@@ -201,8 +202,10 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
     }
   }
 
-  protected def globPattern: String =
+  protected def globPathPattern: String =
     optParams.getOrElse(DataSourceReadOptions.INCR_PATH_GLOB.key, DataSourceReadOptions.INCR_PATH_GLOB.defaultValue)
+
+  protected def globPartitionPattern: String =
+    optParams.getOrElse(DataSourceReadOptions.INCR_PARTITION_GLOB.key, DataSourceReadOptions.INCR_PARTITION_GLOB.defaultValue)
 }
