Skip to content

Timeout reading delta tables after incremental updates with null values #24920

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.delta;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.GenericInternalException;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
Expand All @@ -25,6 +24,7 @@
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Streams;
import io.airlift.slice.Slice;
import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.Row;
Expand All @@ -34,10 +34,12 @@
import java.io.IOException;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.function.Predicate;

import static com.facebook.presto.delta.DeltaColumnHandle.ColumnType.PARTITION;
import static com.facebook.presto.delta.DeltaErrorCode.DELTA_INVALID_PARTITION_VALUE;
Expand Down Expand Up @@ -123,59 +125,26 @@ private static TupleDomain<String> extractPartitionColumnsPredicate(TupleDomain<
});
}

private static class AllFilesIterator
private static class NoneFilesIterator
implements CloseableIterator<Row>
{
private final CloseableIterator<FilteredColumnarBatch> inputIterator;
private Row nextItem;
private boolean rowsRemaining;
private CloseableIterator<Row> row;

public AllFilesIterator(CloseableIterator<FilteredColumnarBatch> inputIterator)
NoneFilesIterator(CloseableIterator<FilteredColumnarBatch> inputIterator)
{
this.inputIterator = inputIterator;
}

@Override
public boolean hasNext()
{
if (nextItem != null) {
return true;
}

if (!rowsRemaining) {
if (!inputIterator.hasNext()) {
return false;
}
FilteredColumnarBatch nextFile = inputIterator.next();
row = nextFile.getRows();
}
Row nextRow;
rowsRemaining = false;
if (row.hasNext()) {
nextRow = row.next();
nextItem = nextRow;
rowsRemaining = true;
}
if (!rowsRemaining) {
try {
row.close();
}
catch (IOException e) {
throw new GenericInternalException("Could not close row batch", e);
}
}
return nextItem != null;
return false;
}

@Override
public Row next()
{
if (!hasNext()) {
throw new NoSuchElementException("There are no more files");
}
Row toReturn = nextItem;
nextItem = null;
return toReturn;
throw new NoSuchElementException();
}

@Override
Expand All @@ -186,109 +155,78 @@ public void close()
}
}

private static class NoneFilesIterator
private static class BatchRowIterator
implements CloseableIterator<Row>
{
private final CloseableIterator<FilteredColumnarBatch> inputIterator;
private final Iterator<Row> rows;
private CloseableIterator<Row> prev;

NoneFilesIterator(CloseableIterator<FilteredColumnarBatch> inputIterator)
public BatchRowIterator(CloseableIterator<FilteredColumnarBatch> inputIterator,
Optional<Predicate<Row>> rowFilter)
{
this.inputIterator = inputIterator;
this.rows = Streams.stream(inputIterator)
.flatMap(batch -> {
if (prev != null) {
try {
prev.close();
}
catch (IOException e) {
throw new RuntimeException("Failed to close previous row batch", e);
}
}
prev = batch.getRows();
return Streams.stream(prev);
})
// keep only the rows accepted by the optional filter; when no filter is supplied, every row passes
.filter(row -> rowFilter.map(predicate -> predicate.test(row)).orElse(true))
.iterator();
}

@Override
public boolean hasNext()
{
return false;
return rows.hasNext();
}

@Override
public Row next()
{
throw new NoSuchElementException();
return rows.next();
}

@Override
public void close()
throws IOException
public void close() throws IOException
{
inputIterator.close();
}
}

private static class FilteredByPredicateIterator
implements CloseableIterator<Row>
{
private final CloseableIterator<FilteredColumnarBatch> inputIterator;
private final TupleDomain<String> partitionPredicate;
private final List<DeltaColumnHandle> partitionColumns;
private final TypeManager typeManager;
private Row nextItem;
private boolean rowsRemaining;
private CloseableIterator<Row> row;

public FilteredByPredicateIterator(CloseableIterator<FilteredColumnarBatch> inputIterator,
TupleDomain<String> partitionPredicate,
List<DeltaColumnHandle> partitionColumns, TypeManager typeManager)
{
this.inputIterator = inputIterator;
this.partitionPredicate = partitionPredicate;
this.partitionColumns = partitionColumns;
this.typeManager = typeManager;
}

@Override
public boolean hasNext()
{
if (nextItem != null) {
return true;
}

if (!rowsRemaining) {
if (!inputIterator.hasNext()) {
return false;
}
FilteredColumnarBatch nextFile = inputIterator.next();
row = nextFile.getRows();
}
Row nextRow;
rowsRemaining = false;
while (row.hasNext()) {
nextRow = row.next();
if (evaluatePartitionPredicate(partitionPredicate, partitionColumns, typeManager,
nextRow)) {
nextItem = nextRow;
rowsRemaining = true;
break;
}
if (prev != null) {
prev.close();
}
if (!rowsRemaining) {
try {
row.close();
}
catch (IOException e) {
throw new GenericInternalException("Cloud not close row batch", e);
}
if (inputIterator != null) {
inputIterator.close();
}
return nextItem != null;
}
}

@Override
public Row next()
private static class AllFilesIterator
extends BatchRowIterator
{
public AllFilesIterator(CloseableIterator<FilteredColumnarBatch> inputIterator)
{
if (!hasNext()) {
throw new NoSuchElementException("There are no more files");
}
Row toReturn = nextItem;
nextItem = null;
return toReturn;
super(inputIterator, Optional.empty());
}
}

@Override
public void close()
throws IOException
private static class FilteredByPredicateIterator
extends BatchRowIterator
{
public FilteredByPredicateIterator(CloseableIterator<FilteredColumnarBatch> inputIterator,
TupleDomain<String> partitionPredicate,
List<DeltaColumnHandle> partitionColumns,
TypeManager typeManager)
{
inputIterator.close();
super(inputIterator,
Optional.of(row -> evaluatePartitionPredicate(partitionPredicate, partitionColumns, typeManager, row)));
}

private static boolean evaluatePartitionPredicate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ private static DistributedQueryRunner createDeltaQueryRunner(Map<String, String>
* @param deltaTableName Name of the delta table which is on the classpath.
* @param hiveTableName Name of the Hive table that the Delta table is to be registered as in HMS
*/
private static void registerDeltaTableInHMS(QueryRunner queryRunner, String deltaTableName, String hiveTableName)
protected static void registerDeltaTableInHMS(QueryRunner queryRunner, String deltaTableName, String hiveTableName)
{
queryRunner.execute(format(
"CREATE TABLE %s.\"%s\".\"%s\" (dummyColumn INT) WITH (external_location = '%s')",
Expand Down
Loading
Loading