Skip to content

Commit

Permalink
[Prometheus]Bug fix for less than and greater than operators on @time…
Browse files Browse the repository at this point in the history
…stamp (#1267)

Signed-off-by: vamsi-amazon <reddyvam@amazon.com>

Signed-off-by: vamsi-amazon <reddyvam@amazon.com>
  • Loading branch information
vmmusings authored Jan 11, 2023
1 parent 9e67e5b commit 7554fcc
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 14 deletions.
1 change: 1 addition & 0 deletions docs/user/ppl/admin/prometheus_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ Prometheus Connector Limitations
* Only one aggregation is supported in stats command.
* Span Expression is compulsory in stats command.
* AVG, MAX, MIN, SUM, COUNT are the only aggregations supported in prometheus connector.
* Where clause only supports EQUALS(=) operation on metric dimensions and Comparative(> , < , >= , <=) Operations on @timestamp attribute.

Example queries
---------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import java.text.SimpleDateFormat;
import java.util.Date;
import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
Expand Down Expand Up @@ -42,6 +44,27 @@ public void testSourceMetricCommand() {
}
}

@Test
@SneakyThrows
public void testSourceMetricCommandWithTimestamp() {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String query = "source=my_prometheus.prometheus_http_requests_total | where @timestamp > '"
+ format.format(new Date(System.currentTimeMillis() - 3600 * 1000))
+ "' | sort + @timestamp | head 5";

JSONObject response =
executeQuery(query);
verifySchema(response,
schema(VALUE, "double"),
schema(TIMESTAMP, "timestamp"),
schema("handler", "string"),
schema("code", "string"),
schema("instance", "string"),
schema("job", "string"));
// <TODO>Currently, data is not injected into prometheus,
// so asserting on result is not possible. Verifying only schema.
}

@Test
@SneakyThrows
public void testMetricAvgAggregationCommand() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.expression.ExpressionNodeVisitor;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.function.BuiltinFunctionName;

