Skip to content

Commit e2375fe

Browse files
authored
Push down IP comparison as range query with Calcite (#3959)
* Add reverse op for compare ip to support pushdown
  Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
* Pushdown ip comparison
  Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
* Refactor CompareIpFunction to use SqlKind directly
  Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
* Simplify the overriding of reverse() for IP comparators
  Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
---------
Signed-off-by: Yuanchun Shen <yuanchu@amazon.com>
1 parent 9f2b19a commit e2375fe

File tree

11 files changed

+153
-38
lines changed

11 files changed

+153
-38
lines changed

core/src/main/java/org/opensearch/sql/calcite/utils/PPLReturnTypes.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ private PPLReturnTypes() {}
2424
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_TIME_UDT);
2525
public static final SqlReturnTypeInference TIMESTAMP_FORCE_NULLABLE =
2626
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_TIMESTAMP_UDT);
27+
public static final SqlReturnTypeInference IP_FORCE_NULLABLE =
28+
ReturnTypes.explicit(UserDefinedFunctionUtils.NULLABLE_IP_UDT);
2729
public static SqlReturnTypeInference INTEGER_FORCE_NULLABLE =
2830
ReturnTypes.INTEGER.andThen(SqlTypeTransforms.FORCE_NULLABLE);
2931
public static SqlReturnTypeInference STRING_FORCE_NULLABLE =

