[fix](iceberg) fix iceberg predicate conversion bug (#33283)
Follow-up to #32923; some cases were not covered there.

Converted expressions are now validated recursively before pushdown: an AND may keep its one convertible side, OR and NOT are dropped unless every child converts, IN-list conversion aborts when any literal fails to convert, DATE literals are now handled, and the predicates actually pushed down to Iceberg are printed in EXPLAIN output.
morningman authored Apr 7, 2024
1 parent 7cbdaa7 commit 79a7718
Showing 6 changed files with 1,028 additions and 144 deletions.
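
The invariant behind this fix: a filter handed to Iceberg's scan planner only prunes files and splits, and the original conjuncts are still evaluated afterwards, so a pushed-down filter may safely be weaker than the original predicate but never stronger. Dropping the unconvertible side of an AND weakens the filter; dropping a side of an OR (or the child of a NOT) would strengthen it and wrongly skip data, which is what the new checkConversion below guards against. A minimal standalone sketch of that rule against the Iceberg expression API (the demo class and column names are illustrative, not part of the commit):

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class PushdownRuleDemo {
    public static void main(String[] args) {
        // Pretend the right operand of a compound predicate failed to convert.
        Expression left = Expressions.greaterThan("id", 10);
        Expression right = null; // unconvertible

        // AND: keeping only the convertible side scans a superset of the
        // matching files, so partial pushdown is safe.
        Expression andFallback = (right == null) ? left : Expressions.and(left, right);

        // OR: keeping only one side would skip files that match the other
        // side, so pushdown must be abandoned entirely.
        Expression orFallback = (right == null) ? Expressions.alwaysTrue()
                : Expressions.or(left, right);

        System.out.println("AND fallback: " + andFallback);
        System.out.println("OR fallback:  " + orFallback);
    }
}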
@@ -48,8 +48,11 @@
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.expressions.And;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Not;
+import org.apache.iceberg.expressions.Or;
 import org.apache.iceberg.expressions.Unbound;
 import org.apache.iceberg.types.Type.TypeID;
 import org.apache.iceberg.types.Types;
@@ -107,6 +110,10 @@ public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
                 Expression right = convertToIcebergExpr(compoundPredicate.getChild(1), schema);
                 if (left != null && right != null) {
                     expression = Expressions.and(left, right);
+                } else if (left != null) {
+                    return left;
+                } else if (right != null) {
+                    return right;
                 }
                 break;
             }
@@ -209,6 +216,9 @@ public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
                     }
                     LiteralExpr literalExpr = (LiteralExpr) inExpr.getChild(i);
                     Object value = extractDorisLiteral(nestedField.type(), literalExpr);
+                    if (value == null) {
+                        return null;
+                    }
                     valueList.add(value);
                 }
                 if (inExpr.isNotIn()) {
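
The early return added above guards a real failure mode: Iceberg expression literals may not be null, so a Doris literal that fails to convert cannot simply be appended to the IN list. A sketch of that assumption (expected to fail fast rather than build a usable filter; class and column name are illustrative):

import org.apache.iceberg.expressions.Expressions;

public class InListNullDemo {
    public static void main(String[] args) {
        // A null standing in for a failed literal conversion; Iceberg is
        // expected to reject it ("Cannot create expression literal from null").
        System.out.println(Expressions.in("id", 1, null));
    }
}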
@@ -220,16 +230,62 @@ public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
             }
         }
 
-        if (expression != null && expression instanceof Unbound) {
-            try {
-                ((Unbound<?, ?>) expression).bind(schema.asStruct(), true);
-                return expression;
-            } catch (Exception e) {
-                LOG.warn("Failed to check expression: " + e.getMessage());
-                return null;
-            }
-        }
-        return null;
+        return checkConversion(expression, schema);
+    }
+
+    private static Expression checkConversion(Expression expression, Schema schema) {
+        if (expression == null) {
+            return null;
+        }
+        switch (expression.op()) {
+            case AND: {
+                And andExpr = (And) expression;
+                Expression left = checkConversion(andExpr.left(), schema);
+                Expression right = checkConversion(andExpr.right(), schema);
+                if (left != null && right != null) {
+                    return andExpr;
+                } else if (left != null) {
+                    return left;
+                } else if (right != null) {
+                    return right;
+                } else {
+                    return null;
+                }
+            }
+            case OR: {
+                Or orExpr = (Or) expression;
+                Expression left = checkConversion(orExpr.left(), schema);
+                Expression right = checkConversion(orExpr.right(), schema);
+                if (left == null || right == null) {
+                    return null;
+                } else {
+                    return orExpr;
+                }
+            }
+            case NOT: {
+                Not notExpr = (Not) expression;
+                Expression child = checkConversion(notExpr.child(), schema);
+                if (child == null) {
+                    return null;
+                } else {
+                    return notExpr;
+                }
+            }
+            case TRUE:
+            case FALSE:
+                return expression;
+            default:
+                if (!(expression instanceof Unbound)) {
+                    return null;
+                }
+                try {
+                    ((Unbound<?, ?>) expression).bind(schema.asStruct(), true);
+                    return expression;
+                } catch (Exception e) {
+                    LOG.debug("Failed to check expression: {}", e.getMessage());
+                    return null;
+                }
+        }
     }
 
     public static Object extractDorisLiteral(org.apache.iceberg.types.Type icebergType, Expr expr) {
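
The default branch is the old top-level bind check, now applied to every leaf that the AND/OR/NOT cases recurse into. A self-contained sketch of what bind validation catches (schema, column names, and values are illustrative):

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Unbound;
import org.apache.iceberg.types.Types;

public class BindCheckDemo {
    static final Schema SCHEMA = new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "dt", Types.DateType.get()));

    // Same shape as the default branch above: a leaf survives only if it is
    // Unbound and binds cleanly against the table schema.
    static boolean survives(Expression expr) {
        if (!(expr instanceof Unbound)) {
            return false;
        }
        try {
            ((Unbound<?, ?>) expr).bind(SCHEMA.asStruct(), true);
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(survives(Expressions.equal("id", 1)));      // true
        System.out.println(survives(Expressions.equal("no_col", 1)));  // false: unknown column
    }
}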
@@ -248,6 +304,7 @@ public static Object extractDorisLiteral(org.apache.iceberg.types.Type icebergType, Expr expr) {
         DateLiteral dateLiteral = (DateLiteral) expr;
         switch (icebergTypeID) {
             case STRING:
+            case DATE:
                 return dateLiteral.getStringValue();
             case TIMESTAMP:
                 return dateLiteral.unixTimestamp(TimeUtils.getTimeZone()) * MILLIS_TO_NANO_TIME;
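
Returning the DateLiteral's string form for the new DATE case works because Iceberg coerces a string literal to a DATE value when the predicate is bound against the schema; a minimal sketch (schema and value are illustrative):

import org.apache.iceberg.Schema;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.UnboundPredicate;
import org.apache.iceberg.types.Types;

public class DateLiteralDemo {
    public static void main(String[] args) {
        Schema schema = new Schema(
                Types.NestedField.optional(1, "dt", Types.DateType.get()));
        UnboundPredicate<String> predicate = Expressions.equal("dt", "2024-04-07");
        // Binding converts the ISO-8601 string literal to an Iceberg date.
        System.out.println(predicate.bind(schema.asStruct(), true));
    }
}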
@@ -40,6 +40,7 @@
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TFileFormatType;
 import org.apache.doris.thrift.TFileRangeDesc;
 import org.apache.doris.thrift.TFileType;
@@ -50,6 +51,7 @@
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.CombinedScanTask;
@@ -90,6 +92,7 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     private IcebergSource source;
     private Table icebergTable;
+    private List<String> pushdownIcebergPredicates = Lists.newArrayList();
 
     /**
      * External file scan node for Query iceberg table
@@ -180,7 +183,6 @@ public List<Split> getSplits() throws UserException {
     }
 
     private List<Split> doGetSplits() throws UserException {
-
         TableScan scan = icebergTable.newScan();
 
         // set snapshot
@@ -199,6 +201,7 @@ private List<Split> doGetSplits() throws UserException {
         }
         for (Expression predicate : expressions) {
             scan = scan.filter(predicate);
+            this.pushdownIcebergPredicates.add(predicate.toString());
         }
 
         // get splits
@@ -439,4 +442,17 @@ protected void toThrift(TPlanNode planNode) {
             }
         }
     }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        if (pushdownIcebergPredicates.isEmpty()) {
+            return super.getNodeExplainString(prefix, detailLevel);
+        }
+        StringBuilder sb = new StringBuilder();
+        for (String predicate : pushdownIcebergPredicates) {
+            sb.append(prefix).append(prefix).append(predicate).append("\n");
+        }
+        return super.getNodeExplainString(prefix, detailLevel)
+                + String.format("%sicebergPredicatePushdown=\n%s\n", prefix, sb);
+    }
 }
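
With this override, EXPLAIN on an Iceberg scan node gains a section listing the filters that were actually handed to the Iceberg scan, each recorded as the Expression's toString() rendering and indented with the doubled prefix from the loop above. Output of roughly this shape (predicate values illustrative):

icebergPredicatePushdown=
    ref(name="id") > 10
    ref(name="dt") == "2024-04-07"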
[Diffs for the remaining 4 of the 6 changed files are not shown above.]