From 2281b743cf8ca3fffd7eefe0a310b12649e0d118 Mon Sep 17 00:00:00 2001 From: Jingsong Lee Date: Thu, 26 Sep 2024 11:48:43 +0800 Subject: [PATCH] [core] Push bucket filter into manifest entry reader (#4258) --- .../org/apache/paimon/utils/BiFilter.java | 38 ++++++ .../apache/paimon/AppendOnlyFileStore.java | 48 ++++--- .../org/apache/paimon/KeyValueFileStore.java | 38 +++--- .../manifest/ManifestEntrySerializer.java | 4 + .../operation/AbstractFileStoreScan.java | 126 +++++------------- .../operation/AbstractFileStoreWrite.java | 33 ++++- .../operation/AppendOnlyFileStoreScan.java | 13 +- .../operation/AppendOnlyFileStoreWrite.java | 3 +- .../AppendOnlyFixedBucketFileStoreWrite.java | 2 + ...AppendOnlyUnawareBucketFileStoreWrite.java | 2 + ...Filter.java => BucketSelectConverter.java} | 41 ++---- .../paimon/operation/FileStoreScan.java | 3 + .../operation/KeyValueFileStoreScan.java | 14 +- .../operation/KeyValueFileStoreWrite.java | 1 + .../operation/MemoryFileStoreWrite.java | 4 + .../paimon/operation/metrics/ScanMetrics.java | 6 - .../paimon/operation/metrics/ScanStats.java | 13 +- ...st.java => BucketSelectConverterTest.java} | 22 +-- .../operation/metrics/ScanMetricsTest.java | 16 +-- 19 files changed, 195 insertions(+), 232 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/utils/BiFilter.java rename paimon-core/src/main/java/org/apache/paimon/operation/{ScanBucketFilter.java => BucketSelectConverter.java} (83%) rename paimon-core/src/test/java/org/apache/paimon/operation/{ScanBucketSelectorTest.java => BucketSelectConverterTest.java} (91%) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/BiFilter.java b/paimon-common/src/main/java/org/apache/paimon/utils/BiFilter.java new file mode 100644 index 000000000000..4e0f0d80d965 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/BiFilter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.predicate.Predicate; + +/** + * Represents a filter (boolean-valued function) of two argument. This class is for avoiding name + * conflicting to {@link Predicate}. 
+ */ +@FunctionalInterface +public interface BiFilter { + + BiFilter ALWAYS_TRUE = (t, u) -> true; + + boolean test(T t, U u); + + @SuppressWarnings({"unchecked", "rawtypes"}) + static BiFilter alwaysTrue() { + return (BiFilter) ALWAYS_TRUE; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java index b0e3ea5f428e..30208cebb8cc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java @@ -27,8 +27,8 @@ import org.apache.paimon.operation.AppendOnlyFileStoreWrite; import org.apache.paimon.operation.AppendOnlyFixedBucketFileStoreWrite; import org.apache.paimon.operation.AppendOnlyUnawareBucketFileStoreWrite; +import org.apache.paimon.operation.BucketSelectConverter; import org.apache.paimon.operation.RawFileSplitRead; -import org.apache.paimon.operation.ScanBucketFilter; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -38,6 +38,7 @@ import java.util.Comparator; import java.util.List; +import java.util.Optional; import static org.apache.paimon.predicate.PredicateBuilder.and; import static org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping; @@ -106,6 +107,7 @@ public AppendOnlyFileStoreWrite newWrite( newRead(), schema.id(), rowType, + partitionType, pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), @@ -119,6 +121,7 @@ public AppendOnlyFileStoreWrite newWrite( schema.id(), commitUser, rowType, + partitionType, pathFactory(), snapshotManager(), newScan(true).withManifestCacheFilter(manifestFilter), @@ -129,39 +132,34 @@ public AppendOnlyFileStoreWrite newWrite( } private AppendOnlyFileStoreScan newScan(boolean forWrite) { - ScanBucketFilter bucketFilter = - new ScanBucketFilter(bucketKeyType) { - @Override - public void pushdown(Predicate predicate) { - if (bucketMode() != BucketMode.HASH_FIXED) { - return; - } - - if (bucketKeyType.getFieldCount() == 0) { - return; - } - - List bucketFilters = - pickTransformFieldMapping( - splitAnd(predicate), - rowType.getFieldNames(), - bucketKeyType.getFieldNames()); - if (!bucketFilters.isEmpty()) { - setBucketKeyFilter(and(bucketFilters)); - } + BucketSelectConverter bucketSelectConverter = + predicate -> { + if (bucketMode() != BucketMode.HASH_FIXED) { + return Optional.empty(); } + + if (bucketKeyType.getFieldCount() == 0) { + return Optional.empty(); + } + + List bucketFilters = + pickTransformFieldMapping( + splitAnd(predicate), + rowType.getFieldNames(), + bucketKeyType.getFieldNames()); + if (!bucketFilters.isEmpty()) { + return BucketSelectConverter.create(and(bucketFilters), bucketKeyType); + } + return Optional.empty(); }; return new AppendOnlyFileStoreScan( newManifestsReader(forWrite), - partitionType, - bucketFilter, + bucketSelectConverter, snapshotManager(), schemaManager, schema, manifestFileFactory(forWrite), - options.bucket(), - forWrite, options.scanManifestParallelism(), options.fileIndexReadEnabled()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java index d2d1d9972fa1..cc4579898401 100644 --- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java @@ -28,11 +28,11 @@ import 
org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.mergetree.compact.MergeFunctionFactory; +import org.apache.paimon.operation.BucketSelectConverter; import org.apache.paimon.operation.KeyValueFileStoreScan; import org.apache.paimon.operation.KeyValueFileStoreWrite; import org.apache.paimon.operation.MergeFileSplitRead; import org.apache.paimon.operation.RawFileSplitRead; -import org.apache.paimon.operation.ScanBucketFilter; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.KeyValueFieldsExtractor; import org.apache.paimon.schema.SchemaManager; @@ -50,6 +50,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Supplier; @@ -210,35 +211,30 @@ private Map format2PathFactory() { } private KeyValueFileStoreScan newScan(boolean forWrite) { - ScanBucketFilter bucketFilter = - new ScanBucketFilter(bucketKeyType) { - @Override - public void pushdown(Predicate keyFilter) { - if (bucketMode() != BucketMode.HASH_FIXED) { - return; - } - - List bucketFilters = - pickTransformFieldMapping( - splitAnd(keyFilter), - keyType.getFieldNames(), - bucketKeyType.getFieldNames()); - if (bucketFilters.size() > 0) { - setBucketKeyFilter(and(bucketFilters)); - } + BucketSelectConverter bucketSelectConverter = + keyFilter -> { + if (bucketMode() != BucketMode.HASH_FIXED) { + return Optional.empty(); } + + List bucketFilters = + pickTransformFieldMapping( + splitAnd(keyFilter), + keyType.getFieldNames(), + bucketKeyType.getFieldNames()); + if (bucketFilters.size() > 0) { + return BucketSelectConverter.create(and(bucketFilters), bucketKeyType); + } + return Optional.empty(); }; return new KeyValueFileStoreScan( newManifestsReader(forWrite), - partitionType, - bucketFilter, + bucketSelectConverter, snapshotManager(), schemaManager, schema, keyValueFieldsExtractor, manifestFileFactory(forWrite), - options.bucket(), - forWrite, options.scanManifestParallelism(), options.deletionVectorsEnabled(), options.mergeEngine(), diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java index 2c3ba2aeaab3..b1030448a7e1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java @@ -96,4 +96,8 @@ public static Function totalBucketGetter() { public static Function fileNameGetter() { return row -> row.getRow(5, DataFileMeta.SCHEMA.getFieldCount()).getString(0).toString(); } + + public static Function levelGetter() { + return row -> row.getRow(5, DataFileMeta.SCHEMA.getFieldCount()).getInt(10); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index d043932810c3..dd89944cb3f1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -18,7 +18,6 @@ package org.apache.paimon.operation; -import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; @@ -38,8 +37,7 @@ import org.apache.paimon.schema.SchemaManager; import 
org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.BiFilter; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; @@ -68,20 +66,17 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private final ManifestsReader manifestsReader; - private final RowType partitionType; private final SnapshotManager snapshotManager; private final ManifestFile.Factory manifestFileFactory; - private final int numOfBuckets; - private final boolean checkNumOfBuckets; private final Integer parallelism; private final ConcurrentMap tableSchemas; private final SchemaManager schemaManager; private final TableSchema schema; - protected final ScanBucketFilter bucketKeyFilter; private Snapshot specifiedSnapshot = null; private Filter bucketFilter = null; + private BiFilter totalAwareBucketFilter = null; private List specifiedManifests = null; protected ScanMode scanMode = ScanMode.ALL; private Filter levelFilter = null; @@ -93,24 +88,16 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { public AbstractFileStoreScan( ManifestsReader manifestsReader, - RowType partitionType, - ScanBucketFilter bucketKeyFilter, SnapshotManager snapshotManager, SchemaManager schemaManager, TableSchema schema, ManifestFile.Factory manifestFileFactory, - int numOfBuckets, - boolean checkNumOfBuckets, @Nullable Integer parallelism) { this.manifestsReader = manifestsReader; - this.partitionType = partitionType; - this.bucketKeyFilter = bucketKeyFilter; this.snapshotManager = snapshotManager; this.schemaManager = schemaManager; this.schema = schema; this.manifestFileFactory = manifestFileFactory; - this.numOfBuckets = numOfBuckets; - this.checkNumOfBuckets = checkNumOfBuckets; this.tableSchemas = new ConcurrentHashMap<>(); this.parallelism = parallelism; } @@ -145,6 +132,13 @@ public FileStoreScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public FileStoreScan withTotalAwareBucketFilter( + BiFilter totalAwareBucketFilter) { + this.totalAwareBucketFilter = totalAwareBucketFilter; + return this; + } + @Override public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) { if (manifestCacheFilter != null && manifestFileFactory.isCacheEnabled()) { @@ -229,7 +223,6 @@ public ManifestsReader manifestsReader() { @Override public Plan plan() { - Pair> planResult = doPlan(); final Snapshot readSnapshot = planResult.getLeft(); @@ -301,45 +294,14 @@ private Pair> doPlan() { Collection mergedEntries = readAndMergeFileEntries(manifests, this::readManifest); - List files = new ArrayList<>(); long skippedByPartitionAndStats = startDataFiles - mergedEntries.size(); - for (ManifestEntry file : mergedEntries) { - if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) { - String partInfo = - partitionType.getFieldCount() > 0 - ? "partition " - + FileStorePathFactory.getPartitionComputer( - partitionType, - CoreOptions.PARTITION_DEFAULT_NAME - .defaultValue()) - .generatePartValues(file.partition()) - : "table"; - throw new RuntimeException( - String.format( - "Try to write %s with a new bucket num %d, but the previous bucket num is %d. 
" - + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", - partInfo, numOfBuckets, file.totalBuckets())); - } - // bucket filter should not be applied along with partition filter - // because the specifiedBucket is computed against the current - // numOfBuckets - // however entry.bucket() was computed against the old numOfBuckets - // and thus the filtered manifest entries might be empty - // which renders the bucket check invalid - if (filterMergedManifestEntry(file)) { - files.add(file); - } - } - - long afterBucketFilter = files.size(); - long skippedByBucketAndLevelFilter = mergedEntries.size() - files.size(); // We group files by bucket here, and filter them by the whole bucket filter. // Why do this: because in primary key table, we can't just filter the value // by the stat in files (see `PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`), // but we can do this by filter the whole bucket files - files = - files.stream() + List files = + mergedEntries.stream() .collect( Collectors.groupingBy( // we use LinkedHashMap to avoid disorder @@ -352,13 +314,10 @@ private Pair> doPlan() { .flatMap(Collection::stream) .collect(Collectors.toList()); - long skippedByWholeBucketFiles = afterBucketFilter - files.size(); + long skippedByWholeBucketFiles = mergedEntries.size() - files.size(); long scanDuration = (System.nanoTime() - started) / 1_000_000; checkState( - startDataFiles - - skippedByPartitionAndStats - - skippedByBucketAndLevelFilter - - skippedByWholeBucketFiles + startDataFiles - skippedByPartitionAndStats - skippedByWholeBucketFiles == files.size()); if (scanMetrics != null) { scanMetrics.reportScan( @@ -366,7 +325,6 @@ private Pair> doPlan() { scanDuration, manifests.size(), skippedByPartitionAndStats, - skippedByBucketAndLevelFilter, skippedByWholeBucketFiles, files.size())); } @@ -401,13 +359,6 @@ protected TableSchema scanTableSchema(long id) { /** Note: Keep this thread-safe. */ protected abstract boolean filterByStats(ManifestEntry entry); - /** Note: Keep this thread-safe. */ - private boolean filterMergedManifestEntry(ManifestEntry entry) { - return (bucketFilter == null || bucketFilter.test(entry.bucket())) - && bucketKeyFilter.select(entry.bucket(), entry.totalBuckets()) - && (levelFilter == null || levelFilter.test(entry.file().level())); - } - /** Note: Keep this thread-safe. */ protected abstract List filterWholeBucketByStats(List entries); @@ -420,12 +371,8 @@ public List readManifest(ManifestFileMeta manifest) { .read( manifest.fileName(), manifest.fileSize(), - createCacheRowFilter(manifestCacheFilter, numOfBuckets), - createEntryRowFilter( - manifestsReader.partitionFilter(), - bucketFilter, - fileNameFilter, - numOfBuckets)); + createCacheRowFilter(), + createEntryRowFilter()); List filteredEntries = new ArrayList<>(entries.size()); for (ManifestEntry entry : entries) { if ((manifestEntryFilter == null || manifestEntryFilter.test(entry)) @@ -446,12 +393,8 @@ private List readSimpleEntries(ManifestFileMeta manifest) { // use filter for ManifestEntry // currently, projection is not pushed down to file format // see SimpleFileEntrySerializer - createCacheRowFilter(manifestCacheFilter, numOfBuckets), - createEntryRowFilter( - manifestsReader.partitionFilter(), - bucketFilter, - fileNameFilter, - numOfBuckets)); + createCacheRowFilter(), + createEntryRowFilter()); } /** @@ -460,8 +403,7 @@ private List readSimpleEntries(ManifestFileMeta manifest) { * *
<p>
Implemented to {@link InternalRow} is for performance (No deserialization). */ - private static Filter createCacheRowFilter( - @Nullable ManifestCacheFilter manifestCacheFilter, int numOfBuckets) { + private Filter createCacheRowFilter() { if (manifestCacheFilter == null) { return Filter.alwaysTrue(); } @@ -469,15 +411,7 @@ private static Filter createCacheRowFilter( Function partitionGetter = ManifestEntrySerializer.partitionGetter(); Function bucketGetter = ManifestEntrySerializer.bucketGetter(); - Function totalBucketGetter = - ManifestEntrySerializer.totalBucketGetter(); - return row -> { - if (numOfBuckets != totalBucketGetter.apply(row)) { - return true; - } - - return manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row)); - }; + return row -> manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row)); } /** @@ -485,25 +419,31 @@ private static Filter createCacheRowFilter( * *
<p>
Implemented to {@link InternalRow} is for performance (No deserialization). */ - private static Filter createEntryRowFilter( - @Nullable PartitionPredicate partitionFilter, - @Nullable Filter bucketFilter, - @Nullable Filter fileNameFilter, - int numOfBuckets) { + private Filter createEntryRowFilter() { Function partitionGetter = ManifestEntrySerializer.partitionGetter(); Function bucketGetter = ManifestEntrySerializer.bucketGetter(); Function totalBucketGetter = ManifestEntrySerializer.totalBucketGetter(); Function fileNameGetter = ManifestEntrySerializer.fileNameGetter(); + PartitionPredicate partitionFilter = manifestsReader.partitionFilter(); + Function levelGetter = ManifestEntrySerializer.levelGetter(); return row -> { if ((partitionFilter != null && !partitionFilter.test(partitionGetter.apply(row)))) { return false; } - if (bucketFilter != null - && numOfBuckets == totalBucketGetter.apply(row) - && !bucketFilter.test(bucketGetter.apply(row))) { + int bucket = bucketGetter.apply(row); + if (bucketFilter != null && !bucketFilter.test(bucket)) { + return false; + } + + if (totalAwareBucketFilter != null + && !totalAwareBucketFilter.test(bucket, totalBucketGetter.apply(row))) { + return false; + } + + if (levelFilter != null && !levelFilter.test(levelGetter.apply(row))) { return false; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 2e502e96f328..dab20d642cb9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -34,6 +34,7 @@ import org.apache.paimon.operation.metrics.CompactionMetrics; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.RecordWriter; @@ -54,7 +55,9 @@ import java.util.concurrent.Executors; import java.util.function.Function; +import static org.apache.paimon.CoreOptions.PARTITION_DEFAULT_NAME; import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; +import static org.apache.paimon.utils.FileStorePathFactory.getPartitionComputer; /** * Base {@link FileStoreWrite} implementation. 
@@ -70,6 +73,8 @@ public abstract class AbstractFileStoreWrite implements FileStoreWrite { private final int writerNumberMax; @Nullable private final IndexMaintainer.Factory indexFactory; @Nullable private final DeletionVectorsMaintainer.Factory dvMaintainerFactory; + private final int totalBuckets; + private final RowType partitionType; @Nullable protected IOManager ioManager; @@ -90,11 +95,15 @@ protected AbstractFileStoreWrite( @Nullable IndexMaintainer.Factory indexFactory, @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName, + int totalBuckets, + RowType partitionType, int writerNumberMax) { this.snapshotManager = snapshotManager; this.scan = scan; this.indexFactory = indexFactory; this.dvMaintainerFactory = dvMaintainerFactory; + this.totalBuckets = totalBuckets; + this.partitionType = partitionType; this.writers = new HashMap<>(); this.tableName = tableName; this.writerNumberMax = writerNumberMax; @@ -451,10 +460,26 @@ public FileStoreWrite withMetricRegistry(MetricRegistry metricRegistry) { private List scanExistingFileMetas( long snapshotId, BinaryRow partition, int bucket) { List existingFileMetas = new ArrayList<>(); - // Concat all the DataFileMeta of existing files into existingFileMetas. - scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files().stream() - .map(ManifestEntry::file) - .forEach(existingFileMetas::add); + List files = + scan.withSnapshot(snapshotId).withPartitionBucket(partition, bucket).plan().files(); + for (ManifestEntry entry : files) { + if (entry.totalBuckets() != totalBuckets) { + String partInfo = + partitionType.getFieldCount() > 0 + ? "partition " + + getPartitionComputer( + partitionType, + PARTITION_DEFAULT_NAME.defaultValue()) + .generatePartValues(partition) + : "table"; + throw new RuntimeException( + String.format( + "Try to write %s with a new bucket num %d, but the previous bucket num is %d. " + + "Please switch to batch mode, and perform INSERT OVERWRITE to rescale current data layout first.", + partInfo, totalBuckets, entry.totalBuckets())); + } + existingFileMetas.add(entry.file()); + } return existingFileMetas; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index b911d38841a0..a3bc9c22dc53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -41,6 +41,7 @@ /** {@link FileStoreScan} for {@link AppendOnlyFileStore}. 
*/ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan { + private final BucketSelectConverter bucketSelectConverter; private final SimpleStatsConverters simpleStatsConverters; private final boolean fileIndexReadEnabled; @@ -52,27 +53,21 @@ public class AppendOnlyFileStoreScan extends AbstractFileStoreScan { public AppendOnlyFileStoreScan( ManifestsReader manifestsReader, - RowType partitionType, - ScanBucketFilter bucketFilter, + BucketSelectConverter bucketSelectConverter, SnapshotManager snapshotManager, SchemaManager schemaManager, TableSchema schema, ManifestFile.Factory manifestFileFactory, - int numOfBuckets, - boolean checkNumOfBuckets, Integer scanManifestParallelism, boolean fileIndexReadEnabled) { super( manifestsReader, - partitionType, - bucketFilter, snapshotManager, schemaManager, schema, manifestFileFactory, - numOfBuckets, - checkNumOfBuckets, scanManifestParallelism); + this.bucketSelectConverter = bucketSelectConverter; this.simpleStatsConverters = new SimpleStatsConverters(sid -> scanTableSchema(sid).fields(), schema.id()); this.fileIndexReadEnabled = fileIndexReadEnabled; @@ -80,7 +75,7 @@ public AppendOnlyFileStoreScan( public AppendOnlyFileStoreScan withFilter(Predicate predicate) { this.filter = predicate; - this.bucketKeyFilter.pushdown(predicate); + this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 40fe5dbfafc5..0a4d5d56a13e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -80,13 +80,14 @@ public AppendOnlyFileStoreWrite( RawFileSplitRead read, long schemaId, RowType rowType, + RowType partitionType, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName) { - super(snapshotManager, scan, options, null, dvMaintainerFactory, tableName); + super(snapshotManager, scan, options, partitionType, null, dvMaintainerFactory, tableName); this.fileIO = fileIO; this.read = read; this.schemaId = schemaId; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java index e169c0165b4a..c58bad9a9796 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFixedBucketFileStoreWrite.java @@ -49,6 +49,7 @@ public AppendOnlyFixedBucketFileStoreWrite( long schemaId, String commitUser, RowType rowType, + RowType partitionType, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -60,6 +61,7 @@ public AppendOnlyFixedBucketFileStoreWrite( read, schemaId, rowType, + partitionType, pathFactory, snapshotManager, scan, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java index f33b207bb09f..e509b589944d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyUnawareBucketFileStoreWrite.java @@ -46,6 +46,7 @@ public AppendOnlyUnawareBucketFileStoreWrite( RawFileSplitRead read, long schemaId, RowType rowType, + RowType partitionType, FileStorePathFactory pathFactory, SnapshotManager snapshotManager, FileStoreScan scan, @@ -57,6 +58,7 @@ public AppendOnlyUnawareBucketFileStoreWrite( read, schemaId, rowType, + partitionType, pathFactory, snapshotManager, scan, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ScanBucketFilter.java b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java similarity index 83% rename from paimon-core/src/main/java/org/apache/paimon/operation/ScanBucketFilter.java rename to paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java index 1fcd32f04ecc..1c308c5daf51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ScanBucketFilter.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java @@ -28,6 +28,7 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.sink.KeyAndBucketExtractor; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BiFilter; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet; @@ -47,30 +48,14 @@ import static org.apache.paimon.predicate.PredicateBuilder.splitOr; /** Bucket filter push down in scan to skip files. */ -public abstract class ScanBucketFilter { +public interface BucketSelectConverter { - public static final int MAX_VALUES = 1000; + int MAX_VALUES = 1000; - private final RowType bucketKeyType; + Optional> convert(Predicate predicate); - private ScanBucketSelector selector; - - public ScanBucketFilter(RowType bucketKeyType) { - this.bucketKeyType = bucketKeyType; - } - - public abstract void pushdown(Predicate predicate); - - public void setBucketKeyFilter(Predicate predicate) { - this.selector = create(predicate, bucketKeyType).orElse(null); - } - - public boolean select(int bucket, int numBucket) { - return selector == null || selector.select(bucket, numBucket); - } - - @VisibleForTesting - static Optional create(Predicate bucketPredicate, RowType bucketKeyType) { + static Optional> create( + Predicate bucketPredicate, RowType bucketKeyType) { @SuppressWarnings("unchecked") List[] bucketValues = new List[bucketKeyType.getFieldCount()]; @@ -127,15 +112,15 @@ static Optional create(Predicate bucketPredicate, RowType bu new ArrayList<>(), 0); - return Optional.of(new ScanBucketSelector(hashCodes.stream().mapToInt(i -> i).toArray())); + return Optional.of(new Selector(hashCodes.stream().mapToInt(i -> i).toArray())); } - private static int hash(List columns, InternalRowSerializer serializer) { + static int hash(List columns, InternalRowSerializer serializer) { BinaryRow binaryRow = serializer.toBinaryRow(GenericRow.of(columns.toArray())); return KeyAndBucketExtractor.bucketKeyHashCode(binaryRow); } - private static void assembleRows( + static void assembleRows( List[] rowValues, Consumer> consumer, List stack, @@ -155,18 +140,18 @@ private static void assembleRows( /** Selector to select bucket from {@link Predicate}. 
*/ @ThreadSafe - public static class ScanBucketSelector { + class Selector implements BiFilter { private final int[] hashCodes; private final Map> buckets = new ConcurrentHashMap<>(); - public ScanBucketSelector(int[] hashCodes) { + public Selector(int[] hashCodes) { this.hashCodes = hashCodes; } - @VisibleForTesting - boolean select(int bucket, int numBucket) { + @Override + public boolean test(Integer bucket, Integer numBucket) { return buckets.computeIfAbsent(numBucket, k -> createBucketSet(numBucket)) .contains(bucket); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 197b4dff4962..fdae3f75bf98 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -32,6 +32,7 @@ import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.utils.BiFilter; import org.apache.paimon.utils.Filter; import javax.annotation.Nullable; @@ -57,6 +58,8 @@ public interface FileStoreScan { FileStoreScan withBucketFilter(Filter bucketFilter); + FileStoreScan withTotalAwareBucketFilter(BiFilter bucketFilter); + FileStoreScan withPartitionBucket(BinaryRow partition, int bucket); FileStoreScan withSnapshot(long snapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 5b067de087cd..3311161b54a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -31,7 +31,6 @@ import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.stats.SimpleStatsConverters; import org.apache.paimon.table.source.ScanMode; -import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; import java.util.ArrayList; @@ -47,6 +46,7 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan { private final SimpleStatsConverters fieldKeyStatsConverters; private final SimpleStatsConverters fieldValueStatsConverters; + private final BucketSelectConverter bucketSelectConverter; private Predicate keyFilter; private Predicate valueFilter; @@ -56,30 +56,24 @@ public class KeyValueFileStoreScan extends AbstractFileStoreScan { public KeyValueFileStoreScan( ManifestsReader manifestsReader, - RowType partitionType, - ScanBucketFilter bucketFilter, + BucketSelectConverter bucketSelectConverter, SnapshotManager snapshotManager, SchemaManager schemaManager, TableSchema schema, KeyValueFieldsExtractor keyValueFieldsExtractor, ManifestFile.Factory manifestFileFactory, - int numOfBuckets, - boolean checkNumOfBuckets, Integer scanManifestParallelism, boolean deletionVectorsEnabled, MergeEngine mergeEngine, ChangelogProducer changelogProducer) { super( manifestsReader, - partitionType, - bucketFilter, snapshotManager, schemaManager, schema, manifestFileFactory, - numOfBuckets, - checkNumOfBuckets, scanManifestParallelism); + this.bucketSelectConverter = bucketSelectConverter; this.fieldKeyStatsConverters = new SimpleStatsConverters( sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), @@ -95,7 +89,7 @@ public KeyValueFileStoreScan( public KeyValueFileStoreScan withKeyFilter(Predicate predicate) { 
this.keyFilter = predicate; - this.bucketKeyFilter.pushdown(predicate); + this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter); return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 51dd395a96e6..d061e181618b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -136,6 +136,7 @@ public KeyValueFileStoreWrite( snapshotManager, scan, options, + partitionType, indexFactory, deletionVectorsMaintainerFactory, tableName); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java index 386a72cf5920..b7feeead4bbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java @@ -28,6 +28,7 @@ import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.metrics.WriterBufferMetric; import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; @@ -62,6 +63,7 @@ public MemoryFileStoreWrite( SnapshotManager snapshotManager, FileStoreScan scan, CoreOptions options, + RowType partitionType, @Nullable IndexMaintainer.Factory indexFactory, @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory, String tableName) { @@ -71,6 +73,8 @@ public MemoryFileStoreWrite( indexFactory, dvMaintainerFactory, tableName, + options.bucket(), + partitionType, options.writeMaxWritersToSpill()); this.options = options; this.cacheManager = new CacheManager(options.lookupCacheMaxMemory()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java index 4bf48a78eebc..9fcbb8960fc5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanMetrics.java @@ -52,9 +52,6 @@ public MetricGroup getMetricGroup() { public static final String LAST_SKIPPED_BY_PARTITION_AND_STATS = "lastSkippedByPartitionAndStats"; - public static final String LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER = - "lastSkippedByBucketAndLevelFilter"; - public static final String LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER = "lastSkippedByWholeBucketFilesFilter"; @@ -72,9 +69,6 @@ private void registerGenericScanMetrics() { metricGroup.gauge( LAST_SKIPPED_BY_PARTITION_AND_STATS, () -> latestScan == null ? 0L : latestScan.getSkippedByPartitionAndStats()); - metricGroup.gauge( - LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER, - () -> latestScan == null ? 0L : latestScan.getSkippedByBucketAndLevelFilter()); metricGroup.gauge( LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER, () -> latestScan == null ? 
0L : latestScan.getSkippedByWholeBucketFiles()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java index 21f905df392c..e760282e687a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/ScanStats.java @@ -26,7 +26,6 @@ public class ScanStats { private final long duration; private final long scannedManifests; private final long skippedByPartitionAndStats; - private final long skippedByBucketAndLevelFilter; private final long skippedByWholeBucketFiles; private final long skippedTableFiles; @@ -36,18 +35,13 @@ public ScanStats( long duration, long scannedManifests, long skippedByPartitionAndStats, - long skippedByBucketAndLevelFilter, long skippedByWholeBucketFiles, long resultedTableFiles) { this.duration = duration; this.scannedManifests = scannedManifests; this.skippedByPartitionAndStats = skippedByPartitionAndStats; - this.skippedByBucketAndLevelFilter = skippedByBucketAndLevelFilter; this.skippedByWholeBucketFiles = skippedByWholeBucketFiles; - this.skippedTableFiles = - skippedByPartitionAndStats - + skippedByBucketAndLevelFilter - + skippedByWholeBucketFiles; + this.skippedTableFiles = skippedByPartitionAndStats + skippedByWholeBucketFiles; this.resultedTableFiles = resultedTableFiles; } @@ -71,11 +65,6 @@ protected long getSkippedByPartitionAndStats() { return skippedByPartitionAndStats; } - @VisibleForTesting - protected long getSkippedByBucketAndLevelFilter() { - return skippedByBucketAndLevelFilter; - } - @VisibleForTesting protected long getSkippedByWholeBucketFiles() { return skippedByWholeBucketFiles; diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ScanBucketSelectorTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java similarity index 91% rename from paimon-core/src/test/java/org/apache/paimon/operation/ScanBucketSelectorTest.java rename to paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java index 3e7cf739ab69..ae5ae9b466e6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ScanBucketSelectorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java @@ -18,7 +18,7 @@ package org.apache.paimon.operation; -import org.apache.paimon.operation.ScanBucketFilter.ScanBucketSelector; +import org.apache.paimon.operation.BucketSelectConverter.Selector; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.types.IntType; @@ -33,8 +33,8 @@ import static org.apache.paimon.predicate.PredicateBuilder.or; import static org.assertj.core.api.Assertions.assertThat; -/** Test for {@link ScanBucketSelector}. */ -public class ScanBucketSelectorTest { +/** Test for {@link BucketSelectConverter}. 
*/ +public class BucketSelectConverterTest { private final RowType rowType = RowType.of(new IntType(), new IntType(), new IntType()); @@ -68,7 +68,7 @@ public void testOrIllegal() { @Test public void testNormal() { - ScanBucketSelector selector = + Selector selector = newSelector(and(builder.equal(0, 0), builder.equal(1, 1), builder.equal(2, 2))) .get(); assertThat(selector.hashCodes()).containsExactly(1141287431); @@ -77,7 +77,7 @@ public void testNormal() { @Test public void testIn() { - ScanBucketSelector selector = + Selector selector = newSelector( and( builder.in(0, Arrays.asList(5, 6, 7)), @@ -90,7 +90,7 @@ public void testIn() { @Test public void testOr() { - ScanBucketSelector selector = + Selector selector = newSelector( and( or( @@ -106,7 +106,7 @@ public void testOr() { @Test public void testInNull() { - ScanBucketSelector selector = + Selector selector = newSelector( and( builder.in(0, Arrays.asList(5, 6, 7, null)), @@ -119,7 +119,7 @@ public void testInNull() { @Test public void testMultipleIn() { - ScanBucketSelector selector = + Selector selector = newSelector( and( builder.in(0, Arrays.asList(5, 6, 7)), @@ -134,7 +134,7 @@ public void testMultipleIn() { @Test public void testMultipleOr() { - ScanBucketSelector selector = + Selector selector = newSelector( and( or( @@ -150,7 +150,7 @@ public void testMultipleOr() { assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 10, 19); } - private Optional newSelector(Predicate predicate) { - return ScanBucketFilter.create(predicate, rowType); + private Optional newSelector(Predicate predicate) { + return (Optional) BucketSelectConverter.create(predicate, rowType); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java index 5b41d0258316..2b9d0e0cb728 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/metrics/ScanMetricsTest.java @@ -50,7 +50,6 @@ public void testGenericMetricsRegistration() { ScanMetrics.LAST_SCAN_SKIPPED_TABLE_FILES, ScanMetrics.LAST_SCAN_RESULTED_TABLE_FILES, ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS, - ScanMetrics.LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER, ScanMetrics.LAST_SKIPPED_BY_WHOLE_BUCKET_FILES_FILTER); } @@ -71,10 +70,6 @@ public void testMetricsAreUpdated() { (Gauge) registeredGenericMetrics.get( ScanMetrics.LAST_SKIPPED_BY_PARTITION_AND_STATS); - Gauge lastSkippedByBucketAndLevelFilter = - (Gauge) - registeredGenericMetrics.get( - ScanMetrics.LAST_SKIPPED_BY_BUCKET_AND_LEVEL_FILTER); Gauge lastSkippedByWholeBucketFilesFilter = (Gauge) registeredGenericMetrics.get( @@ -91,7 +86,6 @@ public void testMetricsAreUpdated() { assertThat(scanDuration.getStatistics().size()).isEqualTo(0); assertThat(lastScannedManifests.getValue()).isEqualTo(0); assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(0); - assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(0); assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(0); assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(0); assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(0); @@ -111,9 +105,8 @@ public void testMetricsAreUpdated() { assertThat(scanDuration.getStatistics().getStdDev()).isEqualTo(0); assertThat(lastScannedManifests.getValue()).isEqualTo(20); assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(25); - 
assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(40); assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(32); - assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(97); + assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(57); assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(10); // report again @@ -131,19 +124,18 @@ public void testMetricsAreUpdated() { assertThat(scanDuration.getStatistics().getStdDev()).isCloseTo(212.132, offset(0.001)); assertThat(lastScannedManifests.getValue()).isEqualTo(22); assertThat(lastSkippedByPartitionAndStats.getValue()).isEqualTo(30); - assertThat(lastSkippedByBucketAndLevelFilter.getValue()).isEqualTo(42); assertThat(lastSkippedByWholeBucketFilesFilter.getValue()).isEqualTo(33); - assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(105); + assertThat(lastScanSkippedTableFiles.getValue()).isEqualTo(63); assertThat(lastScanResultedTableFiles.getValue()).isEqualTo(8); } private void reportOnce(ScanMetrics scanMetrics) { - ScanStats scanStats = new ScanStats(200, 20, 25, 40, 32, 10); + ScanStats scanStats = new ScanStats(200, 20, 25, 32, 10); scanMetrics.reportScan(scanStats); } private void reportAgain(ScanMetrics scanMetrics) { - ScanStats scanStats = new ScanStats(500, 22, 30, 42, 33, 8); + ScanStats scanStats = new ScanStats(500, 22, 30, 33, 8); scanMetrics.reportScan(scanStats); }
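
Note on the new contract (illustrative, not part of the patch): withTotalAwareBucketFilter hands the scan a BiFilter<Integer, Integer> that judges each manifest entry by its own (bucket, totalBuckets) pair, instead of validating a single numOfBuckets up front and filtering afterwards. The sketch below is a hypothetical, self-contained demo class; it assumes the bucket assignment Math.abs(hash % totalBuckets), and the hash codes are made-up stand-ins for what BucketSelectConverter.create collects from the pushed-down predicate (the authoritative mapping lives in KeyAndBucketExtractor.bucketKeyHashCode).

    import java.util.Arrays;

    /**
     * Hypothetical demo: evaluates a total-bucket-aware filter per manifest
     * entry, mirroring what createEntryRowFilter now applies while reading.
     */
    public class TotalAwareBucketFilterDemo {

        /** Same shape as org.apache.paimon.utils.BiFilter introduced above. */
        @FunctionalInterface
        interface BiFilter<T, U> {
            boolean test(T t, U u);
        }

        public static void main(String[] args) {
            // Stand-in hash codes of the candidate bucket-key rows, analogous
            // to the array assembled by BucketSelectConverter.create.
            int[] hashCodes = {1141287431, 45536, 1089639};

            // Assumed bucket assignment: Math.abs(hash % totalBuckets).
            BiFilter<Integer, Integer> totalAwareBucketFilter =
                    (bucket, totalBuckets) ->
                            Arrays.stream(hashCodes)
                                    .map(h -> Math.abs(h % totalBuckets))
                                    .anyMatch(b -> b == bucket);

            // Each entry is judged against its own recorded totalBuckets, so
            // entries written under an older bucket count still select correctly.
            System.out.println(totalAwareBucketFilter.test(1, 5)); // true: 1141287431 % 5 == 1
            System.out.println(totalAwareBucketFilter.test(3, 1)); // false: only bucket 0 exists
        }
    }

Because the filter is keyed by each entry's recorded total, the scan no longer needs the eager bucket-count check; the hard failure for a mismatched bucket num moves to the write path, in AbstractFileStoreWrite.scanExistingFileMetas, as shown in the hunk above.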