Prune add file entry using column stats in Delta checkpoint iterator #25311

Open · wants to merge 2 commits into master
@@ -1242,6 +1242,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getEnforcedPartitionConstraint(),
+ tableHandle.getNonPartitionConstraint(),
tableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
while (addFileEntryIterator.hasNext()) {
@@ -1640,6 +1641,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
deltaLakeTableHandle.getMetadataEntry(),
deltaLakeTableHandle.getProtocolEntry(),
deltaLakeTableHandle.getEnforcedPartitionConstraint(),
+ deltaLakeTableHandle.getNonPartitionConstraint(),
deltaLakeTableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
while (addFileEntryIterator.hasNext()) {
@@ -2548,6 +2550,7 @@ private Map<String, DeletionVectorEntry> loadDeletionVectors(ConnectorSession se
handle.getMetadataEntry(),
handle.getProtocolEntry(),
handle.getEnforcedPartitionConstraint(),
+ handle.getNonPartitionConstraint(),
handle.getProjectedColumns().orElse(ImmutableSet.of()))) {
Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
while (addFileEntryIterator.hasNext()) {
@@ -4286,6 +4289,7 @@ private Stream<AddFileEntry> getAddFileEntriesMatchingEnforcedPartitionConstrain
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getEnforcedPartitionConstraint(),
+ tableHandle.getNonPartitionConstraint(),
tableHandle.getProjectedColumns().orElse(ImmutableSet.of()));
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
if (enforcedPartitionConstraint.isAll()) {
@@ -4407,11 +4411,11 @@ public static DeltaLakeTableHandle checkValidTableHandle(ConnectorTableHandle ta
}

public static TupleDomain<DeltaLakeColumnHandle> createStatisticsPredicate(
- AddFileEntry addFileEntry,
+ Optional<? extends DeltaLakeFileStatistics> stats,
List<DeltaLakeColumnMetadata> schema,
List<String> canonicalPartitionColumns)
{
- return addFileEntry.getStats()
+ return stats
.map(deltaLakeFileStatistics -> withColumnDomains(
schema.stream()
.filter(column -> canUseInPredicate(column.columnMetadata()))
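
Note on this change: createStatisticsPredicate now takes the parsed statistics (Optional<? extends DeltaLakeFileStatistics>) directly instead of a whole AddFileEntry, so the checkpoint iterator can evaluate a file's min/max statistics before an AddFileEntry is ever materialized. As background, a minimal sketch of how per-column min/max statistics become a pruning predicate; Domain, Range, ValueSet, and TupleDomain are real Trino SPI types, while the helper itself and its inputs are illustrative only:

    import com.google.common.collect.ImmutableMap;
    import io.trino.spi.predicate.Domain;
    import io.trino.spi.predicate.Range;
    import io.trino.spi.predicate.TupleDomain;
    import io.trino.spi.predicate.ValueSet;
    import io.trino.spi.type.Type;

    // Illustrative helper (not part of this PR): a column whose values lie in
    // [min, max] contributes the domain [min, max]; nulls are allowed when the
    // file statistics report null values for the column.
    static TupleDomain<DeltaLakeColumnHandle> fileStatsDomain(
            DeltaLakeColumnHandle column, Type type, Object min, Object max, boolean hasNulls)
    {
        Domain domain = Domain.create(ValueSet.ofRanges(Range.range(type, min, true, max, true)), hasNulls);
        return TupleDomain.withColumnDomains(ImmutableMap.of(column, domain));
    }
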
@@ -164,7 +164,7 @@ private List<Page> buildPages(ConnectorSession session)
PageListBuilder pageListBuilder = PageListBuilder.forTable(tableMetadata);

Map<Map<String, Optional<String>>, DeltaLakePartitionStatistics> statisticsByPartition;
- try (Stream<AddFileEntry> activeFiles = transactionLogAccess.loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), alwaysTrue())) {
+ try (Stream<AddFileEntry> activeFiles = transactionLogAccess.loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), TupleDomain.all(), alwaysTrue())) {
statisticsByPartition = getStatisticsByPartition(activeFiles);
}
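
Note: the $partitions system table must see every active file, so both constraint arguments are TupleDomain.all(), the "no constraint" value. A short illustration of why that disables pruning (real Trino SPI calls, with handle keys simplified to String):

    // TupleDomain.all() overlaps every satisfiable domain, so an overlap-based
    // pruning check can never reject a file on this code path.
    // BIGINT is io.trino.spi.type.BigintType.BIGINT.
    TupleDomain<String> all = TupleDomain.all();
    TupleDomain<String> constraint = TupleDomain.withColumnDomains(
            ImmutableMap.of("x", Domain.singleValue(BIGINT, 42L)));
    System.out.println(all.overlaps(constraint)); // true
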

@@ -164,6 +164,7 @@ private Stream<DeltaLakeSplit> getSplits(
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getEnforcedPartitionConstraint(),
+ tableHandle.getNonPartitionConstraint(),
tableHandle.getProjectedColumns().orElse(ImmutableSet.of()));
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
@@ -232,7 +233,7 @@ private Stream<DeltaLakeSplit> getSplits(
}

TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
- addAction,
+ addAction.getStats(),
predicatedColumns,
metadataEntry.getLowercasePartitionColumns());
if (!nonPartitionConstraint.overlaps(statisticsPredicate)) {
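
The skip decision is a pure domain-overlap test. A worked example with hypothetical values, using the same Trino SPI calls: a pushed-down constraint x > 100 against a file whose statistics say x lies in [0, 50]:

    import static io.trino.spi.type.BigintType.BIGINT;

    // Constraint: x > 100. File statistics: min(x) = 0, max(x) = 50.
    Domain constraint = Domain.create(ValueSet.ofRanges(Range.greaterThan(BIGINT, 100L)), false);
    Domain fileStats = Domain.create(ValueSet.ofRanges(Range.range(BIGINT, 0L, true, 50L, true)), false);
    System.out.println(constraint.overlaps(fileStats)); // false -> the file produces no splits
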
@@ -131,6 +131,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
tableHandle.getMetadataEntry(),
tableHandle.getProtocolEntry(),
tableHandle.getEnforcedPartitionConstraint(),
+ tableHandle.getNonPartitionConstraint(),
tableHandle.getProjectedColumns().orElse(ImmutableSet.of()))) {
Iterator<AddFileEntry> addEntryIterator = addEntries.iterator();
while (addEntryIterator.hasNext()) {
@@ -159,7 +160,7 @@ public TableStatistics getTableStatistics(ConnectorSession session, DeltaLakeTab
}

TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = createStatisticsPredicate(
- addEntry,
+ addEntry.getStats(),
predicatedColumns,
tableHandle.getMetadataEntry().getLowercasePartitionColumns());
if (!tableHandle.getNonPartitionConstraint().overlaps(statisticsPredicate)) {
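
The same overlap test now also runs while computing table statistics, keeping the estimates consistent with split pruning. Note the safe default in createStatisticsPredicate: a file that wrote no statistics yields a predicate that overlaps everything, so stats-less files are never pruned. A sketch of that fallback shape, with a hypothetical stats-to-domain mapper:

    Optional<DeltaLakeFileStatistics> stats = Optional.empty(); // file carries no stats
    TupleDomain<DeltaLakeColumnHandle> predicate = stats
            .map(s -> domainFromStats(s)) // hypothetical mapper
            .orElseGet(TupleDomain::all); // overlaps everything -> the file is kept
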
@@ -96,6 +96,11 @@ public AddFileEntry(
this.tags = tags;
this.deletionVector = requireNonNull(deletionVector, "deletionVector is null");

+ this.parsedStats = getDeltaLakeFileStatistics(stats, parsedStats);
+ }
+
+ public static Optional<? extends DeltaLakeFileStatistics> getDeltaLakeFileStatistics(Optional<String> stats, Optional<DeltaLakeParquetFileStatistics> parsedStats)
+ {
Optional<? extends DeltaLakeFileStatistics> resultParsedStats = Optional.empty();
if (parsedStats.isPresent()) {
resultParsedStats = parsedStats;
@@ -111,7 +116,7 @@ else if (stats.isPresent()) {
stats.get());
}
}
- this.parsedStats = resultParsedStats;
+ return resultParsedStats;
}

/**
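
Extracting the stats conversion into a static getDeltaLakeFileStatistics lets callers such as the checkpoint iterator obtain typed statistics for a checkpoint row without first constructing an AddFileEntry. Reduced to its shape (the real method, per the diff above, keeps a local resultParsedStats and parses the JSON branch with more care):

    // Simplified sketch; parseJson stands in for the connector's JSON stats parsing.
    public static Optional<? extends DeltaLakeFileStatistics> getDeltaLakeFileStatistics(
            Optional<String> stats, Optional<DeltaLakeParquetFileStatistics> parsedStats)
    {
        if (parsedStats.isPresent()) {
            return parsedStats; // checkpoint rows already carry typed stats
        }
        return stats.flatMap(json -> parseJson(json)); // hypothetical JSON parse step
    }
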
@@ -226,6 +226,7 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
FileFormatDataSourceStats stats,
Optional<MetadataAndProtocolEntry> metadataAndProtocol,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+ TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<Predicate<String>> addStatsMinMaxColumnFilter)
throws IOException
{
@@ -254,6 +255,7 @@ public Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
checkpoint,
checkpointFile,
partitionConstraint,
+ nonPartitionConstraint,
addStatsMinMaxColumnFilter));
}

@@ -274,6 +276,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
LastCheckpoint checkpoint,
TrinoInputFile checkpointFile,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+ TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<Predicate<String>> addStatsMinMaxColumnFilter)
{
long fileSize;
@@ -298,6 +301,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
checkpoint,
checkpointFile,
partitionConstraint,
+ nonPartitionConstraint,
addStatsMinMaxColumnFilter,
fileSystem,
fileSize);
@@ -316,6 +320,7 @@ private Stream<DeltaLakeTransactionLogEntry> getCheckpointTransactionLogEntries(
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold,
partitionConstraint,
+ nonPartitionConstraint,
addStatsMinMaxColumnFilter);
return stream(checkpointEntryIterator).onClose(checkpointEntryIterator::close);
}
@@ -331,11 +336,26 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointTransactionLogEntrie
LastCheckpoint checkpoint,
TrinoInputFile checkpointFile,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+ TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<Predicate<String>> addStatsMinMaxColumnFilter,
TrinoFileSystem fileSystem,
long fileSize)
{
- return getV2CheckpointEntries(session, entryTypes, metadataEntry, protocolEntry, checkpointSchemaManager, typeManager, stats, checkpoint, checkpointFile, partitionConstraint, addStatsMinMaxColumnFilter, fileSystem, fileSize)
+ return getV2CheckpointEntries(
+ session,
+ entryTypes,
+ metadataEntry,
+ protocolEntry,
+ checkpointSchemaManager,
+ typeManager,
+ stats,
+ checkpoint,
+ checkpointFile,
+ partitionConstraint,
+ nonPartitionConstraint,
+ addStatsMinMaxColumnFilter,
+ fileSystem,
+ fileSize)
.mapMulti((entry, builder) -> {
// Sidecar files contain only ADD and REMOVE entry types. https://github.com/delta-io/delta/blob/master/PROTOCOL.md#v2-spec
Set<CheckpointEntryIterator.EntryType> dataEntryTypes = Sets.intersection(entryTypes, Set.of(ADD, REMOVE));
@@ -358,6 +378,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointTransactionLogEntrie
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold,
partitionConstraint,
+ nonPartitionConstraint,
addStatsMinMaxColumnFilter);
stream(iterator).onClose(iterator::close).forEach(builder);
});
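
Per the Delta protocol, sidecar files referenced from a v2 checkpoint hold only add and remove actions, hence the Sets.intersection with {ADD, REMOVE} before reading them. The expansion relies on Stream.mapMulti (JDK 16+), where each input element may push zero or more results downstream; a self-contained illustration:

    import java.util.List;
    import java.util.stream.Stream;

    // mapMulti lets one input element (here: a checkpoint entry referencing a
    // sidecar) fan out into many output elements (the entries read from it).
    List<Integer> expanded = Stream.of(1, 2, 3)
            .<Integer>mapMulti((n, downstream) -> {
                downstream.accept(n);
                downstream.accept(n * 10);
            })
            .toList();
    // expanded = [1, 10, 2, 20, 3, 30]
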
@@ -374,6 +395,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointEntries(
LastCheckpoint checkpoint,
TrinoInputFile checkpointFile,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+ TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Optional<Predicate<String>> addStatsMinMaxColumnFilter,
TrinoFileSystem fileSystem,
long fileSize)
@@ -406,6 +428,7 @@ private Stream<DeltaLakeTransactionLogEntry> getV2CheckpointEntries(
checkpointRowStatisticsWritingEnabled,
domainCompactionThreshold,
partitionConstraint,
+ nonPartitionConstraint,
addStatsMinMaxColumnFilter);
return stream(checkpointEntryIterator)
.onClose(checkpointEntryIterator::close);
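
End to end, the new nonPartitionConstraint parameter travels from the table handle through each checkpoint-reading overload until it reaches CheckpointEntryIterator. Conceptually (a sketch only, with hypothetical surrounding names; the actual pruning lives inside the iterator), the effect is:

    // Skip checkpoint 'add' rows whose stats-derived predicate cannot intersect
    // the pushed-down non-partition constraint.
    while (addEntries.hasNext()) {
        AddFileEntry entry = addEntries.next();
        TupleDomain<DeltaLakeColumnHandle> filePredicate = createStatisticsPredicate(
                entry.getStats(), predicatedColumns, partitionColumns);
        if (nonPartitionConstraint.overlaps(filePredicate)) {
            output.accept(entry); // only files that may contain matching rows survive
        }
    }
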
@@ -333,13 +333,14 @@ public Stream<AddFileEntry> getActiveFiles(
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+ TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Set<DeltaLakeColumnHandle> projectedColumns)
{
Set<String> baseColumnNames = projectedColumns.stream()
.filter(DeltaLakeColumnHandle::isBaseColumn) // Only base column stats are supported
.map(DeltaLakeColumnHandle::columnName)
.collect(toImmutableSet());
- return getActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, baseColumnNames::contains);
+ return getActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, nonPartitionConstraint, baseColumnNames::contains);
}

public Stream<AddFileEntry> getActiveFiles(
@@ -349,10 +350,22 @@ public Stream<AddFileEntry> getActiveFiles(
ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
Predicate<String> addStatsMinMaxColumnFilter)
{
+ return getActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, TupleDomain.all(), addStatsMinMaxColumnFilter);
+ }
+
+ public Stream<AddFileEntry> getActiveFiles(
+ ConnectorSession session,
+ TableSnapshot tableSnapshot,
+ MetadataEntry metadataEntry,
+ ProtocolEntry protocolEntry,
+ TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+ TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
+ Predicate<String> addStatsMinMaxColumnFilter)
+ {
try {
if (isCheckpointFilteringEnabled(session)) {
- return loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, addStatsMinMaxColumnFilter);
+ return loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, partitionConstraint, nonPartitionConstraint, addStatsMinMaxColumnFilter);
}

TableVersion tableVersion = new TableVersion(new TableLocation(tableSnapshot.getTable(), tableSnapshot.getTableLocation()), tableSnapshot.getVersion());
@@ -383,7 +396,7 @@ public Stream<AddFileEntry> getActiveFiles(
}

List<AddFileEntry> activeFiles;
- try (Stream<AddFileEntry> addFileEntryStream = loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), alwaysTrue())) {
+ try (Stream<AddFileEntry> addFileEntryStream = loadActiveFiles(session, tableSnapshot, metadataEntry, protocolEntry, TupleDomain.all(), TupleDomain.all(), alwaysTrue())) {
activeFiles = addFileEntryStream.collect(toImmutableList());
}
return new DeltaLakeDataFileCacheEntry(tableSnapshot.getVersion(), activeFiles);
@@ -401,6 +414,7 @@ public Stream<AddFileEntry> loadActiveFiles(
MetadataEntry metadataEntry,
ProtocolEntry protocolEntry,
TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
+ TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Predicate<String> addStatsMinMaxColumnFilter)
{
List<Transaction> transactions = tableSnapshot.getTransactions();
@@ -414,6 +428,7 @@ public Stream<AddFileEntry> loadActiveFiles(
fileFormatDataSourceStats,
Optional.of(new MetadataAndProtocolEntry(metadataEntry, protocolEntry)),
partitionConstraint,
+ nonPartitionConstraint,
Optional.of(addStatsMinMaxColumnFilter))) {
return activeAddEntries(checkpointEntries, transactions, fileSystem)
.filter(partitionConstraint.isAll()
@@ -567,7 +582,7 @@ private <T> Stream<T> getEntries(
List<Transaction> transactions = tableSnapshot.getTransactions();
// Passing TupleDomain.all() because this method is used for getting all entries
Stream<DeltaLakeTransactionLogEntry> checkpointEntries = tableSnapshot.getCheckpointTransactionLogEntries(
- session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty(), TupleDomain.all(), Optional.of(alwaysTrue()));
+ session, entryTypes, checkpointSchemaManager, typeManager, fileSystem, stats, Optional.empty(), TupleDomain.all(), TupleDomain.all(), Optional.of(alwaysTrue()));

return entryMapper.apply(
checkpointEntries,
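
Callers with no row-level constraint keep the old getActiveFiles/loadActiveFiles signatures: each old overload forwards TupleDomain.all() for the new parameter, so existing call sites compile unchanged and behave as before. The pattern, reduced to essentials:

    // Source-compatible evolution: keep the old signature and delegate with a
    // neutral default that prunes nothing extra.
    public Stream<AddFileEntry> getActiveFiles(
            TupleDomain<DeltaLakeColumnHandle> partitionConstraint,
            Predicate<String> addStatsMinMaxColumnFilter)
    {
        return getActiveFiles(partitionConstraint, TupleDomain.all(), addStatsMinMaxColumnFilter);
    }
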
@@ -183,7 +183,7 @@ private static Long readPartitionTimestamp(String timestamp)
}

@VisibleForTesting
- static Long readPartitionTimestampWithZone(String timestamp)
+ public static Long readPartitionTimestampWithZone(String timestamp)
{
ZonedDateTime zonedDateTime;
try {
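
readPartitionTimestampWithZone is widened to public, presumably so the checkpoint iterator in another package can decode timestamp-with-zone partition values when evaluating partition constraints. Delta stores partition values as strings, so the value must be parsed before comparison; a minimal java.time sketch with an assumed format (the connector's accepted formats may be broader):

    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;

    // Parse a partition value such as "2024-05-01 12:30:00" and normalize it to
    // epoch millis in UTC for comparison against a constraint domain.
    DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    LocalDateTime local = LocalDateTime.parse("2024-05-01 12:30:00", format);
    long epochMillis = local.atZone(ZoneOffset.UTC).toInstant().toEpochMilli();
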