Skip to content

Commit

Permalink
[To rc/1.2.0] Add executeGroupByQueryIntervalQuery rpc interface (#10573
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Beyyes authored Jul 18, 2023
1 parent 11ba3bf commit f98b8ca
Show file tree
Hide file tree
Showing 9 changed files with 528 additions and 8 deletions.
17 changes: 17 additions & 0 deletions iotdb-protocol/thrift/src/main/thrift/client.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,21 @@ struct TSAggregationQueryReq {
11: optional bool legalPathNodes
}

struct TSGroupByQueryIntervalReq {
1: required i64 sessionId
2: required i64 statementId
3: required string device
4: required string measurement
5: required i32 dataType
6: required common.TAggregationType aggregationType
7: optional string database
8: optional i64 startTime
9: optional i64 endTime
10: optional i64 interval
11: optional i32 fetchSize
12: optional i64 timeout
}

struct TSCreateMultiTimeseriesReq {
1: required i64 sessionId
2: required list<string> paths
Expand Down Expand Up @@ -500,6 +515,8 @@ service IClientRPCService {

TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq req);

TSExecuteStatementResp executeGroupByQueryIntervalQuery(1:TSGroupByQueryIntervalReq req);

TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req);

TSOpenSessionResp openSession(1:TSOpenSessionReq req);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<module>iotdb-client/compile-tools</module>
<module>iotdb-client/client-cpp</module>
<module>metrics</module>
<!-- <module>integration-test</module>-->
<!-- <module>integration-test</module>-->
<module>consensus</module>
<module>library-udf</module>
<module>iotdb-api/udf-api</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void releaseResourceWhenAllDriversAreClosed() {
* All file paths used by this fragment instance must be cleared and thus the usage reference must
* be decreased.
*/
protected synchronized void releaseResource() {
public synchronized void releaseResource() {
// For schema related query FI, closedFilePaths and unClosedFilePaths will be null
if (closedFilePaths != null) {
for (TsFileResource tsFile : closedFilePaths) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public class FragmentInstanceManager {
private final IDriverScheduler scheduler = DriverScheduler.getInstance();

private final ScheduledExecutorService instanceManagementExecutor;
private final ExecutorService instanceNotificationExecutor;
public final ExecutorService instanceNotificationExecutor;

private final Duration infoCacheTime;

// record failed instances count
private final CounterStat failedInstances = new CounterStat();
public final CounterStat failedInstances = new CounterStat();

private final ExecutorService intoOperationExecutor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@
*/
package org.apache.iotdb.db.service.thrift.impl;

import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
Expand All @@ -32,8 +41,25 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.source.AbstractSeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
Expand All @@ -43,8 +69,14 @@
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.ASTVisitor;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
Expand All @@ -71,6 +103,7 @@
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -99,6 +132,7 @@
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
Expand All @@ -121,10 +155,18 @@
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import io.airlift.units.Duration;
import io.jsonwebtoken.lang.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -133,14 +175,20 @@
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;

public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {

Expand Down Expand Up @@ -177,12 +225,13 @@ boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int f

private static final SelectResult OLD_SELECT_RESULT =
(resp, queryExecution, fetchSize) -> {
Pair<TSQueryDataSet, Boolean> pair =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize);
Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, fetchSize);
resp.setQueryDataSet(pair.left);
return pair.right;
};

public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);

public ClientRPCServiceImpl() {
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
Expand Down Expand Up @@ -555,6 +604,175 @@ private TSExecuteStatementResp executeAggregationQueryInternal(
}
}

@Override
public TSExecuteStatementResp executeGroupByQueryIntervalQuery(TSGroupByQueryIntervalReq req)
throws TException {

try {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();

String database = req.getDatabase();
if (StringUtils.isEmpty(database)) {
String[] splits = Strings.split(req.getDevice(), "\\.");
database = String.format("%s.%s", splits[0], splits[1]);
}
String deviceId = req.getDevice();
String measurementId = req.getMeasurement();
TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType());

// only one database, one device, one time interval
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(req.getStartTime());
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(
deviceId, Collections.singletonList(timePartitionSlot), false, false);
sgNameToQueryParamsMap.put(database, Collections.singletonList(queryParam));
DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
List<DataRegion> dataRegionList = new ArrayList<>();
List<TRegionReplicaSet> replicaSets =
dataPartition.getDataRegionReplicaSet(
deviceId, Collections.singletonList(timePartitionSlot));
for (TRegionReplicaSet region : replicaSets) {
dataRegionList.add(
StorageEngine.getInstance()
.getDataRegion(new DataRegionId(region.getRegionId().getId())));
}

List<TsBlock> blockResult =
executeGroupByQueryInternal(
SESSION_MANAGER.getSessionInfo(clientSession),
deviceId,
measurementId,
dataType,
true,
req.getStartTime(),
req.getEndTime(),
req.getInterval(),
req.getAggregationType(),
dataRegionList);

String outputColumnName = req.getAggregationType().name();
List<ColumnHeader> columnHeaders =
Collections.singletonList(new ColumnHeader(outputColumnName, dataType));
DatasetHeader header = new DatasetHeader(columnHeaders, false);
header.setColumnToTsBlockIndexMap(Collections.singletonList(outputColumnName));

TSExecuteStatementResp resp = createResponse(header, 1);
TSQueryDataSet queryDataSet = convertTsBlockByFetchSize(blockResult);
resp.setQueryDataSet(queryDataSet);

return resp;
} catch (Exception e) {
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_AGG_QUERY));
} finally {
SESSION_MANAGER.updateIdleTime();
}
}

private List<TsBlock> executeGroupByQueryInternal(
SessionInfo sessionInfo,
String device,
String measurement,
TSDataType dataType,
boolean isAligned,
long startTime,
long endTme,
long interval,
TAggregationType aggregationType,
List<DataRegion> dataRegionList)
throws IllegalPathException {

int dataRegionSize = dataRegionList.size();
if (dataRegionSize != 1) {
throw new IllegalArgumentException(
"dataRegionList.size() should only be 1 now, current size is " + dataRegionSize);
}

Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTme);

QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(
instanceId, FragmentInstanceManager.getInstance().instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(
instanceId, stateMachine, sessionInfo, dataRegionList.get(0), timeFilter);
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
PlanNodeId planNodeId = new PlanNodeId("1");
driverContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName());
driverContext
.getOperatorContexts()
.forEach(operatorContext -> operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));

SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
scanOptionsBuilder.withGlobalTimeFilter(timeFilter);

Aggregator aggregator =
new Aggregator(
AccumulatorFactory.createAccumulator(aggregationType, dataType, null, null, true),
AggregationStep.SINGLE,
Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}));

GroupByTimeParameter groupByTimeParameter =
new GroupByTimeParameter(startTime, endTme, interval, interval, true);

IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType);
AbstractSeriesAggregationScanOperator operator;
PartialPath path;
if (isAligned) {
path =
new AlignedPath(
device,
Collections.singletonList(measurement),
Collections.singletonList(measurementSchema));
operator =
new AlignedSeriesAggregationScanOperator(
planNodeId,
(AlignedPath) path,
Ordering.ASC,
scanOptionsBuilder.build(),
driverContext.getOperatorContexts().get(0),
Collections.singletonList(aggregator),
initTimeRangeIterator(groupByTimeParameter, true, true),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
} else {
path = new MeasurementPath(device, measurement, measurementSchema);
operator =
new SeriesAggregationScanOperator(
planNodeId,
path,
Ordering.ASC,
scanOptionsBuilder.build(),
driverContext.getOperatorContexts().get(0),
Collections.singletonList(aggregator),
initTimeRangeIterator(groupByTimeParameter, true, true),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}

try {
List<TsBlock> result = new ArrayList<>();
fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());

while (operator.hasNext()) {
result.add(operator.next());
}

return result;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
fragmentInstanceContext.releaseResource();
}
}

@Override
public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req) {
return executeStatementV2(req);
Expand Down Expand Up @@ -1130,7 +1348,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {

try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
Pair<TSQueryDataSet, Boolean> pair =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize);
convertTsBlockByFetchSize(queryExecution, req.fetchSize);
TSQueryDataSet result = pair.left;
finished = pair.right;
boolean hasResultSet = result.bufferForTime().limit() != 0;
Expand Down
Loading

0 comments on commit f98b8ca

Please sign in to comment.