Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Prometheus Query Exemplars (#1782) #1905

Merged
merged 1 commit into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public Pair<FunctionSignature, FunctionBuilder> resolve(
FunctionSignature unresolvedSignature) {
FunctionName functionName = FunctionName.of("query_range");
FunctionSignature functionSignature =
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, LONG));
new FunctionSignature(functionName, List.of(STRING, LONG, LONG, STRING));
return Pair.of(
functionSignature,
(functionProperties, args) ->
Expand Down
79 changes: 73 additions & 6 deletions docs/user/ppl/admin/prometheus_connector.rst
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,11 @@ PromQL Support for prometheus Connector

`query_range` Table Function
----------------------------
Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
Arguments should be either passed by name or positionArguments should be either passed by name or position.
`source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
or
`source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
* Prometheus connector offers `query_range` table function. This table function can be used to query metrics in a specific time range using promQL.
* The function takes inputs similar to parameters mentioned for query range api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
- `source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)`
- `source=my_prometheus.query_range(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130, step=14)`
Example::

> source=my_prometheus.query_range('prometheus_http_requests_total', 1686694425, 1686700130, 14)
Expand All @@ -210,3 +209,71 @@ Example::
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+


Prometheus Connector Table Functions
==========================================

`query_exemplars` Table Function
----------------------------
* This table function can be used to fetch exemplars of a query in a specific time range.
* The function takes inputs similar to parameters mentioned for query exemplars api mentioned here: https://prometheus.io/docs/prometheus/latest/querying/api/
* Arguments should be either passed by name or positionArguments should be either passed by name or position.
- `source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)`
- `source=my_prometheus.query_exemplars(query='prometheus_http_requests_total', starttime=1686694425, endtime=1686700130)`
Example::

> source=my_prometheus.query_exemplars('prometheus_http_requests_total', 1686694425, 1686700130)
"schema": [
{
"name": "seriesLabels",
"type": "struct"
},
{
"name": "exemplars",
"type": "array"
}
],
"datarows": [
[
{
"instance": "localhost:8090",
"__name__": "test_exemplar_metric_total",
"service": "bar",
"job": "prometheus"
},
[
{
"labels": {
"traceID": "EpTxMJ40fUus7aGY"
},
"timestamp": "2020-09-14 15:22:25.479",
"value": 6.0
}
]
],
[
{
"instance": "localhost:8090",
"__name__": "test_exemplar_metric_total",
"service": "foo",
"job": "prometheus"
},
[
{
"labels": {
"traceID": "Olp9XHlq763ccsfa"
},
"timestamp": "2020-09-14 15:22:35.479",
"value": 19.0
},
{
"labels": {
"traceID": "hCtjygkIHwAN9vs4"
},
"timestamp": "2020-09-14 15:22:45.489",
"value": 20.0
}
]
]
]
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ public void testQueryRange() {
long currentTimestamp = new Date().getTime();
JSONObject response =
executeQuery("source=my_prometheus.query_range('prometheus_http_requests_total',"
+ ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + 14 + ")" );
+ ((currentTimestamp/1000)-3600) + "," + currentTimestamp/1000 + ", " + "'14'" + ")" );
verifySchema(response,
schema(LABELS, "struct"),
schema(VALUE, "array"),
schema(TIMESTAMP, "array"),
schema(LABELS, "struct"));
schema(TIMESTAMP, "array"));
Assertions.assertTrue(response.getInt("size") > 0);
}

Expand All @@ -249,8 +249,19 @@ public void explainQueryRange() throws Exception {
);
}

@Test
public void testExplainForQueryExemplars() throws Exception {
String expected = loadFromFile("expectedOutput/ppl/explain_query_exemplars.json");
assertJsonEquals(
expected,
explainQueryToString("source = my_prometheus."
+ "query_exemplars('app_ads_ad_requests_total',1689228292,1689232299)")
);
}

