diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index 418149ae3c3d..65f084aa2da9 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -35,7 +35,7 @@
changelog-producer.row-deduplicate
false
-
Boolean
+ Boolean Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.
@@ -300,7 +300,7 @@
scan.manifest.parallelism
(none)
Integer
- The parallelism of scanning manifest files, default value is the size of cpu processor.Note: Scale-up this parameter will increase memory usage while scanning manifest files.We can consider downsize it when we encounter an out of memory exception while scanning
+ The parallelism of scanning manifest files, default value is the size of cpu processor. Note: Scale-up this parameter will increase memory usage while scanning manifest files. We can consider downsize it when we encounter an out of memory exception while scanning
scan.mode
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index ad7ccda11a55..c49833e0b97c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -399,8 +399,8 @@ public class CoreOptions implements Serializable {
                     .intType()
                     .noDefaultValue()
                     .withDescription(
-                            "The parallelism of scanning manifest files, default value is the size of cpu processor."
-                                    + "Note: Scale-up this parameter will increase memory usage while scanning manifest files."
+                            "The parallelism of scanning manifest files, default value is the size of cpu processor. "
+                                    + "Note: Scale-up this parameter will increase memory usage while scanning manifest files. "
                                     + "We can consider downsize it when we encounter an out of memory exception while scanning");

     public static final ConfigOption LOG_CONSISTENCY =
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
deleted file mode 100644
index cdd54cc0a9de..000000000000
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/AbstractManifestEntry.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.utils.FileStorePathFactory;
-import org.apache.paimon.utils.Preconditions;
-
-import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Objects;
-
-/** Abstract a simplest model of manifest file. */
-public abstract class AbstractManifestEntry {
-    protected final FileKind kind;
-    protected final String fileName;
-    // for tables without partition this field should be a row with 0 columns (not null)
-    protected final BinaryRow partition;
-    protected final int bucket;
-    protected final int totalBuckets;
-    protected final int level;
-
-    public AbstractManifestEntry(
-            FileKind kind,
-            String fileName,
-            BinaryRow partition,
-            int bucket,
-            int totalBuckets,
-            int level) {
-        this.kind = kind;
-        this.fileName = fileName;
-        this.partition = partition;
-        this.bucket = bucket;
-        this.totalBuckets = totalBuckets;
-        this.level = level;
-    }
-
-    public FileKind kind() {
-        return kind;
-    }
-
-    public BinaryRow partition() {
-        return partition;
-    }
-
-    public int bucket() {
-        return bucket;
-    }
-
-    public int totalBuckets() {
-        return totalBuckets;
-    }
-
-    public int level() {
-        return level;
-    }
-
-    public Identifier identifier() {
-        return new Identifier(partition, bucket, level, fileName);
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof AbstractManifestEntry)) {
-            return false;
-        }
-        AbstractManifestEntry that = (AbstractManifestEntry) o;
-        return Objects.equals(kind, that.kind)
-                && Objects.equals(partition, that.partition)
-                && bucket == that.bucket
-                && level == that.level
-                && Objects.equals(fileName, that.fileName);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(kind, partition, bucket, level, fileName);
-    }
-
-    @Override
-    public String toString() {
-        return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, level, fileName);
-    }
-
-    public static Collection mergeEntries(
-            Iterable entries) {
-        LinkedHashMap map = new LinkedHashMap<>();
-        mergeEntries(entries, map);
-        return map.values();
-    }
-
-    public static void mergeEntries(
-            Iterable entries, Map map) {
-        for (T entry : entries) {
-            Identifier identifier = entry.identifier();
-            switch (entry.kind()) {
-                case ADD:
-                    Preconditions.checkState(
-                            !map.containsKey(identifier),
-                            "Trying to add file %s which is already added. Manifest might be corrupted.",
-                            identifier);
-                    map.put(identifier, entry);
-                    break;
-                case DELETE:
-                    // each dataFile will only be added once and deleted once,
-                    // if we know that it is added before then both add and delete entry can be
-                    // removed because there won't be further operations on this file,
-                    // otherwise we have to keep the delete entry because the add entry must be
-                    // in the previous manifest files
-                    if (map.containsKey(identifier)) {
-                        map.remove(identifier);
-                    } else {
-                        map.put(identifier, entry);
-                    }
-                    break;
-                default:
-                    throw new UnsupportedOperationException(
-                            "Unknown value kind " + entry.kind().name());
-            }
-        }
-    }
-
-    /**
-     * The same {@link Identifier} indicates that the {@link AbstractManifestEntry} refers to the
-     * same data file.
-     */
-    public static class Identifier {
-        public final BinaryRow partition;
-        public final int bucket;
-        public final int level;
-        public final String fileName;
-
-        private Identifier(BinaryRow partition, int bucket, int level, String fileName) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.level = level;
-            this.fileName = fileName;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof Identifier)) {
-                return false;
-            }
-            Identifier that = (Identifier) o;
-            return Objects.equals(partition, that.partition)
-                    && bucket == that.bucket
-                    && level == that.level
-                    && Objects.equals(fileName, that.fileName);
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(partition, bucket, level, fileName);
-        }
-
-        @Override
-        public String toString() {
-            return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
-        }
-
-        public String toString(FileStorePathFactory pathFactory) {
-            return pathFactory.getPartitionString(partition)
-                    + ", bucket "
-                    + bucket
-                    + ", level "
-                    + level
-                    + ", file "
-                    + fileName;
-        }
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
index 5ad9f9972fc2..1c175e589be5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java
@@ -24,30 +24,61 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Preconditions;

 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;

 import static org.apache.paimon.utils.SerializationUtils.newBytesType;

 /** Entry of a manifest file, representing an addition / deletion of a data file. */
-public class ManifestEntry extends AbstractManifestEntry {
+public class ManifestEntry {
+    private final FileKind kind;
+    // for tables without partition this field should be a row with 0 columns (not null)
+    private final BinaryRow partition;
+    private final int bucket;
+    private final int totalBuckets;
     private final DataFileMeta file;

     public ManifestEntry(
             FileKind kind, BinaryRow partition, int bucket, int totalBuckets, DataFileMeta file) {
-        super(kind, file.fileName(), partition, bucket, totalBuckets, file.level());
+        this.kind = kind;
+        this.partition = partition;
+        this.bucket = bucket;
+        this.totalBuckets = totalBuckets;
         this.file = file;
     }

+    public FileKind kind() {
+        return kind;
+    }
+
+    public BinaryRow partition() {
+        return partition;
+    }
+
+    public int bucket() {
+        return bucket;
+    }
+
+    public int totalBuckets() {
+        return totalBuckets;
+    }
+
     public DataFileMeta file() {
         return file;
     }

+    public Identifier identifier() {
+        return new Identifier(partition, bucket, file.level(), file.fileName());
+    }
+
     public static RowType schema() {
         List fields = new ArrayList<>();
         fields.add(new DataField(0, "_KIND", new TinyIntType(false)));
@@ -81,6 +112,43 @@ public String toString() {
         return String.format("{%s, %s, %d, %d, %s}", kind, partition, bucket, totalBuckets, file);
     }

+    public static Collection mergeEntries(Iterable entries) {
+        LinkedHashMap map = new LinkedHashMap<>();
+        mergeEntries(entries, map);
+        return map.values();
+    }
+
+    public static void mergeEntries(
+            Iterable entries, Map map) {
+        for (ManifestEntry entry : entries) {
+            ManifestEntry.Identifier identifier = entry.identifier();
+            switch (entry.kind()) {
+                case ADD:
+                    Preconditions.checkState(
+                            !map.containsKey(identifier),
+                            "Trying to add file %s which is already added. Manifest might be corrupted.",
+                            identifier);
+                    map.put(identifier, entry);
+                    break;
+                case DELETE:
+                    // each dataFile will only be added once and deleted once,
+                    // if we know that it is added before then both add and delete entry can be
+                    // removed because there won't be further operations on this file,
+                    // otherwise we have to keep the delete entry because the add entry must be
+                    // in the previous manifest files
+                    if (map.containsKey(identifier)) {
+                        map.remove(identifier);
+                    } else {
+                        map.put(identifier, entry);
+                    }
+                    break;
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unknown value kind " + entry.kind().name());
+            }
+        }
+    }
+
     public static void assertNoDelete(Collection entries) {
         for (ManifestEntry entry : entries) {
             Preconditions.checkState(
@@ -89,4 +157,54 @@ public static void assertNoDelete(Collection entries) {
                     entry.file().fileName());
         }
     }
+
+    /**
+     * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
+     * file.
+     */
+    public static class Identifier {
+        public final BinaryRow partition;
+        public final int bucket;
+        public final int level;
+        public final String fileName;
+
+        private Identifier(BinaryRow partition, int bucket, int level, String fileName) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.level = level;
+            this.fileName = fileName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Identifier)) {
+                return false;
+            }
+            Identifier that = (Identifier) o;
+            return Objects.equals(partition, that.partition)
+                    && bucket == that.bucket
+                    && level == that.level
+                    && Objects.equals(fileName, that.fileName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(partition, bucket, level, fileName);
+        }
+
+        @Override
+        public String toString() {
+            return String.format("{%s, %d, %d, %s}", partition, bucket, level, fileName);
+        }
+
+        public String toString(FileStorePathFactory pathFactory) {
+            return pathFactory.getPartitionString(partition)
+                    + ", bucket "
+                    + bucket
+                    + ", level "
+                    + level
+                    + ", file "
+                    + fileName;
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index f81a5cdc50d5..4f3b3e3ba1e1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -21,7 +21,6 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.manifest.AbstractManifestEntry;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestEntrySerializer;
@@ -219,8 +218,8 @@ public List files() {
         };
     }

-    private Pair> doPlan(
-            Function> readManifest) {
+    private Pair> doPlan(
+            Function> readManifest) {
         List manifests = specifiedManifests;
         Long snapshotId = specifiedSnapshotId;
         if (manifests == null) {
@@ -237,7 +236,7 @@ private Pair> doPlan(
         final List readManifests = manifests;

-        Iterable entries =
+        Iterable entries =
                 ParallellyExecuteUtils.parallelismBatchIterable(
                         files ->
                                 files.parallelStream()
@@ -248,8 +247,8 @@ private Pair> doPlan(
                         readManifests,
                         scanManifestParallelism);

-        List files = new ArrayList<>();
-        for (T file : AbstractManifestEntry.mergeEntries(entries)) {
+        List files = new ArrayList<>();
+        for (ManifestEntry file : ManifestEntry.mergeEntries(entries)) {
             if (checkNumOfBuckets && file.totalBuckets() != numOfBuckets) {
                 String partInfo =
                         partitionConverter.getArity() > 0
@@ -324,29 +323,19 @@ private boolean filterManifestFileMeta(ManifestFileMeta manifest) {
     }

     /** Note: Keep this thread-safe. */
-    private boolean filterByBucket(AbstractManifestEntry entry) {
+    private boolean filterByBucket(ManifestEntry entry) {
         return (specifiedBucket == null || entry.bucket() == specifiedBucket);
     }

     /** Note: Keep this thread-safe. */
-    private boolean filterByBucketSelector(AbstractManifestEntry entry) {
+    private boolean filterByBucketSelector(ManifestEntry entry) {
         return (bucketSelector == null
                 || bucketSelector.select(entry.bucket(), entry.totalBuckets()));
     }

     /** Note: Keep this thread-safe. */
-    private boolean filterByLevel(AbstractManifestEntry entry) {
-        return (levelFilter == null || levelFilter.test(entry.level()));
-    }
-
-    /** Note: Keep this thread-safe. */
-    private boolean filterByStats(AbstractManifestEntry entry) {
-        // filterByStats is an action that is completed as much as possible and does not have an
-        // impact if it is not done.
-        if (entry instanceof ManifestEntry) {
-            return filterByStats((ManifestEntry) entry);
-        }
-        return true;
+    private boolean filterByLevel(ManifestEntry entry) {
+        return (levelFilter == null || levelFilter.test(entry.file().level()));
     }

     /** Note: Keep this thread-safe. */
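
The patch above moves AbstractManifestEntry.mergeEntries into ManifestEntry.mergeEntries without changing its rule: ADD entries are collected into an insertion-ordered map keyed by (partition, bucket, level, fileName), and a DELETE either cancels an ADD seen in the same batch or is kept because its matching ADD must live in an earlier manifest file. The sketch below illustrates only that merge rule; SimpleEntry, Kind and the plain file-name key are hypothetical stand-ins, not Paimon's ManifestEntry, FileKind or Identifier classes.

import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for ManifestEntry: an ADD/DELETE kind plus a file name used as the key. */
final class SimpleEntry {
    enum Kind { ADD, DELETE }

    final Kind kind;
    final String fileName;

    SimpleEntry(Kind kind, String fileName) {
        this.kind = kind;
        this.fileName = fileName;
    }

    @Override
    public String toString() {
        return kind + " " + fileName;
    }

    /** Same folding rule as the patched ManifestEntry.mergeEntries, on simplified types. */
    static Collection<SimpleEntry> mergeEntries(Iterable<SimpleEntry> entries) {
        Map<String, SimpleEntry> map = new LinkedHashMap<>(); // insertion order mirrors manifest order
        for (SimpleEntry entry : entries) {
            switch (entry.kind) {
                case ADD:
                    if (map.containsKey(entry.fileName)) {
                        throw new IllegalStateException("File added twice: " + entry.fileName);
                    }
                    map.put(entry.fileName, entry);
                    break;
                case DELETE:
                    // A file is added once and deleted once: if its ADD is in this batch the two
                    // cancel out, otherwise keep the DELETE because the ADD is in an older manifest.
                    if (map.containsKey(entry.fileName)) {
                        map.remove(entry.fileName);
                    } else {
                        map.put(entry.fileName, entry);
                    }
                    break;
            }
        }
        return map.values();
    }

    public static void main(String[] args) {
        Collection<SimpleEntry> merged =
                mergeEntries(
                        Arrays.asList(
                                new SimpleEntry(Kind.ADD, "data-1"),
                                new SimpleEntry(Kind.ADD, "data-2"),
                                new SimpleEntry(Kind.DELETE, "data-1"), // cancels the ADD above
                                new SimpleEntry(Kind.DELETE, "data-0"))); // its ADD is in an older manifest
        System.out.println(merged); // prints [ADD data-2, DELETE data-0]
    }
}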