Skip to content

Commit

Permalink
[core] Optimize manifest-full-compact and parts overwrite by Partitio…
Browse files Browse the repository at this point in the history
…nPredicate (#3864)
  • Loading branch information
JingsongLi authored Aug 1, 2024
1 parent 5bd3c6f commit acb0bea
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,14 @@
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.manifest.FileEntry.Identifier;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.stats.SimpleStatsConverter;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,9 +43,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Metadata of a manifest file. */
Expand Down Expand Up @@ -291,11 +287,9 @@ public static Optional<List<ManifestFileMeta>> tryFullCompaction(
int j = 0;
if (partitionType.getFieldCount() > 0) {
Set<BinaryRow> deletePartitions = computeDeletePartitions(deltaMerged);
Optional<Predicate> predicateOpt =
convertPartitionToPredicate(partitionType, deletePartitions);

if (predicateOpt.isPresent()) {
Predicate predicate = predicateOpt.get();
PartitionPredicate predicate =
PartitionPredicate.fromMultiple(partitionType, deletePartitions);
if (predicate != null) {
for (; j < base.size(); j++) {
// TODO: optimize this to binary search.
ManifestFileMeta file = base.get(j);
Expand Down Expand Up @@ -404,23 +398,4 @@ private static Set<BinaryRow> computeDeletePartitions(
}
return partitions;
}

private static Optional<Predicate> convertPartitionToPredicate(
RowType partitionType, Set<BinaryRow> partitions) {
Optional<Predicate> predicateOpt;
if (!partitions.isEmpty()) {
RowDataToObjectArrayConverter rowArrayConverter =
new RowDataToObjectArrayConverter(partitionType);

List<Predicate> predicateList =
partitions.stream()
.map(rowArrayConverter::convert)
.map(values -> createPartitionPredicate(partitionType, values))
.collect(Collectors.toList());
predicateOpt = Optional.of(PredicateBuilder.or(predicateList));
} else {
predicateOpt = Optional.empty();
}
return predicateOpt;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.operation.metrics.CommitStats;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -422,27 +423,24 @@ public void overwrite(
try {
boolean skipOverwrite = false;
// partition filter is built from static or dynamic partition according to properties
Predicate partitionFilter = null;
PartitionPredicate partitionFilter = null;
if (dynamicPartitionOverwrite) {
if (appendTableFiles.isEmpty()) {
// in dynamic mode, if there is no changes to commit, no data will be deleted
skipOverwrite = true;
} else {
partitionFilter =
Set<BinaryRow> partitions =
appendTableFiles.stream()
.map(ManifestEntry::partition)
.distinct()
// partition filter is built from new data's partitions
.map(p -> createPartitionPredicate(partitionType, p))
.reduce(PredicateBuilder::or)
.orElseThrow(
() ->
new RuntimeException(
"Failed to get dynamic partition filter. This is unexpected."));
.collect(Collectors.toSet());
partitionFilter = PartitionPredicate.fromMultiple(partitionType, partitions);
}
} else {
partitionFilter =
// partition may be partial partition fields, so here must to use predicate way.
Predicate partitionPredicate =
createPartitionPredicate(partition, partitionType, partitionDefaultName);
partitionFilter =
PartitionPredicate.fromPredicate(partitionType, partitionPredicate);
// sanity check, all changes must be done within the given partition
if (partitionFilter != null) {
for (ManifestEntry entry : appendTableFiles) {
Expand Down Expand Up @@ -511,14 +509,17 @@ public void dropPartitions(List<Map<String, String>> partitions, long commitIden
partitions.stream().map(Objects::toString).collect(Collectors.joining(",")));
}

Predicate partitionFilter =
// partitions may be partial partition fields, so here must to use predicate way.
Predicate predicate =
partitions.stream()
.map(
partition ->
createPartitionPredicate(
partition, partitionType, partitionDefaultName))
.reduce(PredicateBuilder::or)
.orElseThrow(() -> new RuntimeException("Failed to get partition filter."));
PartitionPredicate partitionFilter =
PartitionPredicate.fromPredicate(partitionType, predicate);

tryOverwrite(
partitionFilter,
Expand Down Expand Up @@ -722,7 +723,7 @@ private int tryCommit(
}

private int tryOverwrite(
Predicate partitionFilter,
@Nullable PartitionPredicate partitionFilter,
List<ManifestEntry> changes,
List<IndexManifestEntry> indexFiles,
long identifier,
Expand Down Expand Up @@ -784,7 +785,7 @@ private int tryOverwrite(
}

@VisibleForTesting
public boolean tryCommitOnce(
boolean tryCommitOnce(
List<ManifestEntry> tableFiles,
List<ManifestEntry> changelogFiles,
List<IndexManifestEntry> indexFiles,
Expand All @@ -804,13 +805,13 @@ public boolean tryCommitOnce(
: snapshotManager.copyWithBranch(branchName).snapshotPath(newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
LOG.debug("Ready to commit table files to snapshot {}", newSnapshotId);
for (ManifestEntry entry : tableFiles) {
LOG.debug(" * " + entry.toString());
LOG.debug(" * {}", entry);
}
LOG.debug("Ready to commit changelog to snapshot #" + newSnapshotId);
LOG.debug("Ready to commit changelog to snapshot {}", newSnapshotId);
for (ManifestEntry entry : changelogFiles) {
LOG.debug(" * " + entry.toString());
LOG.debug(" * {}", entry);
}
}

Expand Down Expand Up @@ -1292,7 +1293,8 @@ static ConflictCheck noConflictCheck() {
return latestSnapshot -> false;
}

public static ConflictCheck mustConflictCheck() {
@VisibleForTesting
static ConflictCheck mustConflictCheck() {
return latestSnapshot -> true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Set;

import static org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternal;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** A special predicate to filter partition only, just like {@link Predicate}. */
Expand All @@ -50,6 +51,10 @@ public interface PartitionPredicate {
boolean test(
long rowCount, InternalRow minValues, InternalRow maxValues, InternalArray nullCounts);

/**
* Compared to the multiple method, this approach can accept filtering of partially partitioned
* fields.
*/
@Nullable
static PartitionPredicate fromPredicate(RowType partitionType, Predicate predicate) {
if (partitionType.getFieldCount() == 0 || predicate == null) {
Expand All @@ -61,12 +66,17 @@ static PartitionPredicate fromPredicate(RowType partitionType, Predicate predica

@Nullable
static PartitionPredicate fromMultiple(RowType partitionType, List<BinaryRow> partitions) {
return fromMultiple(partitionType, new HashSet<>(partitions));
}

@Nullable
static PartitionPredicate fromMultiple(RowType partitionType, Set<BinaryRow> partitions) {
if (partitionType.getFieldCount() == 0 || partitions.isEmpty()) {
return null;
}

return new MultiplePartitionPredicate(
new RowDataToObjectArrayConverter(partitionType), new HashSet<>(partitions));
new RowDataToObjectArrayConverter(partitionType), partitions);
}

/** A {@link PartitionPredicate} using {@link Predicate}. */
Expand Down Expand Up @@ -127,13 +137,15 @@ private MultiplePartitionPredicate(
PredicateBuilder builder = new PredicateBuilder(partitionType);
for (int i = 0; i < collectors.length; i++) {
SimpleColStats stats = collectors[i].result();
if (stats.nullCount() == partitions.size()) {
Long nullCount = stats.nullCount();
checkArgument(nullCount != null, "nullCount cannot be null!");
if (nullCount == partitions.size()) {
min[i] = builder.isNull(i);
max[i] = builder.isNull(i);
} else {
min[i] = builder.greaterOrEqual(i, checkNotNull(stats.min()));
max[i] = builder.lessOrEqual(i, checkNotNull(stats.max()));
if (stats.nullCount() > 0) {
if (nullCount > 0) {
min[i] = PredicateBuilder.or(builder.isNull(i), min[i]);
max[i] = PredicateBuilder.or(builder.isNull(i), max[i]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,19 +787,20 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) {
entry.file()))
.collect(Collectors.toList());
// commit
store.newCommit()
.tryCommitOnce(
delete,
Collections.emptyList(),
Collections.emptyList(),
commitIdentifier++,
null,
Collections.emptyMap(),
Snapshot.CommitKind.APPEND,
store.snapshotManager().latestSnapshot(),
mustConflictCheck(),
DEFAULT_MAIN_BRANCH,
null);
try (FileStoreCommitImpl commit = store.newCommit()) {
commit.tryCommitOnce(
delete,
Collections.emptyList(),
Collections.emptyList(),
commitIdentifier++,
null,
Collections.emptyMap(),
Snapshot.CommitKind.APPEND,
store.snapshotManager().latestSnapshot(),
mustConflictCheck(),
DEFAULT_MAIN_BRANCH,
null);
}
}

private void createTag(Snapshot snapshot, String tagName, Duration timeRetained) {
Expand Down

0 comments on commit acb0bea

Please sign in to comment.