String loadFromFile(String filename) throws Exception {
URI uri = Resources.getResource(filename).toURI();
return new String(Files.readAllBytes(Paths.get(uri)));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{
"root": {
"name": "QueryExemplarsFunctionTableScanOperator",
"description": {
"request": "query_exemplars(app_ads_ad_requests_total, 1689228292, 1689232299)"
},
"children": []
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.sql.prometheus.request.system.model.MetricMetadata;

Expand All @@ -18,4 +19,6 @@ public interface PrometheusClient {
List<String> getLabels(String metricName) throws IOException;

Map<String, List<MetricMetadata>> getAllMetrics() throws IOException;

JSONArray queryExemplars(String query, Long start, Long end) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,20 @@ public Map<String, List<MetricMetadata>> getAllMetrics() throws IOException {
return new ObjectMapper().readValue(jsonObject.getJSONObject("data").toString(), typeRef);
}

@Override
public JSONArray queryExemplars(String query, Long start, Long end) throws IOException {
String queryUrl = String.format("%s/api/v1/query_exemplars?query=%s&start=%s&end=%s",
uri.toString().replaceAll("/$", ""), URLEncoder.encode(query, StandardCharsets.UTF_8),
start, end);
logger.debug("queryUrl: " + queryUrl);
Request request = new Request.Builder()
.url(queryUrl)
.build();
Response response = this.okHttpClient.newCall(request).execute();
JSONObject jsonObject = readResponse(response);
return jsonObject.getJSONArray("data");
}

private List<String> toListOfLabels(JSONArray array) {
List<String> result = new ArrayList<>();
for (int i = 0; i < array.length(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,15 @@ public class PrometheusFieldConstants {
public static final String TIMESTAMP = "@timestamp";
public static final String VALUE = "@value";
public static final String LABELS = "@labels";
public static final String MATRIX_KEY = "matrix";
public static final String RESULT_TYPE_KEY = "resultType";
public static final String METRIC_KEY = "metric";
public static final String RESULT_KEY = "result";
public static final String VALUES_KEY = "values";
public static final String SERIES_LABELS_KEY = "seriesLabels";
public static final String EXEMPLARS_KEY = "exemplars";
public static final String TRACE_ID_KEY = "traceID";
public static final String LABELS_KEY = "labels";
public static final String TIMESTAMP_KEY = "timestamp";
public static final String VALUE_KEY = "value";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.prometheus.functions.implementation;

import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.ENDTIME;
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.QUERY;
import static org.opensearch.sql.prometheus.functions.resolver.QueryRangeTableFunctionResolver.STARTTIME;

import java.util.List;
import java.util.stream.Collectors;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.NamedArgumentExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.TableFunctionImplementation;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.request.PrometheusQueryExemplarsRequest;
import org.opensearch.sql.prometheus.storage.QueryExemplarsTable;
import org.opensearch.sql.storage.Table;

public class QueryExemplarFunctionImplementation extends FunctionExpression implements
TableFunctionImplementation {

private final FunctionName functionName;
private final List<Expression> arguments;
private final PrometheusClient prometheusClient;

/**
* Required argument constructor.
*
* @param functionName name of the function
* @param arguments a list of arguments provided
*/
public QueryExemplarFunctionImplementation(FunctionName functionName, List<Expression> arguments,
PrometheusClient prometheusClient) {
super(functionName, arguments);
this.functionName = functionName;
this.arguments = arguments;
this.prometheusClient = prometheusClient;
}

@Override
public ExprValue valueOf(Environment<Expression, ExprValue> valueEnv) {
throw new UnsupportedOperationException(String.format(
"Prometheus defined function [%s] is only "
+ "supported in SOURCE clause with prometheus connector catalog",
functionName));
}

@Override
public ExprType type() {
return ExprCoreType.STRUCT;
}

@Override
public String toString() {
List<String> args = arguments.stream()
.map(arg -> String.format("%s=%s", ((NamedArgumentExpression) arg)
.getArgName(), ((NamedArgumentExpression) arg).getValue().toString()))
.collect(Collectors.toList());
return String.format("%s(%s)", functionName, String.join(", ", args));
}

@Override
public Table applyArguments() {
return new QueryExemplarsTable(prometheusClient, buildExemplarsQueryRequest(arguments));
}

private PrometheusQueryExemplarsRequest buildExemplarsQueryRequest(List<Expression> arguments) {

PrometheusQueryExemplarsRequest request = new PrometheusQueryExemplarsRequest();
arguments.forEach(arg -> {
String argName = ((NamedArgumentExpression) arg).getArgName();
Expression argValue = ((NamedArgumentExpression) arg).getValue();
ExprValue literalValue = argValue.valueOf();
switch (argName) {
case QUERY:
request
.setQuery((String) literalValue.value());
break;
case STARTTIME:
request.setStartTime(((Number) literalValue.value()).longValue());
break;
case ENDTIME:
request.setEndTime(((Number) literalValue.value()).longValue());
break;
default:
throw new ExpressionEvaluationException(
String.format("Invalid Function Argument:%s", argName));
}
});
return request;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.prometheus.functions.resolver;

import static org.opensearch.sql.data.type.ExprCoreType.LONG;
import static org.opensearch.sql.data.type.ExprCoreType.STRING;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.getNamedArgumentsOfTableFunction;
import static org.opensearch.sql.prometheus.utils.TableFunctionUtils.validatePrometheusTableFunctionArguments;

import java.util.List;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.function.FunctionBuilder;
import org.opensearch.sql.expression.function.FunctionName;
import org.opensearch.sql.expression.function.FunctionResolver;
import org.opensearch.sql.expression.function.FunctionSignature;
import org.opensearch.sql.prometheus.client.PrometheusClient;
import org.opensearch.sql.prometheus.functions.implementation.QueryExemplarFunctionImplementation;

/**
* This class is for query_exemplars table function resolver {@link FunctionResolver}.
* It takes care of validating function arguments and also creating
* required {@link org.opensearch.sql.expression.function.TableFunctionImplementation} Class.
*/
@RequiredArgsConstructor
public class QueryExemplarsTableFunctionResolver implements FunctionResolver {

private final PrometheusClient prometheusClient;
public static final String QUERY_EXEMPLARS = "query_exemplars";

public static final String QUERY = "query";
public static final String STARTTIME = "starttime";
public static final String ENDTIME = "endtime";

@Override
public Pair<FunctionSignature, FunctionBuilder> resolve(FunctionSignature unresolvedSignature) {
final FunctionName functionName = FunctionName.of(QUERY_EXEMPLARS);
FunctionSignature functionSignature =
new FunctionSignature(FunctionName.of(QUERY_EXEMPLARS), List.of(STRING, LONG, LONG));
FunctionBuilder functionBuilder = (functionProperties, arguments) -> {
final List<String> argumentNames = List.of(QUERY, STARTTIME, ENDTIME);
validatePrometheusTableFunctionArguments(arguments, argumentNames);
List<Expression> namedArguments = getNamedArgumentsOfTableFunction(arguments, argumentNames);
return new QueryExemplarFunctionImplementation(functionName,
namedArguments, prometheusClient);
};
return Pair.of(functionSignature, functionBuilder);
}

@Override
public FunctionName getFunctionName() {
return FunctionName.of(QUERY_EXEMPLARS);
}
}
Loading