/**
* This class builds metric selection query from the filter condition
Expand Down Expand Up @@ -47,23 +48,25 @@ public static String build(String metricName, Expression filterCondition) {
static class SeriesSelectionExpressionNodeVisitor extends ExpressionNodeVisitor<String, Object> {
@Override
public String visitFunction(FunctionExpression func, Object context) {
if (func.getFunctionName().getFunctionName().equals("and")) {
if (BuiltinFunctionName.AND.getName().equals(func.getFunctionName())) {
return func.getArguments().stream()
.map(arg -> visitFunction((FunctionExpression) arg, context))
.filter(StringUtils::isNotEmpty)
.collect(Collectors.joining(" , "));
} else if (func.getFunctionName().getFunctionName().contains("=")) {
ReferenceExpression ref = (ReferenceExpression) func.getArguments().get(0);
if (!ref.getAttr().equals(TIMESTAMP)) {
return func.getArguments().get(0)
+ func.getFunctionName().getFunctionName()
+ func.getArguments().get(1);
} else {
return null;
}
} else if ((BuiltinFunctionName.LTE.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.GTE.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.LESS.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.GREATER.getName().equals(func.getFunctionName()))
&& ((ReferenceExpression) func.getArguments().get(0)).getAttr().equals(TIMESTAMP)) {
return null;
} else if (BuiltinFunctionName.EQUAL.getName().equals(func.getFunctionName())) {
return func.getArguments().get(0)
+ func.getFunctionName().getFunctionName()
+ func.getArguments().get(1);
} else {
throw new RuntimeException(
String.format("Prometheus Catalog doesn't support %s in where command.",
String.format("Prometheus Datasource doesn't support %s "
+ "in where command.",
func.getFunctionName().getFunctionName()));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

package org.opensearch.sql.prometheus.storage.querybuilder;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;

import java.util.Date;
import lombok.NoArgsConstructor;
import org.apache.commons.math3.util.Pair;
Expand All @@ -15,6 +17,7 @@
import org.opensearch.sql.expression.ExpressionNodeVisitor;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.ReferenceExpression;
import org.opensearch.sql.expression.function.BuiltinFunctionName;

@NoArgsConstructor
public class TimeRangeParametersResolver extends ExpressionNodeVisitor<Void, Object> {
Expand Down Expand Up @@ -53,10 +56,13 @@ public Pair<Long, Long> resolve(Expression filterCondition) {

@Override
public Void visitFunction(FunctionExpression func, Object context) {
if (func.getFunctionName().getFunctionName().contains("=")) {
if ((BuiltinFunctionName.LTE.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.GTE.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.LESS.getName().equals(func.getFunctionName())
|| BuiltinFunctionName.GREATER.getName().equals(func.getFunctionName()))) {
ReferenceExpression ref = (ReferenceExpression) func.getArguments().get(0);
Expression rightExpr = func.getArguments().get(1);
if (ref.getAttr().equals("@timestamp")) {
if (ref.getAttr().equals(TIMESTAMP)) {
ExprValue literalValue = rightExpr.valueOf();
if (func.getFunctionName().getFunctionName().contains(">")) {
startTime = literalValue.timestampValue().toEpochMilli() / 1000;
Expand All @@ -67,6 +73,8 @@ public Void visitFunction(FunctionExpression func, Object context) {
}
} else {
func.getArguments()
.stream()
.filter(arg -> arg instanceof FunctionExpression)
.forEach(arg -> visitFunction((FunctionExpression) arg, context));
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,8 @@ void testImplementWithORConditionInWhereClause() {
DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))));
RuntimeException exception
= assertThrows(RuntimeException.class, () -> prometheusMetricTable.implement(plan));
assertEquals("Prometheus Catalog doesn't support or in where command.", exception.getMessage());
assertEquals("Prometheus Datasource doesn't support or in where command.",
exception.getMessage());
}

@Test
Expand Down Expand Up @@ -753,6 +754,66 @@ void testImplementWithRelationAndTimestampFilter() {
assertEquals(List.of(VALUE, TIMESTAMP), outputFields);
}


@Test
void testImplementWithRelationAndTimestampLTFilter() {
List<NamedExpression> finalProjectList = new ArrayList<>();
finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING)));
finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP)));
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long endTime = new Date(System.currentTimeMillis()).getTime();
PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, "prometheus_http_total_requests");
LogicalPlan logicalPlan = project(indexScan("prometheus_http_total_requests",
DSL.less(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP),
DSL.literal(
fromObjectValue(dateFormat.format(new Date(endTime)),
ExprCoreType.TIMESTAMP)))
), finalProjectList, null);
PhysicalPlan physicalPlan = prometheusMetricTable.implement(logicalPlan);
assertTrue(physicalPlan instanceof ProjectOperator);
assertTrue(((ProjectOperator) physicalPlan).getInput() instanceof PrometheusMetricScan);
PrometheusQueryRequest request
= ((PrometheusMetricScan) ((ProjectOperator) physicalPlan).getInput()).getRequest();
assertEquals((3600 / 250) + "s", request.getStep());
assertEquals("prometheus_http_total_requests",
request.getPromQl());
List<NamedExpression> projectList = ((ProjectOperator) physicalPlan).getProjectList();
List<String> outputFields
= projectList.stream().map(NamedExpression::getName).collect(Collectors.toList());
assertEquals(List.of(VALUE, TIMESTAMP), outputFields);
}


@Test
void testImplementWithRelationAndTimestampGTFilter() {
List<NamedExpression> finalProjectList = new ArrayList<>();
finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING)));
finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP)));
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Long endTime = new Date(System.currentTimeMillis()).getTime();
PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, "prometheus_http_total_requests");
LogicalPlan logicalPlan = project(indexScan("prometheus_http_total_requests",
DSL.greater(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP),
DSL.literal(
fromObjectValue(dateFormat.format(new Date(endTime)),
ExprCoreType.TIMESTAMP)))
), finalProjectList, null);
PhysicalPlan physicalPlan = prometheusMetricTable.implement(logicalPlan);
assertTrue(physicalPlan instanceof ProjectOperator);
assertTrue(((ProjectOperator) physicalPlan).getInput() instanceof PrometheusMetricScan);
PrometheusQueryRequest request
= ((PrometheusMetricScan) ((ProjectOperator) physicalPlan).getInput()).getRequest();
assertEquals((3600 / 250) + "s", request.getStep());
assertEquals("prometheus_http_total_requests",
request.getPromQl());
List<NamedExpression> projectList = ((ProjectOperator) physicalPlan).getProjectList();
List<String> outputFields
= projectList.stream().map(NamedExpression::getName).collect(Collectors.toList());
assertEquals(List.of(VALUE, TIMESTAMP), outputFields);
}

@Test
void testOptimize() {
PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest();
Expand Down Expand Up @@ -803,4 +864,41 @@ void testImplementPrometheusQueryWithBackQuotedFieldNamesInStatsQuery() {
prometheusQueryRequest.getPromQl());

}

@Test
void testImplementPrometheusQueryWithFilterQuery() {

PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, "prometheus_http_total_requests");

// IndexScanAgg without Filter
PhysicalPlan plan = prometheusMetricTable.implement(
indexScan("prometheus_http_total_requests",
DSL.and(DSL.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))),
DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))))));

assertTrue(plan instanceof PrometheusMetricScan);
PrometheusQueryRequest prometheusQueryRequest =
((PrometheusMetricScan) plan).getRequest();
assertEquals(
"prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}",
prometheusQueryRequest.getPromQl());
}

@Test
void testImplementPrometheusQueryWithUnsupportedFilterQuery() {

PrometheusMetricTable prometheusMetricTable =
new PrometheusMetricTable(client, "prometheus_http_total_requests");

RuntimeException exception = assertThrows(RuntimeException.class,
() -> prometheusMetricTable.implement(indexScan("prometheus_http_total_requests",
DSL.and(DSL.lte(DSL.ref("code", STRING), DSL.literal(stringValue("200"))),
DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))))));
assertEquals("Prometheus Datasource doesn't support <= in where command.",
exception.getMessage());
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.prometheus.storage.querybuilders;

import static org.opensearch.sql.data.model.ExprValueUtils.stringValue;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;

import org.apache.commons.math3.util.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.expression.DSL;
import org.opensearch.sql.prometheus.storage.querybuilder.TimeRangeParametersResolver;

public class TimeRangeParametersResolverTest {

@Test
void testTimeRangeParametersWithoutTimestampFilter() {
TimeRangeParametersResolver timeRangeParametersResolver = new TimeRangeParametersResolver();
Pair<Long, Long> result = timeRangeParametersResolver.resolve(
DSL.and(DSL.less(DSL.ref("code", STRING), DSL.literal(stringValue("200"))),
DSL.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))));
Assertions.assertNotNull(result);
Assertions.assertEquals(3600, result.getSecond() - result.getFirst());
}
}

0 comments on commit 7554fcc

Please sign in to comment.