Skip to content

Commit

Permalink
AINode: Correcting the Omissions and Redundancies (apache#13594)
Browse files Browse the repository at this point in the history
* Add logic of plan deserialization in configNode

* remove AINodeService in datanode which is not used anymore.
  • Loading branch information
ycycse authored Sep 25, 2024
1 parent 59475a6 commit ee9b752
Show file tree
Hide file tree
Showing 13 changed files with 106 additions and 444 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,13 @@
package org.apache.iotdb.confignode.consensus.request;

import org.apache.iotdb.commons.exception.runtime.SerializationRunTimeException;
import org.apache.iotdb.confignode.consensus.request.read.ainode.GetAINodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.read.model.GetModelInfoPlan;
import org.apache.iotdb.confignode.consensus.request.read.model.ShowModelPlan;
import org.apache.iotdb.confignode.consensus.request.read.subscription.ShowTopicPlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.RegisterAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.RemoveAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.ainode.UpdateAINodePlan;
import org.apache.iotdb.confignode.consensus.request.write.auth.AuthorPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
Expand All @@ -43,6 +49,10 @@
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.function.CreateFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.function.DropFunctionPlan;
import org.apache.iotdb.confignode.consensus.request.write.model.CreateModelPlan;
import org.apache.iotdb.confignode.consensus.request.write.model.DropModelInNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.model.DropModelPlan;
import org.apache.iotdb.confignode.consensus.request.write.model.UpdateModelInfoPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.AddRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateDataPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
Expand Down Expand Up @@ -167,6 +177,18 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case RemoveDataNode:
plan = new RemoveDataNodePlan();
break;
case RegisterAINode:
plan = new RegisterAINodePlan();
break;
case RemoveAINode:
plan = new RemoveAINodePlan();
break;
case GetAINodeConfiguration:
plan = new GetAINodeConfigurationPlan();
break;
case UpdateAINodeConfiguration:
plan = new UpdateAINodePlan();
break;
case CreateDatabase:
plan = new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase);
break;
Expand Down Expand Up @@ -420,6 +442,24 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case UPDATE_CQ_LAST_EXEC_TIME:
plan = new UpdateCQLastExecTimePlan();
break;
case CreateModel:
plan = new CreateModelPlan();
break;
case UpdateModelInfo:
plan = new UpdateModelInfoPlan();
break;
case DropModel:
plan = new DropModelPlan();
break;
case ShowModel:
plan = new ShowModelPlan();
break;
case DropModelInNode:
plan = new DropModelInNodePlan();
break;
case GetModelInfo:
plan = new GetModelInfoPlan();
break;
case CreatePipePlugin:
plan = new CreatePipePluginPlan();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

public class GetAINodeConfigurationPlan extends ConfigPhysicalReadPlan {

// if aiNodeId is set to -1, return all AINode configurations.
private final int aiNodeId;
private int aiNodeId;

public GetAINodeConfigurationPlan() {
super(ConfigPhysicalPlanType.GetAINodeConfiguration);
}

public GetAINodeConfigurationPlan(final int aiNodeId) {
super(ConfigPhysicalPlanType.GetAINodeConfiguration);
Expand All @@ -36,6 +44,17 @@ public int getAiNodeId() {
return aiNodeId;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
stream.writeInt(aiNodeId);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
this.aiNodeId = buffer.getInt();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,20 @@
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import org.apache.iotdb.confignode.rpc.thrift.TGetModelInfoReq;

import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class GetModelInfoPlan extends ConfigPhysicalReadPlan {

private final String modelId;
private String modelId;

public GetModelInfoPlan() {
super(ConfigPhysicalPlanType.GetModelInfo);
}

public GetModelInfoPlan(final TGetModelInfoReq getModelInfoReq) {
super(ConfigPhysicalPlanType.GetModelInfo);
Expand All @@ -38,6 +47,17 @@ public String getModelId() {
return modelId;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(modelId, stream);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
this.modelId = ReadWriteIOUtils.readString(buffer);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,21 @@
import org.apache.iotdb.confignode.consensus.request.read.ConfigPhysicalReadPlan;
import org.apache.iotdb.confignode.rpc.thrift.TShowModelReq;

import org.apache.tsfile.utils.ReadWriteIOUtils;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;

public class ShowModelPlan extends ConfigPhysicalReadPlan {

private String modelName;

public ShowModelPlan() {
super(ConfigPhysicalPlanType.ShowModel);
}

public ShowModelPlan(final TShowModelReq showModelReq) {
super(ConfigPhysicalPlanType.ShowModel);
if (showModelReq.isSetModelId()) {
Expand All @@ -44,6 +53,21 @@ public String getModelName() {
return modelName;
}

@Override
protected void serializeImpl(DataOutputStream stream) throws IOException {
stream.writeShort(getType().getPlanType());
ReadWriteIOUtils.write(modelName != null, stream);
ReadWriteIOUtils.write(modelName, stream);
}

@Override
protected void deserializeImpl(ByteBuffer buffer) throws IOException {
boolean isSetModelId = ReadWriteIOUtils.readBool(buffer);
if (isSetModelId) {
this.modelName = ReadWriteIOUtils.readString(buffer);
}
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -442,9 +442,6 @@ public class IoTDBConfig {
/** Enable auto repair compaction */
private volatile boolean enableAutoRepairCompaction = true;

/** Enable the service for AINode */
private boolean enableAINodeService = false;

/** The buffer for sort operation */
private long sortBufferSize = 1024 * 1024L;

Expand Down Expand Up @@ -923,9 +920,6 @@ public class IoTDBConfig {
/** Internal port for coordinator */
private int internalPort = 10730;

/** Port for AINode */
private int aiNodePort = 10780;

/** Internal port for dataRegion consensus protocol */
private int dataRegionConsensusPort = 10760;

Expand Down Expand Up @@ -2884,14 +2878,6 @@ public void setEnableAutoRepairCompaction(boolean enableAutoRepairCompaction) {
this.enableAutoRepairCompaction = enableAutoRepairCompaction;
}

public boolean isEnableAINodeService() {
return enableAINodeService;
}

public void setEnableAINodeService(boolean enableAINodeService) {
this.enableAINodeService = enableAINodeService;
}

public InnerSequenceCompactionSelector getInnerSequenceCompactionSelector() {
return innerSequenceCompactionSelector;
}
Expand Down Expand Up @@ -3170,14 +3156,6 @@ public void setInternalPort(int internalPort) {
this.internalPort = internalPort;
}

public int getAINodePort() {
return aiNodePort;
}

public void setAINodePort(int aiNodePort) {
this.aiNodePort = aiNodePort;
}

public int getDataRegionConsensusPort() {
return dataRegionConsensusPort;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,19 +287,6 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
.getProperty(IoTDBConstant.DN_RPC_PORT, Integer.toString(conf.getRpcPort()))
.trim()));

conf.setEnableAINodeService(
Boolean.parseBoolean(
properties
.getProperty(
"enable_ainode_rpc_service", Boolean.toString(conf.isEnableAINodeService()))
.trim()));

conf.setAINodePort(
Integer.parseInt(
properties
.getProperty("ainode_rpc_port", Integer.toString(conf.getAINodePort()))
.trim()));

conf.setBufferedArraysMemoryProportion(
Double.parseDouble(
properties
Expand Down

This file was deleted.

Loading

0 comments on commit ee9b752

Please sign in to comment.