Skip to content

Commit

Permalink
Support increase/rate function in the MQE query language. (#11591)
Browse files Browse the repository at this point in the history
  • Loading branch information
wankai123 authored Nov 27, 2023
1 parent f964df9 commit 075b08c
Show file tree
Hide file tree
Showing 29 changed files with 457 additions and 41 deletions.
40 changes: 40 additions & 0 deletions docs/en/api/metrics-query-expression.md
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,46 @@ view_as_seq(not_existing, service_cpm)
#### Result Type
The result type is determined by the type of selected not-null metric expression.

## Trend Operation
Trend Operation takes an expression and performs a trend calculation on its results.

Expression:
```text
<Trend-Operator>(Metrics Expression, time_range)
```

`time_range` is the positive int of the calculated range. The unit will automatically align with to the query [Step](../../../oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/enumeration/Step.java),
for example, if the query Step is `MINUTE`, the unit of `time_range` is `minute`.


| Operator | Definition | ExpressionResultType |
|----------|---------------------------------------------------------------------------------------|-------------------------------|
| increase | returns the increase in the time range in the time series | TIME_SERIES_VALUES |
| rate | returns the per-second average rate of increase in the time range in the time series | TIME_SERIES_VALUES |

For example:
If we want to query the increase value of the `service_cpm` metric in 2 minute(assume the query Step is MINUTE),
we can use the following expression:

```text
increase(service_cpm, 2)
```

If the query duration is 3 minutes, from (T1 to T3) and the metric has values in time series:
```text
V(T1-2), V(T1-1), V(T1), V(T2), V(T3)
```
then the expression result is:
```text
V(T1)-V(T1-2), V(T2)-V(T1-1), V(T3)-V(T1)
```

**Note**:
* If the calculated metric value is empty, the result will be empty. Assume in the T3 point, the increase value = V(T3)-V(T1), If the metric V(T3) or V(T1) is empty, the result value in T3 will be empty.

### Result Type
TIME_SERIES_VALUES.

## Expression Query Example
### Labeled Value Metrics
```text
Expand Down
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
* Replace Metrics v2 protocol with MQE in UI templates and E2E Test.
* Fix incorrect apisix metrics otel rules.
* Support `Scratch The OAP Config Dump`.
* Support `increase/rate` function in the `MQE` query language.
* Group service endpoints into `_abandoned` when endpoints have high
cardinality.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ ABS: 'abs';
CEIL: 'ceil';
FLOOR: 'floor';
ROUND: 'round';
INCREASE: 'increase';
RATE: 'rate';

// TopN
TOP_N: 'top_n';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ expression
| aggregation L_PAREN expression R_PAREN # aggregationOp
| mathematical_operator0 L_PAREN expression R_PAREN #mathematicalOperator0OP
| mathematical_operator1 L_PAREN expression COMMA parameter R_PAREN #mathematicalOperator1OP
| trend L_PAREN metric COMMA INTEGER R_PAREN #trendOP
| logical_operator L_PAREN expressionList R_PAREN #logicalOperatorOP
| topN L_PAREN metric COMMA parameter COMMA order R_PAREN #topNOP
| topN L_PAREN metric COMMA INTEGER COMMA order R_PAREN #topNOP
| relabels L_PAREN expression COMMA label R_PAREN #relablesOP
| aggregateLabels L_PAREN expression COMMA aggregateLabelsFunc R_PAREN #aggregateLabelsOp
;
Expand Down Expand Up @@ -69,6 +70,9 @@ mathematical_operator0:
mathematical_operator1:
ROUND;

trend:
INCREASE | RATE;

topN: TOP_N;

logical_operator:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,24 @@
import org.apache.skywalking.mqe.rt.operation.CompareOp;
import org.apache.skywalking.mqe.rt.operation.LogicalFunctionOp;
import org.apache.skywalking.mqe.rt.operation.MathematicalFunctionOp;
import org.apache.skywalking.mqe.rt.operation.TrendOp;
import org.apache.skywalking.mqe.rt.type.ExpressionResult;
import org.apache.skywalking.mqe.rt.exception.IllegalExpressionException;
import org.apache.skywalking.mqe.rt.type.ExpressionResultType;
import org.apache.skywalking.mqe.rt.type.MQEValue;
import org.apache.skywalking.mqe.rt.type.MQEValues;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
import org.apache.skywalking.oap.server.library.util.StringUtil;

@Slf4j
public abstract class MQEVisitorBase extends MQEParserBaseVisitor<ExpressionResult> {
public final static String GENERAL_LABEL_NAME = "_";
public final Step queryStep;

protected MQEVisitorBase(final Step queryStep) {
this.queryStep = queryStep;
}

@Override
public ExpressionResult visitParensOp(MQEParser.ParensOpContext ctx) {
Expand Down Expand Up @@ -250,6 +257,34 @@ public ExpressionResult visitCompareOp(MQEParser.CompareOpContext ctx) {
}
}

@Override
public ExpressionResult visitTrendOP(MQEParser.TrendOPContext ctx) {
int opType = ctx.trend().getStart().getType();
int trendRange = Integer.parseInt(ctx.INTEGER().getText());
if (trendRange < 1) {
ExpressionResult result = new ExpressionResult();
result.setType(ExpressionResultType.UNKNOWN);
result.setError("The trend range must be greater than 0.");
return result;
}
ExpressionResult expResult = visit(ctx.metric());
if (StringUtil.isNotEmpty(expResult.getError())) {
return expResult;
}
if (expResult.getType() != ExpressionResultType.TIME_SERIES_VALUES) {
expResult.setError("The result of expression [" + ctx.metric().getText() + "] is not a time series result.");
return expResult;
}
try {
return TrendOp.doTrendOp(expResult, opType, trendRange, queryStep);
} catch (IllegalExpressionException e) {
ExpressionResult result = new ExpressionResult();
result.setType(ExpressionResultType.UNKNOWN);
result.setError(e.getMessage());
return result;
}
}

@Override
public abstract ExpressionResult visitMetric(MQEParser.MetricContext ctx);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.mqe.rt.operation;

import java.util.ArrayList;
import java.util.List;
import org.apache.skywalking.mqe.rt.exception.IllegalExpressionException;
import org.apache.skywalking.mqe.rt.grammar.MQEParser;
import org.apache.skywalking.mqe.rt.type.ExpressionResult;
import org.apache.skywalking.mqe.rt.type.MQEValue;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;

public class TrendOp {
public static ExpressionResult doTrendOp(ExpressionResult expResult,
int opType,
int trendRange,
Step step) throws IllegalExpressionException {
switch (opType) {
case MQEParser.INCREASE:
return TrendOp.calculateIncrease(expResult, trendRange);
case MQEParser.RATE:
return TrendOp.calculateRate(expResult, trendRange, step);
}

throw new IllegalExpressionException("Unsupported function.");
}

private static ExpressionResult calculateIncrease(ExpressionResult expResult, int trendRange) {
expResult.getResults().forEach(resultValues -> {
List<MQEValue> mqeValues = resultValues.getValues();
List<MQEValue> newMqeValues = new ArrayList<>();
for (int i = trendRange; i < mqeValues.size(); i++) {
MQEValue mqeValue = mqeValues.get(i);
//if the current value is empty, then the trend value is empty
if (mqeValue.isEmptyValue()) {
newMqeValues.add(mqeValue);
continue;
}
MQEValue newMqeValue = new MQEValue();
newMqeValue.setId(mqeValue.getId());

//if the previous value is empty, then the trend value is empty
if (mqeValues.get(i - trendRange).isEmptyValue()) {
newMqeValue.setEmptyValue(true);
newMqeValues.add(newMqeValue);
continue;
}

newMqeValue.setEmptyValue(mqeValue.isEmptyValue());
newMqeValue.setId(mqeValue.getId());
newMqeValue.setTraceID(mqeValue.getTraceID());
double newValue = mqeValue.getDoubleValue() - mqeValues.get(i - trendRange).getDoubleValue();
newMqeValue.setDoubleValue(newValue);
newMqeValues.add(newMqeValue);
}
resultValues.setValues(newMqeValues);
});
return expResult;
}

private static ExpressionResult calculateRate(ExpressionResult expResult, int trendRange, Step step) {
ExpressionResult result = calculateIncrease(expResult, trendRange);
long rangeSeconds;
switch (step) {
case SECOND:
rangeSeconds = trendRange;
break;
case MINUTE:
rangeSeconds = trendRange * 60;
break;
case HOUR:
rangeSeconds = trendRange * 3600;
break;
case DAY:
rangeSeconds = trendRange * 86400;
break;
default:
throw new IllegalArgumentException("Unsupported step: " + step);
}
result.getResults().forEach(resultValues -> {
resultValues.getValues().forEach(mqeValue -> {
if (!mqeValue.isEmptyValue()) {
double newValue = mqeValue.getDoubleValue() / rangeSeconds;
mqeValue.setDoubleValue(newValue);
}
});
});
return result;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.skywalking.mqe.rt;

import org.apache.skywalking.mqe.rt.grammar.MQEParser;
import org.apache.skywalking.mqe.rt.operation.TrendOp;
import org.apache.skywalking.mqe.rt.type.ExpressionResult;
import org.apache.skywalking.mqe.rt.type.ExpressionResultType;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;

public class TrendOpTest {
private final MockData mockData = new MockData();

@Test
public void increaseNoLabeledTest() throws Exception {
ExpressionResult increase = TrendOp.doTrendOp(
mockData.newSeriesNoLabeledResult(100, 280), MQEParser.INCREASE, 1, Step.MINUTE);
assertEquals(ExpressionResultType.TIME_SERIES_VALUES, increase.getType());
assertEquals("300", increase.getResults().get(0).getValues().get(0).getId());
assertEquals(180, increase.getResults().get(0).getValues().get(0).getDoubleValue());
}

@Test
public void increaseLabeledTest() throws Exception {
ExpressionResult increase = TrendOp.doTrendOp(
mockData.newSeriesLabeledResult(100, 280, 100, 220), MQEParser.INCREASE, 1, Step.MINUTE);
assertEquals(ExpressionResultType.TIME_SERIES_VALUES, increase.getType());
assertEquals("300", increase.getResults().get(0).getValues().get(0).getId());
assertEquals(180, increase.getResults().get(0).getValues().get(0).getDoubleValue());
assertEquals("300", increase.getResults().get(1).getValues().get(0).getId());
assertEquals(120, increase.getResults().get(1).getValues().get(0).getDoubleValue());
}

@Test
public void rateNoLabeledTest() throws Exception {
ExpressionResult rate = TrendOp.doTrendOp(
mockData.newSeriesNoLabeledResult(100, 280), MQEParser.RATE, 1, Step.MINUTE);
assertEquals(ExpressionResultType.TIME_SERIES_VALUES, rate.getType());
assertEquals("300", rate.getResults().get(0).getValues().get(0).getId());
assertEquals(3, rate.getResults().get(0).getValues().get(0).getDoubleValue());
}

@Test
public void rateLabeledTest() throws Exception {
ExpressionResult rate = TrendOp.doTrendOp(
mockData.newSeriesLabeledResult(100, 280, 100, 220), MQEParser.RATE, 1, Step.MINUTE);
assertEquals(ExpressionResultType.TIME_SERIES_VALUES, rate.getType());
assertEquals("300", rate.getResults().get(0).getValues().get(0).getId());
assertEquals(3, rate.getResults().get(0).getValues().get(0).getDoubleValue());
assertEquals("300", rate.getResults().get(1).getValues().get(0).getId());
assertEquals(2, rate.getResults().get(1).getValues().get(0).getDoubleValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ public class AlarmRule {
private String message;
private Map<String, String> tags;
private Set<String> hooks;
private int maxTrendRange;

/**
* Init includeMetrics and verify the expression.
Expand All @@ -67,6 +68,7 @@ public void setExpression(final String expression) throws IllegalExpressionExcep
MQELexer lexer = new MQELexer(CharStreams.fromString(expression));
lexer.addErrorListener(new ParseErrorListener());
MQEParser parser = new MQEParser(new CommonTokenStream(lexer));
parser.addErrorListener(new ParseErrorListener());
ParseTree tree;
try {
tree = parser.expression();
Expand All @@ -88,6 +90,7 @@ public void setExpression(final String expression) throws IllegalExpressionExcep
verifyIncludeMetrics(visitor.getIncludeMetrics(), expression);
this.expression = expression;
this.includeMetrics = visitor.getIncludeMetrics();
this.maxTrendRange = visitor.getMaxTrendRange();
}

private void verifyIncludeMetrics(Set<String> includeMetrics, String expression) throws IllegalExpressionException {
Expand Down
Loading

0 comments on commit 075b08c

Please sign in to comment.