Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[To rc/1.2.0] Add executeGroupByQueryIntervalQuery rpc interface #10573

Merged
merged 4 commits into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions iotdb-protocol/thrift/src/main/thrift/client.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,21 @@ struct TSAggregationQueryReq {
11: optional bool legalPathNodes
}

struct TSGroupByQueryIntervalReq {
1: required i64 sessionId
2: required i64 statementId
3: required string device
4: required string measurement
5: required i32 dataType
6: required common.TAggregationType aggregationType
7: optional string database
8: optional i64 startTime
9: optional i64 endTime
10: optional i64 interval
11: optional i32 fetchSize
12: optional i64 timeout
}

struct TSCreateMultiTimeseriesReq {
1: required i64 sessionId
2: required list<string> paths
Expand Down Expand Up @@ -500,6 +515,8 @@ service IClientRPCService {

TSExecuteStatementResp executeAggregationQueryV2(1:TSAggregationQueryReq req);

TSExecuteStatementResp executeGroupByQueryIntervalQuery(1:TSGroupByQueryIntervalReq req);

TSFetchResultsResp fetchResultsV2(1:TSFetchResultsReq req);

TSOpenSessionResp openSession(1:TSOpenSessionReq req);
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@
<module>iotdb-client/compile-tools</module>
<module>iotdb-client/client-cpp</module>
<module>metrics</module>
<!-- <module>integration-test</module>-->
<!-- <module>integration-test</module>-->
<module>consensus</module>
<module>library-udf</module>
<module>iotdb-api/udf-api</module>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ public void releaseResourceWhenAllDriversAreClosed() {
* All file paths used by this fragment instance must be cleared and thus the usage reference must
* be decreased.
*/
protected synchronized void releaseResource() {
public synchronized void releaseResource() {
// For schema related query FI, closedFilePaths and unClosedFilePaths will be null
if (closedFilePaths != null) {
for (TsFileResource tsFile : closedFilePaths) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public class FragmentInstanceManager {
private final IDriverScheduler scheduler = DriverScheduler.getInstance();

private final ScheduledExecutorService instanceManagementExecutor;
private final ExecutorService instanceNotificationExecutor;
public final ExecutorService instanceNotificationExecutor;

private final Duration infoCacheTime;

// record failed instances count
private final CounterStat failedInstances = new CounterStat();
public final CounterStat failedInstances = new CounterStat();

private final ExecutorService intoOperationExecutor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@
*/
package org.apache.iotdb.db.service.thrift.impl;

import org.apache.iotdb.common.rpc.thrift.TAggregationType;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.IoTDBException;
import org.apache.iotdb.commons.partition.DataPartition;
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
import org.apache.iotdb.commons.path.AlignedPath;
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.service.metric.enums.Metric;
import org.apache.iotdb.commons.service.metric.enums.Tag;
Expand All @@ -32,8 +41,25 @@
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.OperationType;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.metadata.template.TemplateQueryType;
import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory;
import org.apache.iotdb.db.mpp.aggregation.Aggregator;
import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
import org.apache.iotdb.db.mpp.common.PlanFragmentId;
import org.apache.iotdb.db.mpp.common.QueryId;
import org.apache.iotdb.db.mpp.common.SessionInfo;
import org.apache.iotdb.db.mpp.common.header.ColumnHeader;
import org.apache.iotdb.db.mpp.common.header.DatasetHeader;
import org.apache.iotdb.db.mpp.execution.driver.DriverContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
import org.apache.iotdb.db.mpp.execution.operator.source.AbstractSeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator;
import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
import org.apache.iotdb.db.mpp.plan.Coordinator;
import org.apache.iotdb.db.mpp.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.mpp.plan.analyze.IPartitionFetcher;
Expand All @@ -43,8 +69,14 @@
import org.apache.iotdb.db.mpp.plan.execution.IQueryExecution;
import org.apache.iotdb.db.mpp.plan.parser.ASTVisitor;
import org.apache.iotdb.db.mpp.plan.parser.StatementGenerator;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.GroupByTimeParameter;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.SeriesScanOptions;
import org.apache.iotdb.db.mpp.plan.statement.Statement;
import org.apache.iotdb.db.mpp.plan.statement.StatementType;
import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
Expand All @@ -71,6 +103,7 @@
import org.apache.iotdb.db.service.basic.BasicOpenSessionResp;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.db.utils.TimePartitionUtils;
import org.apache.iotdb.metrics.utils.MetricLevel;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
Expand Down Expand Up @@ -99,6 +132,7 @@
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsReq;
import org.apache.iotdb.service.rpc.thrift.TSFetchResultsResp;
import org.apache.iotdb.service.rpc.thrift.TSGetTimeZoneResp;
import org.apache.iotdb.service.rpc.thrift.TSGroupByQueryIntervalReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsOfOneDeviceReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq;
Expand All @@ -121,10 +155,18 @@
import org.apache.iotdb.service.rpc.thrift.TSUnsetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo;
import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.Column;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

import io.airlift.units.Duration;
import io.jsonwebtoken.lang.Strings;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -133,14 +175,20 @@
import java.nio.ByteBuffer;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
import static org.apache.iotdb.db.mpp.execution.operator.AggregationUtil.initTimeRangeIterator;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onIoTDBException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNPEOrUnexpectedException;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
import static org.apache.iotdb.tsfile.read.common.block.TsBlockBuilderStatus.DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES;

public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {

Expand Down Expand Up @@ -177,12 +225,13 @@ boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int f

private static final SelectResult OLD_SELECT_RESULT =
(resp, queryExecution, fetchSize) -> {
Pair<TSQueryDataSet, Boolean> pair =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize);
Pair<TSQueryDataSet, Boolean> pair = convertTsBlockByFetchSize(queryExecution, fetchSize);
resp.setQueryDataSet(pair.left);
return pair.right;
};

public static Duration DEFAULT_TIME_SLICE = new Duration(60_000, TimeUnit.MILLISECONDS);

public ClientRPCServiceImpl() {
partitionFetcher = ClusterPartitionFetcher.getInstance();
schemaFetcher = ClusterSchemaFetcher.getInstance();
Expand Down Expand Up @@ -555,6 +604,175 @@ private TSExecuteStatementResp executeAggregationQueryInternal(
}
}

@Override
public TSExecuteStatementResp executeGroupByQueryIntervalQuery(TSGroupByQueryIntervalReq req)
throws TException {

try {
IClientSession clientSession = SESSION_MANAGER.getCurrSessionAndUpdateIdleTime();

String database = req.getDatabase();
if (StringUtils.isEmpty(database)) {
String[] splits = Strings.split(req.getDevice(), "\\.");
database = String.format("%s.%s", splits[0], splits[1]);
}
String deviceId = req.getDevice();
String measurementId = req.getMeasurement();
TSDataType dataType = TSDataType.getTsDataType((byte) req.getDataType());

// only one database, one device, one time interval
Map<String, List<DataPartitionQueryParam>> sgNameToQueryParamsMap = new HashMap<>();
TTimePartitionSlot timePartitionSlot =
TimePartitionUtils.getTimePartition(req.getStartTime());
DataPartitionQueryParam queryParam =
new DataPartitionQueryParam(
deviceId, Collections.singletonList(timePartitionSlot), false, false);
sgNameToQueryParamsMap.put(database, Collections.singletonList(queryParam));
DataPartition dataPartition = partitionFetcher.getDataPartition(sgNameToQueryParamsMap);
List<DataRegion> dataRegionList = new ArrayList<>();
List<TRegionReplicaSet> replicaSets =
dataPartition.getDataRegionReplicaSet(
deviceId, Collections.singletonList(timePartitionSlot));
for (TRegionReplicaSet region : replicaSets) {
dataRegionList.add(
StorageEngine.getInstance()
.getDataRegion(new DataRegionId(region.getRegionId().getId())));
}

List<TsBlock> blockResult =
executeGroupByQueryInternal(
SESSION_MANAGER.getSessionInfo(clientSession),
deviceId,
measurementId,
dataType,
true,
req.getStartTime(),
req.getEndTime(),
req.getInterval(),
req.getAggregationType(),
dataRegionList);

String outputColumnName = req.getAggregationType().name();
List<ColumnHeader> columnHeaders =
Collections.singletonList(new ColumnHeader(outputColumnName, dataType));
DatasetHeader header = new DatasetHeader(columnHeaders, false);
header.setColumnToTsBlockIndexMap(Collections.singletonList(outputColumnName));

TSExecuteStatementResp resp = createResponse(header, 1);
TSQueryDataSet queryDataSet = convertTsBlockByFetchSize(blockResult);
resp.setQueryDataSet(queryDataSet);

return resp;
} catch (Exception e) {
return RpcUtils.getTSExecuteStatementResp(
onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_AGG_QUERY));
} finally {
SESSION_MANAGER.updateIdleTime();
}
}

private List<TsBlock> executeGroupByQueryInternal(
SessionInfo sessionInfo,
String device,
String measurement,
TSDataType dataType,
boolean isAligned,
long startTime,
long endTme,
long interval,
TAggregationType aggregationType,
List<DataRegion> dataRegionList)
throws IllegalPathException {

int dataRegionSize = dataRegionList.size();
if (dataRegionSize != 1) {
throw new IllegalArgumentException(
"dataRegionList.size() should only be 1 now, current size is " + dataRegionSize);
}

Filter timeFilter = new TimeFilter.TimeGtEqAndLt(startTime, endTme);

QueryId queryId = new QueryId("stub_query");
FragmentInstanceId instanceId =
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
FragmentInstanceStateMachine stateMachine =
new FragmentInstanceStateMachine(
instanceId, FragmentInstanceManager.getInstance().instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext =
createFragmentInstanceContext(
instanceId, stateMachine, sessionInfo, dataRegionList.get(0), timeFilter);
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
PlanNodeId planNodeId = new PlanNodeId("1");
driverContext.addOperatorContext(1, planNodeId, SeriesScanOperator.class.getSimpleName());
driverContext
.getOperatorContexts()
.forEach(operatorContext -> operatorContext.setMaxRunTime(DEFAULT_TIME_SLICE));

SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
scanOptionsBuilder.withAllSensors(Collections.singleton(measurement));
scanOptionsBuilder.withGlobalTimeFilter(timeFilter);

Aggregator aggregator =
new Aggregator(
AccumulatorFactory.createAccumulator(aggregationType, dataType, null, null, true),
AggregationStep.SINGLE,
Collections.singletonList(new InputLocation[] {new InputLocation(0, 0)}));

GroupByTimeParameter groupByTimeParameter =
new GroupByTimeParameter(startTime, endTme, interval, interval, true);

IMeasurementSchema measurementSchema = new MeasurementSchema(measurement, dataType);
AbstractSeriesAggregationScanOperator operator;
PartialPath path;
if (isAligned) {
path =
new AlignedPath(
device,
Collections.singletonList(measurement),
Collections.singletonList(measurementSchema));
operator =
new AlignedSeriesAggregationScanOperator(
planNodeId,
(AlignedPath) path,
Ordering.ASC,
scanOptionsBuilder.build(),
driverContext.getOperatorContexts().get(0),
Collections.singletonList(aggregator),
initTimeRangeIterator(groupByTimeParameter, true, true),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
} else {
path = new MeasurementPath(device, measurement, measurementSchema);
operator =
new SeriesAggregationScanOperator(
planNodeId,
path,
Ordering.ASC,
scanOptionsBuilder.build(),
driverContext.getOperatorContexts().get(0),
Collections.singletonList(aggregator),
initTimeRangeIterator(groupByTimeParameter, true, true),
groupByTimeParameter,
DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES);
}

try {
List<TsBlock> result = new ArrayList<>();
fragmentInstanceContext.setSourcePaths(Collections.singletonList(path));
operator.initQueryDataSource(fragmentInstanceContext.getSharedQueryDataSource());

while (operator.hasNext()) {
result.add(operator.next());
}

return result;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
fragmentInstanceContext.releaseResource();
}
}

@Override
public TSExecuteStatementResp executeQueryStatementV2(TSExecuteStatementReq req) {
return executeStatementV2(req);
Expand Down Expand Up @@ -1130,7 +1348,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {

try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
Pair<TSQueryDataSet, Boolean> pair =
QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize);
convertTsBlockByFetchSize(queryExecution, req.fetchSize);
TSQueryDataSet result = pair.left;
finished = pair.right;
boolean hasResultSet = result.bufferForTime().limit() != 0;
Expand Down
Loading