
Fix some query metric
JackieTien97 authored Nov 24, 2024
1 parent 6e41961 commit 34228a9
Showing 5 changed files with 83 additions and 36 deletions.
File 1 of 5 (path not captured in this view)
@@ -49,6 +49,7 @@
import org.apache.tsfile.read.reader.IPageReader;
import org.apache.tsfile.read.reader.IPointReader;
import org.apache.tsfile.read.reader.page.AlignedPageReader;
+ import org.apache.tsfile.read.reader.page.TablePageReader;
import org.apache.tsfile.read.reader.series.PaginationController;
import org.apache.tsfile.utils.Accountable;
import org.apache.tsfile.utils.RamUsageEstimator;
@@ -1228,7 +1229,10 @@ protected static class VersionPageReader {
this.version = new MergeReaderPriority(fileTimestamp, version, offset, isSeq);
this.data = data;
this.isSeq = isSeq;
- this.isAligned = data instanceof AlignedPageReader || data instanceof MemAlignedPageReader;
+ this.isAligned =
+ data instanceof AlignedPageReader
+ || data instanceof MemAlignedPageReader
+ || data instanceof TablePageReader;
this.isMem = data instanceof MemPageReader || data instanceof MemAlignedPageReader;
}

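The hunks above widen the isAligned flag in VersionPageReader so it also covers the table model's TablePageReader, alongside the two aligned readers it already matched. A self-contained sketch of the resulting check, with stand-in classes in place of the org.apache.tsfile readers imported above:

final class AlignedCheckSketch {
  // Stand-ins for the three multi-column page readers named in the diff.
  interface IPageReader {}
  static final class AlignedPageReader implements IPageReader {}
  static final class MemAlignedPageReader implements IPageReader {}
  static final class TablePageReader implements IPageReader {}

  // Mirrors the constructor logic in VersionPageReader above.
  static boolean isAligned(IPageReader data) {
    return data instanceof AlignedPageReader
        || data instanceof MemAlignedPageReader
        || data instanceof TablePageReader;
  }

  public static void main(String[] args) {
    System.out.println(isAligned(new TablePageReader())); // true after this change
  }
}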
File 2 of 5: SeriesScanCostMetricSet.java
@@ -1107,45 +1107,45 @@ private void unbindConstructChunkReader(AbstractMetricService metricService) {
/////////////////////////////////////////////////////////////////////////////////////////////////
private static final String READ_CHUNK = "read_chunk";
private static final String ALL = "all";
- public static final String READ_CHUNK_ALL = READ_CHUNK + "_" + ALL;
+ public static final String READ_CHUNK_CACHE = READ_CHUNK + "_" + CACHE;
public static final String READ_CHUNK_FILE = READ_CHUNK + "_" + FILE;
- private Timer readChunkAllTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ private Timer readChunkCacheTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
private Timer readChunkFileTimer = DoNothingMetricManager.DO_NOTHING_TIMER;

private void bindReadChunk(AbstractMetricService metricService) {
- readChunkAllTimer =
+ readChunkCacheTimer =
metricService.getOrCreateTimer(
Metric.SERIES_SCAN_COST.toString(),
MetricLevel.IMPORTANT,
Tag.STAGE.toString(),
- READ_CHUNK,
+ READ_CHUNK_CACHE,
Tag.TYPE.toString(),
NULL,
Tag.FROM.toString(),
- ALL);
+ CACHE);
readChunkFileTimer =
metricService.getOrCreateTimer(
Metric.SERIES_SCAN_COST.toString(),
MetricLevel.IMPORTANT,
Tag.STAGE.toString(),
- READ_CHUNK,
+ READ_CHUNK_FILE,
Tag.TYPE.toString(),
NULL,
Tag.FROM.toString(),
FILE);
}

private void unbindReadChunk(AbstractMetricService metricService) {
- readChunkAllTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
+ readChunkCacheTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
readChunkFileTimer = DoNothingMetricManager.DO_NOTHING_TIMER;
- Arrays.asList(ALL, FILE)
+ Arrays.asList(CACHE, FILE)
.forEach(
from ->
metricService.remove(
MetricType.TIMER,
Metric.SERIES_SCAN_COST.toString(),
Tag.STAGE.toString(),
- READ_CHUNK,
+ READ_CHUNK + "_" + from,
Tag.TYPE.toString(),
NULL,
Tag.FROM.toString(),
@@ -1571,8 +1571,8 @@ public void recordSeriesScanCost(String type, long cost) {
case READ_TIMESERIES_METADATA_FILE:
readTimeseriesMetadataFileTimer.updateNanos(cost);
break;
- case READ_CHUNK_ALL:
- readChunkAllTimer.updateNanos(cost);
+ case READ_CHUNK_CACHE:
+ readChunkCacheTimer.updateNanos(cost);
break;
case READ_CHUNK_FILE:
readChunkFileTimer.updateNanos(cost);
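The hunks above replace the aggregate read_chunk_all timer with read_chunk_cache, so chunk-read latency is now tracked per source (cache vs. file) rather than as file plus an overall total. In unbindReadChunk, the composite stage names are rebuilt as READ_CHUNK + "_" + from, which must reproduce READ_CHUNK_CACHE and READ_CHUNK_FILE exactly. A self-contained sketch of that naming invariant; READ_CHUNK is "read_chunk" per the diff, while the values of the CACHE and FILE tag constants are assumptions here:

import java.util.Arrays;

final class StageNameSketch {
  private static final String READ_CHUNK = "read_chunk"; // from the diff above
  private static final String CACHE = "cache"; // assumed value
  private static final String FILE = "file"; // assumed value

  public static void main(String[] args) {
    // unbindReadChunk rebuilds the stage names this way; the output must match
    // the READ_CHUNK_CACHE and READ_CHUNK_FILE constants used at bind time.
    Arrays.asList(CACHE, FILE)
        .forEach(from -> System.out.println(READ_CHUNK + "_" + from));
  }
}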
File 3 of 5 (path not captured in this view)
@@ -383,7 +383,7 @@ public Operator visitTableScan(TableScanNode node, LocalExecutionPlanContext con
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- TableScanNode.class.getSimpleName());
+ TableScanOperator.class.getSimpleName());

int maxTsBlockLineNum = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
if (context.getTypeProvider().getTemplatedInfo() != null) {
@@ -1132,7 +1132,7 @@ public Operator visitStreamSort(StreamSortNode node, LocalExecutionPlanContext c
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- StreamSortNode.class.getSimpleName());
+ TableStreamSortOperator.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());
int sortItemsCount = node.getOrderingScheme().getOrderBy().size();

@@ -1176,11 +1176,6 @@ public Operator visitStreamSort(StreamSortNode node, LocalExecutionPlanContext c

@Override
public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(), node.getPlanNodeId(), JoinNode.class.getSimpleName());
List<TSDataType> dataTypes = getOutputColumnTypes(node, context.getTypeProvider());

Operator leftChild = node.getLeftChild().accept(this, context);
@@ -1221,6 +1216,13 @@ public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
}

if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.INNER) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TableInnerJoinOperator.class.getSimpleName());
return new TableInnerJoinOperator(
operatorContext,
leftChild,
@@ -1232,6 +1234,13 @@ public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
ASC_TIME_COMPARATOR,
dataTypes);
} else if (requireNonNull(node.getJoinType()) == JoinNode.JoinType.FULL) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ TableFullOuterJoinOperator.class.getSimpleName());
return new TableFullOuterJoinOperator(
operatorContext,
leftChild,
@@ -1357,25 +1366,28 @@ public Operator visitTableDeviceQueryCount(

@Override
public Operator visitAggregation(AggregationNode node, LocalExecutionPlanContext context) {
- OperatorContext operatorContext =
- context
- .getDriverContext()
- .addOperatorContext(
- context.getNextOperatorId(),
- node.getPlanNodeId(),
- AggregationNode.class.getSimpleName());
-
Operator child = node.getChild().accept(this, context);

if (node.getGroupingKeys().isEmpty()) {
- return planGlobalAggregation(node, child, context.getTypeProvider(), operatorContext);
+ return planGlobalAggregation(node, child, context.getTypeProvider(), context);
}

- return planGroupByAggregation(node, child, context.getTypeProvider(), operatorContext);
+ return planGroupByAggregation(node, child, context.getTypeProvider(), context);
}

private Operator planGlobalAggregation(
- AggregationNode node, Operator child, TypeProvider typeProvider, OperatorContext context) {

+ AggregationNode node,
+ Operator child,
+ TypeProvider typeProvider,
+ LocalExecutionPlanContext context) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ AggregationOperator.class.getSimpleName());
Map<Symbol, AggregationNode.Aggregation> aggregationMap = node.getAggregations();
ImmutableList.Builder<TableAggregator> aggregatorBuilder = new ImmutableList.Builder<>();
Map<Symbol, Integer> childLayout =
@@ -1393,7 +1405,7 @@ private Operator planGlobalAggregation(
typeProvider,
true,
null)));
- return new AggregationOperator(context, child, aggregatorBuilder.build());
+ return new AggregationOperator(operatorContext, child, aggregatorBuilder.build());
}

// timeColumnName will only be set for AggTableScan.
@@ -1438,7 +1450,7 @@ private Operator planGroupByAggregation(
AggregationNode node,
Operator child,
TypeProvider typeProvider,
- OperatorContext operatorContext) {
+ LocalExecutionPlanContext context) {
Map<Symbol, Integer> childLayout =
makeLayoutFromOutputSymbols(node.getChild().getOutputSymbols());

@@ -1458,6 +1470,13 @@ private Operator planGroupByAggregation(
buildAggregator(
childLayout, k, v, node.getStep(), typeProvider, true, null)));

+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ StreamingAggregationOperator.class.getSimpleName());
return new StreamingAggregationOperator(
operatorContext,
child,
@@ -1499,6 +1518,13 @@ private Operator planGroupByAggregation(
}

List<Integer> preGroupedChannels = preGroupedChannelsBuilder.build();
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ StreamingHashAggregationOperator.class.getSimpleName());
return new StreamingHashAggregationOperator(
operatorContext,
child,
@@ -1522,6 +1548,13 @@ private Operator planGroupByAggregation(
(k, v) ->
aggregatorBuilder.add(
buildGroupByAggregator(childLayout, k, v, node.getStep(), typeProvider)));
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ HashAggregationOperator.class.getSimpleName());

return new HashAggregationOperator(
operatorContext,
@@ -1739,7 +1772,7 @@ public Operator visitAggregationTableScan(
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- AggregationTableScanNode.class.getSimpleName());
+ TableAggregationTableScanOperator.class.getSimpleName());
SeriesScanOptions.Builder scanOptionsBuilder =
node.getTimePredicate().isPresent()
? getSeriesScanOptionsBuilder(context, node.getTimePredicate().get())
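The common thread in this file: each addOperatorContext call now passes the simple name of the operator class that will actually execute (TableScanOperator, TableStreamSortOperator, the join and aggregation operators) instead of the plan-node class, and for joins and aggregations the context is created only inside the branch that selects the concrete operator, after the children have been planned. A minimal sketch of the naming effect, using stand-in classes rather than the real plan and operator types:

final class OperatorNamingSketch {
  // Stand-ins for one plan-node class and its operator, as named in the diff.
  static final class TableScanNode {}
  static final class TableScanOperator {}

  public static void main(String[] args) {
    // Before this commit, the operator context was labeled with the plan node:
    System.out.println(TableScanNode.class.getSimpleName());     // "TableScanNode"
    // After, it carries the name of the operator that actually runs:
    System.out.println(TableScanOperator.class.getSimpleName()); // "TableScanOperator"
  }
}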
File 4 of 5: SqlParser.java
@@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.plan.relational.sql.parser;

+ import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.DataType;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
@@ -54,6 +55,10 @@
import static java.util.Objects.requireNonNull;

public class SqlParser {

+ private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS =
+ PerformanceOverviewMetrics.getInstance();

private static final ANTLRErrorListener LEXER_ERROR_LISTENER =
new BaseErrorListener() {
@Override
@@ -131,6 +136,7 @@ private Node invokeParser(
Optional<NodeLocation> location,
Function<RelationalSqlParser, ParserRuleContext> parseFunction,
ZoneId zoneId) {
+ long startTime = System.nanoTime();
try {
RelationalSqlLexer lexer =
new RelationalSqlLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
@@ -190,6 +196,8 @@ public Token recoverInline(Parser recognizer) throws RecognitionException {
return new AstBuilder(location.orElse(null), zoneId).visit(tree);
} catch (StackOverflowError e) {
throw new ParsingException(name + " is too large (stack overflow while parsing)");
+ } finally {
+ PERFORMANCE_OVERVIEW_METRICS.recordParseCost(System.nanoTime() - startTime);
}
}

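The hunks above time every parse: invokeParser takes System.nanoTime() on entry and reports the elapsed time to PerformanceOverviewMetrics in a finally block, so the cost is captured whether the parse succeeds, raises a ParsingException, or hits the StackOverflowError guard. A self-contained sketch of the pattern; the metrics sink here is a stand-in, not the real PerformanceOverviewMetrics API:

import java.util.concurrent.atomic.AtomicLong;

final class ParseTimingSketch {
  // Stand-in sink; the real code calls recordParseCost(elapsedNanos).
  private static final AtomicLong PARSE_NANOS = new AtomicLong();

  static int parse(String sql) {
    long startTime = System.nanoTime();
    try {
      if (sql.isEmpty()) {
        throw new IllegalArgumentException("mock parse failure");
      }
      return sql.length(); // stand-in for lexing, parsing, and AST building
    } finally {
      // Recorded on success and on failure alike, exactly like the finally
      // block added to invokeParser above.
      PARSE_NANOS.addAndGet(System.nanoTime() - startTime);
    }
  }

  public static void main(String[] args) {
    parse("SELECT 1");
    System.out.println("accumulated parse cost (ns): " + PARSE_NANOS.get());
  }
}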
File 5 of 5 (path not captured in this view)
@@ -47,7 +47,7 @@
import java.util.function.Function;
import java.util.function.LongConsumer;

- import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.READ_CHUNK_ALL;
+ import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.READ_CHUNK_CACHE;
import static org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.READ_CHUNK_FILE;

/**
@@ -162,11 +162,13 @@ private Chunk get(
} finally {
if (chunkLoader.isCacheMiss()) {
cacheMissAdder.accept(1);
+ SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
+ READ_CHUNK_FILE, System.nanoTime() - startTime);
} else {
cacheHitAdder.accept(1);
+ SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
+ READ_CHUNK_CACHE, System.nanoTime() - startTime);
}
- SERIES_SCAN_COST_METRIC_SET.recordSeriesScanCost(
- READ_CHUNK_ALL, System.nanoTime() - startTime);
}
}

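The hunks above drop the single read_chunk_all sample and attribute each lookup to exactly one bucket: read_chunk_file on a cache miss (the chunk had to be read from disk) and read_chunk_cache on a hit, matching the timers bound in SeriesScanCostMetricSet earlier in this commit. A self-contained sketch of the split accounting, with stand-in accumulators in place of the metric set:

import java.util.concurrent.atomic.AtomicLong;

final class ChunkTimingSketch {
  // Stand-in accumulators for the read_chunk_cache / read_chunk_file timers.
  private static final AtomicLong CACHE_NANOS = new AtomicLong();
  private static final AtomicLong FILE_NANOS = new AtomicLong();

  static String get(boolean cacheMiss) {
    long startTime = System.nanoTime();
    try {
      return cacheMiss ? "chunk loaded from file" : "chunk served from cache";
    } finally {
      // Each lookup lands in exactly one bucket, replacing the old single
      // read_chunk_all sample recorded for every call.
      long elapsed = System.nanoTime() - startTime;
      (cacheMiss ? FILE_NANOS : CACHE_NANOS).addAndGet(elapsed);
    }
  }

  public static void main(String[] args) {
    get(false);
    get(true);
    System.out.println("cache ns: " + CACHE_NANOS.get() + ", file ns: " + FILE_NANOS.get());
  }
}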
