From 77b6bb44cbb5b9ffc0c46de99e54816b9c496380 Mon Sep 17 00:00:00 2001 From: Vamsi Manohar Date: Thu, 13 Jul 2023 16:49:37 -0700 Subject: [PATCH] Change query range response structure Signed-off-by: Vamsi Manohar --- .../org/opensearch/sql/ppl/ExplainIT.java | 10 +++ .../ppl/explain_query_range.json | 9 +++ ...faultQueryRangeFunctionResponseHandle.java | 75 +++++++++---------- .../response/PrometheusResponse.java | 25 +------ .../storage/PrometheusMetricScan.java | 7 +- .../storage/PrometheusMetricTable.java | 4 - ...eryRangeFunctionTableScanOperatorTest.java | 36 ++++++--- .../storage/PrometheusMetricScanTest.java | 33 -------- .../storage/PrometheusMetricTableTest.java | 26 ------- 9 files changed, 83 insertions(+), 142 deletions(-) create mode 100644 integ-test/src/test/resources/expectedOutput/ppl/explain_query_range.json diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java index 1a785e9074..d41b2cde8a 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java @@ -81,6 +81,16 @@ public void testSortPushDownExplain() throws Exception { ); } + @Test + public void explainQueryRange() throws Exception { + String expected = loadFromFile("expectedOutput/ppl/explain_query_range.json"); + assertJsonEquals( + expected, + explainQueryToString("source = my_prometheus" + + ".query_range('prometheus_http_requests_total',1689281439,1689291439,14)") + ); + } + String loadFromFile(String filename) throws Exception { URI uri = Resources.getResource(filename).toURI(); return new String(Files.readAllBytes(Paths.get(uri))); diff --git a/integ-test/src/test/resources/expectedOutput/ppl/explain_query_range.json b/integ-test/src/test/resources/expectedOutput/ppl/explain_query_range.json new file mode 100644 index 0000000000..bbc00e0c43 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/ppl/explain_query_range.json @@ -0,0 +1,9 @@ +{ + "root": { + "name": "QueryRangeFunctionTableScanOperator", + "description": { + "request": "query_range(prometheus_http_requests_total, 1689281439, 1689291439, 14)" + }, + "children": [] + } +} \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java index 7f261360f7..d4353d2f99 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/response/DefaultQueryRangeFunctionResponseHandle.java @@ -5,6 +5,8 @@ package org.opensearch.sql.prometheus.functions.response; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; import java.time.Instant; @@ -12,9 +14,9 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; -import org.jetbrains.annotations.NotNull; import org.json.JSONArray; import org.json.JSONObject; +import org.opensearch.sql.data.model.ExprCollectionValue; import org.opensearch.sql.data.model.ExprDoubleValue; import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTimestampValue; @@ -22,7 +24,6 @@ import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants; /** * Default implementation of QueryRangeFunctionResponseHandle. @@ -40,63 +41,61 @@ public class DefaultQueryRangeFunctionResponseHandle implements QueryRangeFuncti */ public DefaultQueryRangeFunctionResponseHandle(JSONObject responseObject) { this.responseObject = responseObject; - constructIteratorAndSchema(); + constructSchema(); + constructIterator(); } - private void constructIteratorAndSchema() { + private void constructIterator() { List result = new ArrayList<>(); - List columnList = new ArrayList<>(); if ("matrix".equals(responseObject.getString("resultType"))) { JSONArray itemArray = responseObject.getJSONArray("result"); for (int i = 0; i < itemArray.length(); i++) { + LinkedHashMap linkedHashMap = new LinkedHashMap<>(); JSONObject item = itemArray.getJSONObject(i); - JSONObject metric = item.getJSONObject("metric"); - JSONArray values = item.getJSONArray("values"); - if (i == 0) { - columnList = getColumnList(metric); - } - for (int j = 0; j < values.length(); j++) { - LinkedHashMap linkedHashMap = - extractRow(metric, values.getJSONArray(j), columnList); - result.add(new ExprTupleValue(linkedHashMap)); - } + linkedHashMap.put(LABELS, extractLabels(item.getJSONObject("metric"))); + extractTimestampAndValues(item.getJSONArray("values"), linkedHashMap); + result.add(new ExprTupleValue(linkedHashMap)); } } else { throw new RuntimeException(String.format("Unexpected Result Type: %s during Prometheus " + "Response Parsing. 'matrix' resultType is expected", responseObject.getString("resultType"))); } - this.schema = new ExecutionEngine.Schema(columnList); this.responseIterator = result.iterator(); } - @NotNull - private static LinkedHashMap extractRow(JSONObject metric, - JSONArray values, List columnList) { - LinkedHashMap linkedHashMap = new LinkedHashMap<>(); - for (ExecutionEngine.Schema.Column column : columnList) { - if (PrometheusFieldConstants.TIMESTAMP.equals(column.getName())) { - linkedHashMap.put(PrometheusFieldConstants.TIMESTAMP, - new ExprTimestampValue(Instant.ofEpochMilli((long) (values.getDouble(0) * 1000)))); - } else if (column.getName().equals(VALUE)) { - linkedHashMap.put(VALUE, new ExprDoubleValue(values.getDouble(1))); - } else { - linkedHashMap.put(column.getName(), - new ExprStringValue(metric.getString(column.getName()))); - } + private static void extractTimestampAndValues(JSONArray values, + LinkedHashMap linkedHashMap) { + List timestampList = new ArrayList<>(); + List valueList = new ArrayList<>(); + for (int j = 0; j < values.length(); j++) { + JSONArray value = values.getJSONArray(j); + timestampList.add(new ExprTimestampValue( + Instant.ofEpochMilli((long) (value.getDouble(0) * 1000)))); + valueList.add(new ExprDoubleValue(value.getDouble(1))); } - return linkedHashMap; + linkedHashMap.put(TIMESTAMP, + new ExprCollectionValue(timestampList)); + linkedHashMap.put(VALUE, new ExprCollectionValue(valueList)); } + private void constructSchema() { + this.schema = new ExecutionEngine.Schema(getColumnList()); + } - private List getColumnList(JSONObject metric) { + private ExprValue extractLabels(JSONObject metric) { + LinkedHashMap labelsMap = new LinkedHashMap<>(); + metric.keySet().forEach(key + -> labelsMap.put(key, new ExprStringValue(metric.getString(key)))); + return new ExprTupleValue(labelsMap); + } + + + private List getColumnList() { List columnList = new ArrayList<>(); - columnList.add(new ExecutionEngine.Schema.Column(PrometheusFieldConstants.TIMESTAMP, - PrometheusFieldConstants.TIMESTAMP, ExprCoreType.TIMESTAMP)); - columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE)); - for (String key : metric.keySet()) { - columnList.add(new ExecutionEngine.Schema.Column(key, key, ExprCoreType.STRING)); - } + columnList.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY)); + columnList.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY)); + columnList.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT)); return columnList; } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java index 331605b1d5..bd9e36ccdc 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java @@ -35,8 +35,6 @@ public class PrometheusResponse implements Iterable { private final PrometheusResponseFieldNames prometheusResponseFieldNames; - private final Boolean isQueryRangeFunctionScan; - /** * Constructor. * @@ -46,11 +44,9 @@ public class PrometheusResponse implements Iterable { * and timestamp fieldName. */ public PrometheusResponse(JSONObject responseObject, - PrometheusResponseFieldNames prometheusResponseFieldNames, - Boolean isQueryRangeFunctionScan) { + PrometheusResponseFieldNames prometheusResponseFieldNames) { this.responseObject = responseObject; this.prometheusResponseFieldNames = prometheusResponseFieldNames; - this.isQueryRangeFunctionScan = isQueryRangeFunctionScan; } @NonNull @@ -70,24 +66,7 @@ public Iterator iterator() { new ExprTimestampValue(Instant.ofEpochMilli((long) (val.getDouble(0) * 1000)))); linkedHashMap.put(prometheusResponseFieldNames.getValueFieldName(), getValue(val, 1, prometheusResponseFieldNames.getValueType())); - // Concept: - // {\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}" - // This is the label string in the prometheus response. - // Q: how do we map this to columns in a table. - // For queries like source = prometheus.metric_name | .... - // we can get the labels list in prior as we know which metric we are working on. - // In case of commands like source = prometheus.query_range('promQL'); - // Any arbitrary command can be written and we don't know the labels - // in the prometheus response in prior. - // So for PPL like commands...output structure is @value, @timestamp - // and each label is treated as a separate column where as in case of query_range - // function irrespective of promQL, the output structure is - // @value, @timestamp, @labels [jsonfied string of all the labels for a data point] - if (isQueryRangeFunctionScan) { - linkedHashMap.put(LABELS, new ExprStringValue(metric.toString())); - } else { - insertLabels(linkedHashMap, metric); - } + insertLabels(linkedHashMap, metric); result.add(new ExprTupleValue(linkedHashMap)); } } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java index 8611ae04f1..7f75cb3c07 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java @@ -40,10 +40,6 @@ public class PrometheusMetricScan extends TableScanOperator { private Iterator iterator; - @Setter - @Getter - private Boolean isQueryRangeFunctionScan = Boolean.FALSE; - @Setter private PrometheusResponseFieldNames prometheusResponseFieldNames; @@ -69,8 +65,7 @@ public void open() { JSONObject responseObject = prometheusClient.queryRange( request.getPromQl(), request.getStartTime(), request.getEndTime(), request.getStep()); - return new PrometheusResponse(responseObject, prometheusResponseFieldNames, - isQueryRangeFunctionScan).iterator(); + return new PrometheusResponse(responseObject, prometheusResponseFieldNames).iterator(); } catch (IOException e) { LOG.error(e.getMessage()); throw new RuntimeException("Error fetching data from prometheus server. " + e.getMessage()); diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java index a03d69bc41..b3b63327d0 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java @@ -97,10 +97,6 @@ public Map getFieldTypes() { public PhysicalPlan implement(LogicalPlan plan) { PrometheusMetricScan metricScan = new PrometheusMetricScan(prometheusClient); - if (prometheusQueryRequest != null) { - metricScan.setRequest(prometheusQueryRequest); - metricScan.setIsQueryRangeFunctionScan(Boolean.TRUE); - } return plan.accept(new PrometheusDefaultImplementor(), metricScan); } diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java index 3aa992fc65..79da8b466c 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/scan/QueryRangeFunctionTableScanOperatorTest.java @@ -15,6 +15,7 @@ import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY; import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME; import static org.opensearch.sql.prometheus.constants.TestConstants.STEP; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; import static org.opensearch.sql.prometheus.utils.TestUtils.getJson; @@ -22,6 +23,7 @@ import java.io.IOException; import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.LinkedHashMap; import lombok.SneakyThrows; import org.json.JSONObject; @@ -30,17 +32,19 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.data.model.ExprCollectionValue; import org.opensearch.sql.data.model.ExprDoubleValue; import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTimestampValue; import org.opensearch.sql.data.model.ExprTupleValue; +import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.prometheus.client.PrometheusClient; import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; @ExtendWith(MockitoExtension.class) -public class QueryRangeFunctionTableScanOperatorTest { +class QueryRangeFunctionTableScanOperatorTest { @Mock private PrometheusClient prometheusClient; @@ -61,22 +65,32 @@ void testQueryResponseIterator() { .thenReturn(new JSONObject(getJson("query_range_result.json"))); queryRangeFunctionTableScanOperator.open(); Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext()); - ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ - put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); - put(VALUE, new ExprDoubleValue(1)); + LinkedHashMap labelsMap = new LinkedHashMap<>() {{ put("instance", new ExprStringValue("localhost:9090")); put("__name__", new ExprStringValue("up")); put("job", new ExprStringValue("prometheus")); + }}; + ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put(LABELS, new ExprTupleValue(labelsMap)); + put(TIMESTAMP, new ExprCollectionValue(Collections + .singletonList(new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))))); + put(VALUE, new ExprCollectionValue(Collections.singletonList(new ExprDoubleValue(1)))); } }); + assertEquals(firstRow, queryRangeFunctionTableScanOperator.next()); Assertions.assertTrue(queryRangeFunctionTableScanOperator.hasNext()); - ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ - put("@timestamp", new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); - put("@value", new ExprDoubleValue(0)); + + LinkedHashMap labelsMap2 = new LinkedHashMap<>() {{ put("instance", new ExprStringValue("localhost:9091")); put("__name__", new ExprStringValue("up")); put("job", new ExprStringValue("node")); + }}; + ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put(LABELS, new ExprTupleValue(labelsMap2)); + put(TIMESTAMP, new ExprCollectionValue(Collections + .singletonList(new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))))); + put(VALUE, new ExprCollectionValue(Collections.singletonList(new ExprDoubleValue(0)))); } }); assertEquals(secondRow, queryRangeFunctionTableScanOperator.next()); @@ -120,11 +134,9 @@ void testQuerySchema() { .thenReturn(new JSONObject(getJson("query_range_result.json"))); queryRangeFunctionTableScanOperator.open(); ArrayList columns = new ArrayList<>(); - columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.TIMESTAMP)); - columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.DOUBLE)); - columns.add(new ExecutionEngine.Schema.Column("instance", "instance", ExprCoreType.STRING)); - columns.add(new ExecutionEngine.Schema.Column("__name__", "__name__", ExprCoreType.STRING)); - columns.add(new ExecutionEngine.Schema.Column("job", "job", ExprCoreType.STRING)); + columns.add(new ExecutionEngine.Schema.Column(TIMESTAMP, TIMESTAMP, ExprCoreType.ARRAY)); + columns.add(new ExecutionEngine.Schema.Column(VALUE, VALUE, ExprCoreType.ARRAY)); + columns.add(new ExecutionEngine.Schema.Column(LABELS, LABELS, ExprCoreType.STRUCT)); ExecutionEngine.Schema expectedSchema = new ExecutionEngine.Schema(columns); assertEquals(expectedSchema, queryRangeFunctionTableScanOperator.schema()); } diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java index cb70e9e064..68e03c758c 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java @@ -209,39 +209,6 @@ void testQueryResponseIteratorWithGivenPrometheusResponseWithBackQuotedFieldName Assertions.assertFalse(prometheusMetricScan.hasNext()); } - @Test - @SneakyThrows - void testQueryResponseIteratorForQueryRangeFunction() { - PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); - prometheusMetricScan.setIsQueryRangeFunctionScan(Boolean.TRUE); - prometheusMetricScan.getRequest().setPromQl(QUERY); - prometheusMetricScan.getRequest().setStartTime(STARTTIME); - prometheusMetricScan.getRequest().setEndTime(ENDTIME); - prometheusMetricScan.getRequest().setStep(STEP); - - when(prometheusClient.queryRange(any(), any(), any(), any())) - .thenReturn(new JSONObject(getJson("query_range_result.json"))); - prometheusMetricScan.open(); - Assertions.assertTrue(prometheusMetricScan.hasNext()); - ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ - put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); - put(VALUE, new ExprLongValue(1)); - put(LABELS, new ExprStringValue( - "{\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}")); - } - }); - assertEquals(firstRow, prometheusMetricScan.next()); - Assertions.assertTrue(prometheusMetricScan.hasNext()); - ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ - put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); - put(VALUE, new ExprLongValue(0)); - put(LABELS, new ExprStringValue( - "{\"instance\":\"localhost:9091\",\"__name__\":\"up\",\"job\":\"node\"}")); - } - }); - assertEquals(secondRow, prometheusMetricScan.next()); - Assertions.assertFalse(prometheusMetricScan.hasNext()); - } @Test @SneakyThrows diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java index de95b2bd64..d43c38fc68 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java @@ -111,32 +111,6 @@ void testGetFieldTypesFromPrometheusQueryRequest() { assertNull(prometheusMetricTable.getMetricName()); } - @Test - void testImplementWithQueryRangeFunction() { - PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); - prometheusQueryRequest.setPromQl("test"); - prometheusQueryRequest.setStep("15m"); - PrometheusMetricTable prometheusMetricTable = - new PrometheusMetricTable(client, prometheusQueryRequest); - List finalProjectList = new ArrayList<>(); - finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); - finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); - PhysicalPlan plan = prometheusMetricTable.implement( - project(relation("query_range", prometheusMetricTable), - finalProjectList, null)); - - - assertTrue(plan instanceof ProjectOperator); - List projectList = ((ProjectOperator) plan).getProjectList(); - List outputFields - = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); - assertEquals(List.of(VALUE, TIMESTAMP), outputFields); - assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); - PrometheusMetricScan prometheusMetricScan = - (PrometheusMetricScan) ((ProjectOperator) plan).getInput(); - assertEquals(prometheusQueryRequest, prometheusMetricScan.getRequest()); - } - @Test void testImplementWithBasicMetricQuery() { PrometheusMetricTable prometheusMetricTable =