core/src/main/java/org/opensearch/sql/calcite/utils/UserDefinedFunctionUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class UserDefinedFunctionUtils {
5555
TYPE_FACTORY.createUDT(ExprUDT.EXPR_TIMESTAMP, true);
5656
public static final RelDataType NULLABLE_STRING =
5757
TYPE_FACTORY.createTypeWithNullability(TYPE_FACTORY.createSqlType(SqlTypeName.VARCHAR), true);
58+
public static final RelDataType NULLABLE_IP_UDT = TYPE_FACTORY.createUDT(EXPR_IP, true);
5859

5960
public static RelDataType nullablePatternAggList =
6061
createArrayType(
@@ -77,6 +78,7 @@ public class UserDefinedFunctionUtils {
7778
ImmutableSet.of("match", "match_phrase", "match_bool_prefix", "match_phrase_prefix");
7879
public static Set<String> MULTI_FIELDS_RELEVANCE_FUNCTION_SET =
7980
ImmutableSet.of("simple_query_string", "query_string", "multi_match");
81+
public static String IP_FUNCTION_NAME = "IP";
8082

8183
/**
8284
* Creates a SqlUserDefinedAggFunction that wraps a Java class implementing an aggregate function.

core/src/main/java/org/opensearch/sql/expression/function/PPLBuiltinOperators.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,8 @@ public class PPLBuiltinOperators extends ReflectiveSqlOperatorTable {
299299
.toUDF("TIME");
300300

301301
// IP cast function
302-
public static final SqlOperator IP = new IPFunction().toUDF("IP");
302+
public static final SqlOperator IP =
303+
new IPFunction().toUDF(UserDefinedFunctionUtils.IP_FUNCTION_NAME);
303304
public static final SqlOperator TIME_TO_SEC =
304305
adaptExprMethodToUDF(
305306
DateTimeFunctions.class,

core/src/main/java/org/opensearch/sql/expression/function/udf/ip/CompareIpFunction.java

Lines changed: 71 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,30 @@
55

66
package org.opensearch.sql.expression.function.udf.ip;
77

8+
import java.util.Collections;
89
import java.util.List;
10+
import java.util.Locale;
911
import org.apache.calcite.adapter.enumerable.NotNullImplementor;
1012
import org.apache.calcite.adapter.enumerable.NullPolicy;
1113
import org.apache.calcite.adapter.enumerable.RexToLixTranslator;
1214
import org.apache.calcite.linq4j.tree.ConstantExpression;
1315
import org.apache.calcite.linq4j.tree.Expression;
1416
import org.apache.calcite.linq4j.tree.Expressions;
1517
import org.apache.calcite.rex.RexCall;
18+
import org.apache.calcite.sql.SqlIdentifier;
19+
import org.apache.calcite.sql.SqlKind;
20+
import org.apache.calcite.sql.SqlOperator;
21+
import org.apache.calcite.sql.SqlSyntax;
22+
import org.apache.calcite.sql.parser.SqlParserPos;
23+
import org.apache.calcite.sql.type.InferTypes;
1624
import org.apache.calcite.sql.type.ReturnTypes;
1725
import org.apache.calcite.sql.type.SqlReturnTypeInference;
26+
import org.apache.calcite.sql.validate.SqlUserDefinedFunction;
27+
import org.checkerframework.checker.nullness.qual.Nullable;
1828
import org.opensearch.sql.data.model.ExprIpValue;
1929
import org.opensearch.sql.data.type.ExprCoreType;
2030
import org.opensearch.sql.expression.function.ImplementorUDF;
31+
import org.opensearch.sql.expression.function.PPLBuiltinOperators;
2132
import org.opensearch.sql.expression.function.UDFOperandMetadata;
2233

2334
/**
@@ -32,33 +43,73 @@
3243
* </ul>
3344
*/
3445
public class CompareIpFunction extends ImplementorUDF {
46+
private final SqlKind kind;
3547

36-
private CompareIpFunction(ComparisonType comparisonType) {
37-
super(new CompareImplementor(comparisonType), NullPolicy.ANY);
48+
private CompareIpFunction(SqlKind kind) {
49+
super(new CompareImplementor(kind), NullPolicy.ANY);
50+
this.kind = kind;
3851
}
3952

4053
public static CompareIpFunction less() {
41-
return new CompareIpFunction(ComparisonType.LESS);
54+
return new CompareIpFunction(SqlKind.LESS_THAN);
4255
}
4356

4457
public static CompareIpFunction greater() {
45-
return new CompareIpFunction(ComparisonType.GREATER);
58+
return new CompareIpFunction(SqlKind.GREATER_THAN);
4659
}
4760

4861
public static CompareIpFunction lessOrEquals() {
49-
return new CompareIpFunction(ComparisonType.LESS_OR_EQUAL);
62+
return new CompareIpFunction(SqlKind.LESS_THAN_OR_EQUAL);
5063
}
5164

5265
public static CompareIpFunction greaterOrEquals() {
53-
return new CompareIpFunction(ComparisonType.GREATER_OR_EQUAL);
66+
return new CompareIpFunction(SqlKind.GREATER_THAN_OR_EQUAL);
5467
}
5568

5669
public static CompareIpFunction equals() {
57-
return new CompareIpFunction(ComparisonType.EQUALS);
70+
return new CompareIpFunction(SqlKind.EQUALS);
5871
}
5972

6073
public static CompareIpFunction notEquals() {
61-
return new CompareIpFunction(ComparisonType.NOT_EQUALS);
74+
return new CompareIpFunction(SqlKind.NOT_EQUALS);
75+
}
76+
77+
@Override
78+
public SqlUserDefinedFunction toUDF(String functionName, boolean isDeterministic) {
79+
SqlIdentifier udfIdentifier =
80+
new SqlIdentifier(Collections.singletonList(functionName), null, SqlParserPos.ZERO, null);
81+
return new SqlUserDefinedFunction(
82+
udfIdentifier,
83+
kind,
84+
getReturnTypeInference(),
85+
InferTypes.ANY_NULLABLE,
86+
getOperandMetadata(),
87+
getFunction()) {
88+
@Override
89+
public boolean isDeterministic() {
90+
return isDeterministic;
91+
}
92+
93+
@Override
94+
public @Nullable SqlOperator reverse() {
95+
return switch (kind) {
96+
case LESS_THAN -> PPLBuiltinOperators.GREATER_IP;
97+
case GREATER_THAN -> PPLBuiltinOperators.LESS_IP;
98+
case LESS_THAN_OR_EQUAL -> PPLBuiltinOperators.GTE_IP;
99+
case GREATER_THAN_OR_EQUAL -> PPLBuiltinOperators.LTE_IP;
100+
case EQUALS -> PPLBuiltinOperators.EQUALS_IP;
101+
case NOT_EQUALS -> PPLBuiltinOperators.NOT_EQUALS_IP;
102+
default -> throw new IllegalArgumentException(
103+
String.format(
104+
Locale.ROOT, "CompareIpFunction is not supposed to be of kind: %s", kind));
105+
};
106+
}
107+
108+
@Override
109+
public SqlSyntax getSyntax() {
110+
return SqlSyntax.BINARY;
111+
}
112+
};
62113
}
63114

64115
@Override
@@ -72,10 +123,10 @@ public UDFOperandMetadata getOperandMetadata() {
72123
}
73124

74125
public static class CompareImplementor implements NotNullImplementor {
75-
private final ComparisonType comparisonType;
126+
private final SqlKind compareType;
76127

77-
public CompareImplementor(ComparisonType comparisonType) {
78-
this.comparisonType = comparisonType;
128+
public CompareImplementor(SqlKind compareType) {
129+
this.compareType = compareType;
79130
}
80131

81132
@Override
@@ -88,19 +139,20 @@ public Expression implement(
88139
translatedOperands.get(0),
89140
translatedOperands.get(1));
90141

91-
return generateComparisonExpression(compareResult, comparisonType);
142+
return evalCompareResult(compareResult, compareType);
92143
}
93144

94-
private static Expression generateComparisonExpression(
95-
Expression compareResult, ComparisonType comparisonType) {
145+
private static Expression evalCompareResult(Expression compareResult, SqlKind compareType) {
96146
final ConstantExpression zero = Expressions.constant(0);
97-
return switch (comparisonType) {
147+
return switch (compareType) {
98148
case EQUALS -> Expressions.equal(compareResult, zero);
99149
case NOT_EQUALS -> Expressions.notEqual(compareResult, zero);
100-
case LESS -> Expressions.lessThan(compareResult, zero);
101-
case LESS_OR_EQUAL -> Expressions.lessThanOrEqual(compareResult, zero);
102-
case GREATER -> Expressions.greaterThan(compareResult, zero);
103-
case GREATER_OR_EQUAL -> Expressions.greaterThanOrEqual(compareResult, zero);
150+
case LESS_THAN -> Expressions.lessThan(compareResult, zero);
151+
case LESS_THAN_OR_EQUAL -> Expressions.lessThanOrEqual(compareResult, zero);
152+
case GREATER_THAN -> Expressions.greaterThan(compareResult, zero);
153+
case GREATER_THAN_OR_EQUAL -> Expressions.greaterThanOrEqual(compareResult, zero);
154+
default -> throw new UnsupportedOperationException(
155+
String.format(Locale.ROOT, "Unsupported compare type: %s", compareType));
104156
};
105157
}
106158

@@ -119,13 +171,4 @@ private static ExprIpValue toExprIpValue(Object obj) {
119171
throw new IllegalArgumentException("Invalid IP type: " + obj);
120172
}
121173
}
122-
123-
public enum ComparisonType {
124-
EQUALS,
125-
NOT_EQUALS,
126-
LESS,
127-
LESS_OR_EQUAL,
128-
GREATER,
129-
GREATER_OR_EQUAL
130-
}
131174
}

core/src/main/java/org/opensearch/sql/expression/function/udf/ip/IPFunction.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
import org.apache.calcite.linq4j.tree.Expression;
1313
import org.apache.calcite.linq4j.tree.Expressions;
1414
import org.apache.calcite.rex.RexCall;
15-
import org.apache.calcite.sql.type.ReturnTypes;
1615
import org.apache.calcite.sql.type.SqlReturnTypeInference;
1716
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
17+
import org.opensearch.sql.calcite.utils.PPLReturnTypes;
1818
import org.opensearch.sql.data.model.ExprIpValue;
1919
import org.opensearch.sql.data.type.ExprCoreType;
2020
import org.opensearch.sql.data.type.ExprType;
@@ -46,8 +46,7 @@ public UDFOperandMetadata getOperandMetadata() {
4646

4747
@Override
4848
public SqlReturnTypeInference getReturnTypeInference() {
49-
return ReturnTypes.explicit(
50-
OpenSearchTypeFactory.TYPE_FACTORY.createUDT(OpenSearchTypeFactory.ExprUDT.EXPR_IP, true));
49+
return PPLReturnTypes.IP_FORCE_NULLABLE;
5150
}
5251

5352
public static class CastImplementor

integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
import java.io.IOException;
1515
import java.util.Locale;
16+
import org.junit.Assume;
1617
import org.junit.Ignore;
1718
import org.junit.jupiter.api.Test;
1819
import org.opensearch.client.ResponseException;
@@ -93,18 +94,31 @@ public void testFilterByCompareStringTimePushDownExplain() throws IOException {
9394

9495
@Test
9596
public void testFilterByCompareIPCoercion() throws IOException {
96-
// Should automatically cast the string literal to IP.
97-
// TODO: Push down IP comparison as range query with Calcite
98-
String expected = loadExpectedPlan("explain_filter_compare_ip.json");
97+
// Should automatically cast the string literal to IP and push it down as a range query
9998
assertJsonEqualsIgnoreId(
100-
expected,
99+
loadExpectedPlan("explain_filter_compare_ip.json"),
101100
explainQueryToString(
102101
String.format(
103102
Locale.ROOT,
104103
"source=%s | where host > '1.1.1.1' | fields host",
105104
TEST_INDEX_WEBLOGS)));
106105
}
107106

107+
@Test
108+
public void testFilterByCompareIpv6Swapped() throws IOException {
109+
// Ignored in v2: the serialized string is unstable because of function properties
110+
Assume.assumeTrue(isCalciteEnabled());
111+
// Test swapping ip and string. In v2, this is pushed down as script;
112+
// with Calcite, it will still be pushed down as a range query
113+
assertJsonEqualsIgnoreId(
114+
loadExpectedPlan("explain_filter_compare_ipv6_swapped.json"),
115+
explainQueryToString(
116+
String.format(
117+
Locale.ROOT,
118+
"source=%s | where '::ffff:1234' <= host | fields host",
119+
TEST_INDEX_WEBLOGS)));
120+
}
121+
108122
@Test
109123
public void testWeekArgumentCoercion() throws IOException {
110124
String expected = loadExpectedPlan("explain_week_argument_coercion.json");
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"calcite": {
33
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[GREATER_IP($0, IP('1.1.1.1':VARCHAR))])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
4-
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], SCRIPT->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"script\":{\"script\":{\"source\":\"{\\\"langType\\\":\\\"calcite\\\",\\\"script\\\":\\\"rO0ABXNyABFqYXZhLnV0aWwuQ29sbFNlcleOq7Y6G6gRAwABSQADdGFneHAAAAADdwQAAAAGdAAHcm93VHlwZXQAensKICAiZmllbGRzIjogWwogICAgewogICAgICAidHlwZSI6ICJPVEhFUiIsCiAgICAgICJudWxsYWJsZSI6IHRydWUsCiAgICAgICJuYW1lIjogImhvc3QiCiAgICB9CiAgXSwKICAibnVsbGFibGUiOiBmYWxzZQp9dAAEZXhwcnQDfXsKICAib3AiOiB7CiAgICAibmFtZSI6ICJHUkVBVEVSX0lQIiwKICAgICJraW5kIjogIk9USEVSX0ZVTkNUSU9OIiwKICAgICJzeW50YXgiOiAiRlVOQ1RJT04iCiAgfSwKICAib3BlcmFuZHMiOiBbCiAgICB7CiAgICAgICJpbnB1dCI6IDAsCiAgICAgICJuYW1lIjogIiQwIgogICAgfSwKICAgIHsKICAgICAgIm9wIjogewogICAgICAgICJuYW1lIjogIklQIiwKICAgICAgICAia2luZCI6ICJPVEhFUl9GVU5DVElPTiIsCiAgICAgICAgInN5bnRheCI6ICJGVU5DVElPTiIKICAgICAgfSwKICAgICAgIm9wZXJhbmRzIjogWwogICAgICAgIHsKICAgICAgICAgICJsaXRlcmFsIjogIjEuMS4xLjEiLAogICAgICAgICAgInR5cGUiOiB7CiAgICAgICAgICAgICJ0eXBlIjogIlZBUkNIQVIiLAogICAgICAgICAgICAibnVsbGFibGUiOiBmYWxzZSwKICAgICAgICAgICAgInByZWNpc2lvbiI6IC0xCiAgICAgICAgICB9CiAgICAgICAgfQogICAgICBdLAogICAgICAiY2xhc3MiOiAib3JnLm9wZW5zZWFyY2guc3FsLmV4cHJlc3Npb24uZnVuY3Rpb24uVXNlckRlZmluZWRGdW5jdGlvbkJ1aWxkZXIkMSIsCiAgICAgICJ0eXBlIjogewogICAgICAgICJ0eXBlIjogIk9USEVSIiwKICAgICAgICAibnVsbGFibGUiOiB0cnVlCiAgICAgIH0sCiAgICAgICJkZXRlcm1pbmlzdGljIjogdHJ1ZSwKICAgICAgImR5bmFtaWMiOiBmYWxzZQogICAgfQogIF0sCiAgImNsYXNzIjogIm9yZy5vcGVuc2VhcmNoLnNxbC5leHByZXNzaW9uLmZ1bmN0aW9uLlVzZXJEZWZpbmVkRnVuY3Rpb25CdWlsZGVyJDEiLAogICJ0eXBlIjogewogICAgInR5cGUiOiAiQk9PTEVBTiIsCiAgICAibnVsbGFibGUiOiB0cnVlCiAgfSwKICAiZGV0ZXJtaW5pc3RpYyI6IHRydWUsCiAgImR5bmFtaWMiOiBmYWxzZQp9dAAKZmllbGRUeXBlc3NyABFqYXZhLnV0aWwuSGFzaE1hcAUH2sHDFmDRAwACRgAKbG9hZEZhY3RvckkACXRocmVzaG9sZHhwP0AAAAAAAAx3CAAAABAAAAABdAAEaG9zdH5yAClvcmcub3BlbnNlYXJjaC5zcWwuZGF0
YS50eXBlLkV4cHJDb3JlVHlwZQAAAAAAAAAAEgAAeHIADmphdmEubGFuZy5FbnVtAAAAAAAAAAASAAB4cHQAAklQeHg=\\\"}\",\"lang\":\"opensearch_compounded_script\",\"params\":{\"utcTimestamp\":*}},\"boost\":1.0}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], FILTER->GREATER_IP($0, IP('1.1.1.1':VARCHAR)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"host\":{\"from\":\"1.1.1.1\",\"to\":null,\"include_lower\":false,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
55
}
66
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[LTE_IP(IP('::ffff:1234':VARCHAR), $0)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
4+
"physical": "CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]], PushDownContext=[[PROJECT->[host], FILTER->LTE_IP(IP('::ffff:1234':VARCHAR), $0), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"query\":{\"range\":{\"host\":{\"from\":\"::ffff:1234\",\"to\":null,\"include_lower\":true,\"include_upper\":true,\"boost\":1.0}}},\"_source\":{\"includes\":[\"host\"],\"excludes\":[]},\"sort\":[{\"_doc\":{\"order\":\"asc\"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n"
5+
}
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"calcite": {
3+
"logical": "LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(host=[$0])\n LogicalFilter(condition=[LTE_IP(IP('::ffff:1234':VARCHAR), $0)])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n",
4+
"physical": "EnumerableLimit(fetch=[10000])\n EnumerableCalc(expr#0..11=[{inputs}], expr#12=['::ffff:1234':VARCHAR], expr#13=[IP($t12)], expr#14=[LTE_IP($t13, $t0)], host=[$t0], $condition=[$t14])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_weblogs]])\n"
5+
}
6+
}

0 commit comments

Comments
 (0)