From ee9b752bacc6ac54ffe366d485579dab12aef6ed Mon Sep 17 00:00:00 2001 From: YangCaiyin Date: Wed, 25 Sep 2024 15:56:20 +0800 Subject: [PATCH] AINode: Correcting the Omissions and Redundancies (#13594) * Add logic of plan deserialization in configNode * remove AINodeService in datanode which is not used anymore. --- .../consensus/request/ConfigPhysicalPlan.java | 40 ++++ .../ainode/GetAINodeConfigurationPlan.java | 21 +- .../request/read/model/GetModelInfoPlan.java | 22 ++- .../request/read/model/ShowModelPlan.java | 24 +++ .../org/apache/iotdb/db/conf/IoTDBConfig.java | 22 --- .../apache/iotdb/db/conf/IoTDBDescriptor.java | 13 -- .../AINodeRPCServiceThriftHandler.java | 60 ------ .../thrift/impl/AINodeRPCServiceImpl.java | 187 ------------------ .../impl/IAINodeRPCServiceWithHandler.java | 26 --- .../iotdb/db/service/AINodeRPCService.java | 94 --------- .../db/service/AINodeRPCServiceMBean.java | 22 --- .../org/apache/iotdb/db/service/DataNode.java | 4 - .../src/main/thrift/datanode.thrift | 15 +- 13 files changed, 106 insertions(+), 444 deletions(-) delete mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/AINodeRPCServiceThriftHandler.java delete mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/AINodeRPCServiceImpl.java delete mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/IAINodeRPCServiceWithHandler.java delete mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCService.java delete mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCServiceMBean.java diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java index 7bfa5c50c69d..9d020aa52d77 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlan.java @@ -20,7 +20,13 @@ package org.apache.iotdb.confignode.consensus.request; import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException; +import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan; +import org.apache.iotdb.confignode.consensus.request.read.model.GetModelInfoPlan; +import org.apache.iotdb.confignode.consensus.request.read.model.ShowModelPlan; import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan; +import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan; +import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan; +import org.apache.iotdb.confignode.consensus.request.write.ainode.UpdateAINodePlan; import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan; import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan; @@ -43,6 +49,10 @@ import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan; import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan; import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan; +import org.apache.iotdb.confignode.consensus.request.write.model.CreateModelPlan; +import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan; +import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan; +import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan; import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan; @@ -167,6 +177,18 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept case RemoveDataNode: plan = new RemoveDataNodePlan(); break; + case RegisterAINode: + plan = new RegisterAINodePlan(); + break; + case RemoveAINode: + plan = new RemoveAINodePlan(); + break; + case GetAINodeConfiguration: + plan = new GetAINodeConfigurationPlan(); + break; + case UpdateAINodeConfiguration: + plan = new UpdateAINodePlan(); + break; case CreateDatabase: plan = new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase); break; @@ -420,6 +442,24 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept case UPDATE_CQ_LAST_EXEC_TIME: plan = new UpdateCQLastExecTimePlan(); break; + case CreateModel: + plan = new CreateModelPlan(); + break; + case UpdateModelInfo: + plan = new UpdateModelInfoPlan(); + break; + case DropModel: + plan = new DropModelPlan(); + break; + case ShowModel: + plan = new ShowModelPlan(); + break; + case DropModelInNode: + plan = new DropModelInNodePlan(); + break; + case GetModelInfo: + plan = new GetModelInfoPlan(); + break; case CreatePipePlugin: plan = new CreatePipePluginPlan(); break; diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/ainode/GetAINodeConfigurationPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/ainode/GetAINodeConfigurationPlan.java index b080cf77c5dd..7222a8f53f8c 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/ainode/GetAINodeConfigurationPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/ainode/GetAINodeConfigurationPlan.java @@ -22,10 +22,18 @@ import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType; import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; + public class GetAINodeConfigurationPlan extends ConfigPhysicalReadPlan { // if aiNodeId is set to -1, return all AINode configurations. - private final int aiNodeId; + private int aiNodeId; + + public GetAINodeConfigurationPlan() { + super(ConfigPhysicalPlanType.GetAINodeConfiguration); + } public GetAINodeConfigurationPlan(final int aiNodeId) { super(ConfigPhysicalPlanType.GetAINodeConfiguration); @@ -36,6 +44,17 @@ public int getAiNodeId() { return aiNodeId; } + @Override + protected void serializeImpl(DataOutputStream stream) throws IOException { + stream.writeShort(getType().getPlanType()); + stream.writeInt(aiNodeId); + } + + @Override + protected void deserializeImpl(ByteBuffer buffer) throws IOException { + this.aiNodeId = buffer.getInt(); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/GetModelInfoPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/GetModelInfoPlan.java index dfec065f2069..9c33c2678829 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/GetModelInfoPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/GetModelInfoPlan.java @@ -23,11 +23,20 @@ import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Objects; public class GetModelInfoPlan extends ConfigPhysicalReadPlan { - private final String modelId; + private String modelId; + + public GetModelInfoPlan() { + super(ConfigPhysicalPlanType.GetModelInfo); + } public GetModelInfoPlan(final TGetModelInfoReq getModelInfoReq) { super(ConfigPhysicalPlanType.GetModelInfo); @@ -38,6 +47,17 @@ public String getModelId() { return modelId; } + @Override + protected void serializeImpl(DataOutputStream stream) throws IOException { + stream.writeShort(getType().getPlanType()); + ReadWriteIOUtils.write(modelId, stream); + } + + @Override + protected void deserializeImpl(ByteBuffer buffer) throws IOException { + this.modelId = ReadWriteIOUtils.readString(buffer); + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java index c3d0ff79a37a..df924c97f5b7 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/read/model/ShowModelPlan.java @@ -23,12 +23,21 @@ import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan; import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq; +import org.apache.tsfile.utils.ReadWriteIOUtils; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Objects; public class ShowModelPlan extends ConfigPhysicalReadPlan { private String modelName; + public ShowModelPlan() { + super(ConfigPhysicalPlanType.ShowModel); + } + public ShowModelPlan(final TShowModelReq showModelReq) { super(ConfigPhysicalPlanType.ShowModel); if (showModelReq.isSetModelId()) { @@ -44,6 +53,21 @@ public String getModelName() { return modelName; } + @Override + protected void serializeImpl(DataOutputStream stream) throws IOException { + stream.writeShort(getType().getPlanType()); + ReadWriteIOUtils.write(modelName != null, stream); + ReadWriteIOUtils.write(modelName, stream); + } + + @Override + protected void deserializeImpl(ByteBuffer buffer) throws IOException { + boolean isSetModelId = ReadWriteIOUtils.readBool(buffer); + if (isSetModelId) { + this.modelName = ReadWriteIOUtils.readString(buffer); + } + } + @Override public boolean equals(final Object o) { if (this == o) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 5ccf729fc63c..1d369eeccd21 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -442,9 +442,6 @@ public class IoTDBConfig { /** Enable auto repair compaction */ private volatile boolean enableAutoRepairCompaction = true; - /** Enable the service for AINode */ - private boolean enableAINodeService = false; - /** The buffer for sort operation */ private long sortBufferSize = 1024 * 1024L; @@ -923,9 +920,6 @@ public class IoTDBConfig { /** Internal port for coordinator */ private int internalPort = 10730; - /** Port for AINode */ - private int aiNodePort = 10780; - /** Internal port for dataRegion consensus protocol */ private int dataRegionConsensusPort = 10760; @@ -2884,14 +2878,6 @@ public void setEnableAutoRepairCompaction(boolean enableAutoRepairCompaction) { this.enableAutoRepairCompaction = enableAutoRepairCompaction; } - public boolean isEnableAINodeService() { - return enableAINodeService; - } - - public void setEnableAINodeService(boolean enableAINodeService) { - this.enableAINodeService = enableAINodeService; - } - public InnerSequenceCompactionSelector getInnerSequenceCompactionSelector() { return innerSequenceCompactionSelector; } @@ -3170,14 +3156,6 @@ public void setInternalPort(int internalPort) { this.internalPort = internalPort; } - public int getAINodePort() { - return aiNodePort; - } - - public void setAINodePort(int aiNodePort) { - this.aiNodePort = aiNodePort; - } - public int getDataRegionConsensusPort() { return dataRegionConsensusPort; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index bc4e76fee8bd..feda1c03b6f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -287,19 +287,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO .getProperty(IoTDBConstant.DN_RPC_PORT, Integer.toString(conf.getRpcPort())) .trim())); - conf.setEnableAINodeService( - Boolean.parseBoolean( - properties - .getProperty( - "enable_ainode_rpc_service", Boolean.toString(conf.isEnableAINodeService())) - .trim())); - - conf.setAINodePort( - Integer.parseInt( - properties - .getProperty("ainode_rpc_port", Integer.toString(conf.getAINodePort())) - .trim())); - conf.setBufferedArraysMemoryProportion( Double.parseDouble( properties diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/AINodeRPCServiceThriftHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/AINodeRPCServiceThriftHandler.java deleted file mode 100644 index c5969f8678a3..000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/handler/AINodeRPCServiceThriftHandler.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - *

http://www.apache.org/licenses/LICENSE-2.0 - * - *

Unless required by applicable law or agreed to in writing, software distributed under the - * License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.iotdb.db.protocol.thrift.handler; - -import org.apache.iotdb.db.protocol.thrift.impl.IAINodeRPCServiceWithHandler; - -import org.apache.thrift.protocol.TProtocol; -import org.apache.thrift.server.ServerContext; -import org.apache.thrift.server.TServerEventHandler; -import org.apache.thrift.transport.TTransport; - -import java.util.concurrent.atomic.AtomicLong; - -public class AINodeRPCServiceThriftHandler implements TServerEventHandler { - - private final AtomicLong thriftConnectionNumber = new AtomicLong(0); - private final IAINodeRPCServiceWithHandler eventHandler; - - public AINodeRPCServiceThriftHandler(IAINodeRPCServiceWithHandler eventHandler) { - this.eventHandler = eventHandler; - } - - @Override - public ServerContext createContext(TProtocol in, TProtocol out) { - thriftConnectionNumber.incrementAndGet(); - return null; - } - - @Override - public void deleteContext(ServerContext arg0, TProtocol in, TProtocol out) { - thriftConnectionNumber.decrementAndGet(); - eventHandler.handleExit(); - } - - @Override - public void preServe() { - // do nothing - } - - @Override - public void processContext( - ServerContext serverContext, TTransport tTransport, TTransport tTransport1) { - // do nothing - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/AINodeRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/AINodeRPCServiceImpl.java deleted file mode 100644 index 68e492cdf462..000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/AINodeRPCServiceImpl.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.protocol.thrift.impl; - -import org.apache.iotdb.common.rpc.thrift.TSStatus; -import org.apache.iotdb.commons.conf.IoTDBConstant.ClientVersion; -import org.apache.iotdb.db.protocol.session.IClientSession; -import org.apache.iotdb.db.protocol.session.InternalClientSession; -import org.apache.iotdb.db.protocol.session.SessionManager; -import org.apache.iotdb.db.protocol.thrift.OperationType; -import org.apache.iotdb.db.queryengine.common.header.DatasetHeader; -import org.apache.iotdb.db.queryengine.plan.Coordinator; -import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher; -import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; -import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher; -import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaFetcher; -import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult; -import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution; -import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator; -import org.apache.iotdb.db.queryengine.plan.statement.Statement; -import org.apache.iotdb.db.utils.ErrorHandlingUtils; -import org.apache.iotdb.db.utils.QueryDataSetUtils; -import org.apache.iotdb.db.utils.SetThreadName; -import org.apache.iotdb.mpp.rpc.thrift.TFetchMoreDataReq; -import org.apache.iotdb.mpp.rpc.thrift.TFetchMoreDataResp; -import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesReq; -import org.apache.iotdb.mpp.rpc.thrift.TFetchTimeseriesResp; -import org.apache.iotdb.rpc.RpcUtils; -import org.apache.iotdb.rpc.TSStatusCode; - -import org.apache.thrift.TException; -import org.apache.tsfile.utils.Pair; - -import java.nio.ByteBuffer; -import java.time.ZoneId; -import java.util.List; - -public class AINodeRPCServiceImpl implements IAINodeRPCServiceWithHandler { - - public static final String AI_METRICS_PATH_PREFIX = "root.__system.AI.exp"; - - private static final SessionManager SESSION_MANAGER = SessionManager.getInstance(); - - private static final Coordinator COORDINATOR = Coordinator.getInstance(); - - private final IPartitionFetcher partitionFetcher; - - private final ISchemaFetcher schemaFetcher; - - private final IClientSession session; - - public AINodeRPCServiceImpl() { - super(); - partitionFetcher = ClusterPartitionFetcher.getInstance(); - schemaFetcher = ClusterSchemaFetcher.getInstance(); - session = new InternalClientSession("AINodeService"); - SESSION_MANAGER.registerSession(session); - SESSION_MANAGER.supplySession(session, "AINode", ZoneId.systemDefault(), ClientVersion.V_1_0); - } - - @Override - public TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req) throws TException { - boolean finished = false; - TFetchTimeseriesResp resp = new TFetchTimeseriesResp(); - Throwable t = null; - try { - - Statement s = StatementGenerator.createStatement(req, session.getZoneId()); - - if (s == null) { - resp.setStatus( - RpcUtils.getStatus( - TSStatusCode.SQL_PARSE_ERROR, "This operation type is not supported")); - return resp; - } - - long queryId = - SESSION_MANAGER.requestQueryId(session, SESSION_MANAGER.requestStatementId(session)); - ExecutionResult result = - COORDINATOR.executeForTreeModel( - s, - queryId, - SESSION_MANAGER.getSessionInfo(session), - "", - partitionFetcher, - schemaFetcher, - req.getTimeout()); - - if (result.status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode() - && result.status.code != TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) { - resp.setStatus(result.status); - return resp; - } - - IQueryExecution queryExecution = COORDINATOR.getQueryExecution(queryId); - - try (SetThreadName threadName = new SetThreadName(result.queryId.getId())) { - - DatasetHeader header = queryExecution.getDatasetHeader(); - resp.setStatus(result.status); - resp.setColumnNameList(header.getRespColumns()); - resp.setColumnTypeList(header.getRespDataTypeList()); - resp.setColumnNameIndexMap(header.getColumnNameIndexMap()); - resp.setQueryId(queryId); - - Pair, Boolean> pair = - QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize); - resp.setTsDataset(pair.left); - finished = pair.right; - resp.setHasMoreData(!finished); - return resp; - } - } catch (Exception e) { - finished = true; - t = e; - resp.setStatus(ErrorHandlingUtils.onQueryException(e, OperationType.EXECUTE_STATEMENT)); - return resp; - } catch (Error error) { - t = error; - throw error; - } finally { - if (finished) { - COORDINATOR.cleanupQueryExecution(resp.queryId, req, t); - } - } - } - - @Override - public TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req) throws TException { - TFetchMoreDataResp resp = new TFetchMoreDataResp(); - boolean finished = false; - Throwable t = null; - try { - IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId); - resp.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); - - if (queryExecution == null) { - resp.setHasMoreData(false); - return resp; - } - - try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { - Pair, Boolean> pair = - QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize); - List result = pair.left; - finished = pair.right; - resp.setTsDataset(result); - resp.setHasMoreData(!finished); - return resp; - } - } catch (Exception e) { - finished = true; - t = e; - resp.setStatus(ErrorHandlingUtils.onQueryException(e, OperationType.FETCH_RESULTS)); - return resp; - } catch (Error error) { - t = error; - throw error; - } finally { - if (finished) { - COORDINATOR.cleanupQueryExecution(req.queryId, req, t); - } - } - } - - @Override - public void handleExit() { - SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution); - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/IAINodeRPCServiceWithHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/IAINodeRPCServiceWithHandler.java deleted file mode 100644 index 7d9df50ac3ce..000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/IAINodeRPCServiceWithHandler.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.protocol.thrift.impl; - -import org.apache.iotdb.mpp.rpc.thrift.IAINodeInternalRPCService; - -public interface IAINodeRPCServiceWithHandler extends IAINodeInternalRPCService.Iface { - void handleExit(); -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCService.java deleted file mode 100644 index 5ec49e756602..000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCService.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.service; - -import org.apache.iotdb.commons.concurrent.ThreadName; -import org.apache.iotdb.commons.exception.runtime.RPCServiceException; -import org.apache.iotdb.commons.service.ServiceType; -import org.apache.iotdb.commons.service.ThriftService; -import org.apache.iotdb.commons.service.ThriftServiceThread; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; -import org.apache.iotdb.db.protocol.thrift.handler.AINodeRPCServiceThriftHandler; -import org.apache.iotdb.db.protocol.thrift.impl.AINodeRPCServiceImpl; -import org.apache.iotdb.mpp.rpc.thrift.IAINodeInternalRPCService; -import org.apache.iotdb.rpc.DeepCopyRpcTransportFactory; - -public class AINodeRPCService extends ThriftService implements AINodeRPCServiceMBean { - - private AINodeRPCServiceImpl impl; - - private AINodeRPCService() {} - - @Override - public ServiceType getID() { - return ServiceType.AINode_RPC_SERVICE; - } - - @Override - public void initTProcessor() { - impl = new AINodeRPCServiceImpl(); - initSyncedServiceImpl(null); - processor = new IAINodeInternalRPCService.Processor<>(impl); - } - - @Override - public void initThriftServiceThread() - throws IllegalAccessException, InstantiationException, ClassNotFoundException { - try { - IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); - thriftServiceThread = - new ThriftServiceThread( - processor, - getID().getName(), - ThreadName.AINODE_RPC_SERVICE.getName(), - getBindIP(), - getBindPort(), - config.getRpcMaxConcurrentClientNum(), - config.getThriftServerAwaitTimeForStopService(), - new AINodeRPCServiceThriftHandler(impl), - config.isRpcThriftCompressionEnable(), - DeepCopyRpcTransportFactory.INSTANCE); - } catch (RPCServiceException e) { - throw new IllegalAccessException(e.getMessage()); - } - thriftServiceThread.setName(ThreadName.AINODE_RPC_SERVICE.getName()); - } - - @Override - public String getBindIP() { - return IoTDBDescriptor.getInstance().getConfig().getRpcAddress(); - } - - @Override - public int getBindPort() { - return IoTDBDescriptor.getInstance().getConfig().getAINodePort(); - } - - private static class AINodeRPCServiceHolder { - private static final AINodeRPCService INSTANCE = new AINodeRPCService(); - - private AINodeRPCServiceHolder() {} - } - - public static AINodeRPCService getInstance() { - return AINodeRPCServiceHolder.INSTANCE; - } -} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCServiceMBean.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCServiceMBean.java deleted file mode 100644 index f4f51c0caa27..000000000000 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/AINodeRPCServiceMBean.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.iotdb.db.service; - -public interface AINodeRPCServiceMBean {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 986337d22276..5403c8ab3ec3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -818,10 +818,6 @@ private void setUp() throws StartupException { private void setUpRPCService() throws StartupException { // Start InternalRPCService to indicate that the current DataNode can accept cluster scheduling registerManager.register(DataNodeInternalRPCService.getInstance()); - // Start InternalRPCService to indicate that the current DataNode can accept request from AINode - if (config.isEnableAINodeService()) { - registerManager.register(AINodeRPCService.getInstance()); - } // Notice: During the period between starting the internal RPC service // and starting the client RPC service , some requests may fail because diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift index 7ea36ce54302..2cd84fdda0f9 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift @@ -1057,17 +1057,4 @@ service MPPDataExchangeService { /** Empty rpc, only for connection test */ common.TSStatus testConnectionEmptyRPC() -} - -service IAINodeInternalRPCService{ - /** - * Fecth the data of the specified time series - */ - TFetchTimeseriesResp fetchTimeseries(TFetchTimeseriesReq req) - - /** - * Fetch rest data for a specified fetchTimeseries - */ - TFetchMoreDataResp fetchMoreData(TFetchMoreDataReq req) - -} +} \ No newline at end of file