From f5b169fbcdcdf3e23da2e5b7edf8249c8906a439 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Sat, 7 Sep 2024 17:52:15 +0800 Subject: [PATCH 1/9] introduce iot consensus v2 and corresponding mode option --- .../iotdb/it/env/cluster/ClusterConstant.java | 4 +--- .../apache/iotdb/it/env/cluster/EnvUtils.java | 18 +++--------------- .../confignode/conf/ConfigNodeConfig.java | 11 +++++++++++ .../confignode/conf/ConfigNodeDescriptor.java | 3 +++ .../conf/ConfigNodeStartupCheck.java | 5 ++--- .../conf/SystemPropertiesUtils.java | 7 +++++++ .../manager/load/balancer/RouteBalancer.java | 3 --- .../confignode/manager/node/NodeManager.java | 1 + .../iotdb/consensus/ConsensusFactory.java | 19 ++++--------------- .../consensus/config/PipeConsensusConfig.java | 11 +++++++++-- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 15 +++++++++++++++ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 4 ++-- .../db/consensus/DataRegionConsensusImpl.java | 4 +++- .../planner/plan/node/write/InsertNode.java | 1 - .../org/apache/iotdb/db/service/DataNode.java | 4 ++-- .../iotdb/db/storageengine/StorageEngine.java | 3 --- .../storageengine/dataregion/DataRegion.java | 1 - .../dataregion/wal/WALManager.java | 18 +++--------------- .../wal/recover/WALNodeRecoverTask.java | 7 ++----- .../conf/iotdb-system.properties.template | 14 ++++++++++++-- .../src/main/thrift/confignode.thrift | 11 ++++++----- 21 files changed, 86 insertions(+), 78 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java index b3802f3ea8d8..3c222c4bfe34 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java @@ -217,9 +217,7 @@ public class ClusterConstant { public static final String SIMPLE_CONSENSUS_STR = "Simple"; public static final String RATIS_CONSENSUS_STR = "Ratis"; public static final String IOT_CONSENSUS_STR = "IoT"; - public static final String PIPE_CONSENSUS_STR = "Pipe"; - public static final String STREAM_CONSENSUS_STR = "Stream"; - public static final String BATCH_CONSENSUS_STR = "Batch"; + public static final String IOT_CONSENSUS_V2_STR = "IoTV2"; public static final String JAVA_CMD = System.getProperty("java.home") diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index f3c9527e5952..f6156a3331c1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -30,14 +30,11 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.iotdb.consensus.ConsensusFactory.FAST_IOT_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2; import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS; -import static org.apache.iotdb.consensus.ConsensusFactory.REAL_PIPE_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS; import static org.apache.iotdb.db.utils.DateTimeUtils.convertLongToDate; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.BATCH_CONSENSUS_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.CLUSTER_CONFIGURATIONS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_NUM; @@ -47,17 +44,16 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.HIGH_PERFORMANCE_MODE_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.HIGH_PERFORMANCE_MODE_DATA_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.IOT_CONSENSUS_STR; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.IOT_CONSENSUS_V2_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_DATA_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LOCK_FILE_PATH; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.RATIS_CONSENSUS_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_DATA_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.SIMPLE_CONSENSUS_STR; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAM_CONSENSUS_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_DATA_NODE_NUM; @@ -222,12 +218,8 @@ public static String fromConsensusFullNameToAbbr(String consensus) { return RATIS_CONSENSUS_STR; case IOT_CONSENSUS: return IOT_CONSENSUS_STR; - case REAL_PIPE_CONSENSUS: - return PIPE_CONSENSUS_STR; case IOT_CONSENSUS_V2: - return STREAM_CONSENSUS_STR; - case FAST_IOT_CONSENSUS: - return BATCH_CONSENSUS_STR; + return IOT_CONSENSUS_V2_STR; default: throw new IllegalArgumentException("Unknown consensus type: " + consensus); } @@ -241,12 +233,8 @@ public static String fromConsensusAbbrToFullName(String consensus) { return RATIS_CONSENSUS; case IOT_CONSENSUS_STR: return IOT_CONSENSUS; - case PIPE_CONSENSUS_STR: - return REAL_PIPE_CONSENSUS; - case STREAM_CONSENSUS_STR: + case IOT_CONSENSUS_V2_STR: return IOT_CONSENSUS_V2; - case BATCH_CONSENSUS_STR: - return FAST_IOT_CONSENSUS; default: throw new IllegalArgumentException("Unknown consensus type: " + consensus); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 86bc4e579ed8..5a187d7b9e22 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -68,6 +68,9 @@ public class ConfigNodeConfig { /** Data region consensus protocol. */ private String dataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS; + /** iotConsensusV2 protocol mode. */ + private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; + /** Default number of DataRegion replicas. */ private int dataReplicationFactor = 1; @@ -497,6 +500,14 @@ public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtoc this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass; } + public String getIotConsensusV2Mode() { + return iotConsensusV2Mode; + } + + public void setIotConsensusV2Mode(String iotConsensusV2Mode) { + this.iotConsensusV2Mode = iotConsensusV2Mode; + } + public double getDataRegionPerDataNode() { return dataRegionPerDataNode; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 771905358760..1858e9d4e8ee 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -216,6 +216,9 @@ private void loadProperties(Properties properties) throws BadNodeUrlException, I "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass()) .trim()); + conf.setIotConsensusV2Mode( + properties.getProperty("iot_consensus_v2_mode", conf.getIotConsensusV2Mode()).trim()); + conf.setDataReplicationFactor( Integer.parseInt( properties diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java index ced2964ec5cb..42fba66eb3bf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java @@ -142,10 +142,9 @@ private void checkGlobalConfig() throws ConfigurationException { "the SchemaRegion doesn't support org.apache.iotdb.consensus.iot.IoTConsensus"); } - // When the schemaengine region consensus protocol is set to PipeConsensus, + // When the schemaengine region consensus protocol is set to IoTConsensusV2, // we should report an error - if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS) - || CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { + if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { throw new ConfigurationException( "schema_region_consensus_protocol_class", String.valueOf(CONF.getSchemaRegionConsensusProtocolClass()), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index edd478e053bd..e1cbba983fff 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -53,6 +53,7 @@ public class SystemPropertiesUtils { private static final String TIMESTAMP_PRECISION = "timestamp_precision"; private static final String CN_CONSENSUS_PROTOCOL = "config_node_consensus_protocol_class"; private static final String DATA_CONSENSUS_PROTOCOL = "data_region_consensus_protocol_class"; + private static final String IOT_CONSENSUS_V2_MODE = "iot_consensus_v2_mode"; private static final String SCHEMA_CONSENSUS_PROTOCOL = "schema_region_consensus_protocol_class"; private static final String SERIES_PARTITION_SLOT_NUM = "series_partition_slot_num"; private static final String SERIES_PARTITION_EXECUTOR_CLASS = "series_partition_executor_class"; @@ -146,6 +147,12 @@ public static void checkSystemProperties() throws IOException { conf.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass); } + String iotConsensusV2Mode = systemProperties.getProperty(IOT_CONSENSUS_V2_MODE, null); + if (!iotConsensusV2Mode.equals(conf.getIotConsensusV2Mode())) { + LOGGER.warn(format, IOT_CONSENSUS_V2_MODE, conf.getIotConsensusV2Mode(), iotConsensusV2Mode); + conf.setIotConsensusV2Mode(iotConsensusV2Mode); + } + String schemaRegionConsensusProtocolClass = systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null); if (!schemaRegionConsensusProtocolClass.equals(conf.getSchemaRegionConsensusProtocolClass())) { diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 94e1e5c0eb66..13d58397c9f1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -80,8 +80,6 @@ public class RouteBalancer implements IClusterStatusSubscriber { && ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) || (CONF.isEnableAutoLeaderBalanceForIoTConsensus() && ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) - || (CONF.isEnableAutoLeaderBalanceForIoTConsensus() - && ConsensusFactory.FAST_IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) || (CONF.isEnableAutoLeaderBalanceForIoTConsensus() && ConsensusFactory.IOT_CONSENSUS_V2.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) // The simple consensus protocol will always automatically designate itself as the leader @@ -185,7 +183,6 @@ private void balanceRegionLeader( regionGroupId, newLeaderId); switch (consensusProtocolClass) { - case ConsensusFactory.FAST_IOT_CONSENSUS: case ConsensusFactory.IOT_CONSENSUS_V2: case ConsensusFactory.IOT_CONSENSUS: case ConsensusFactory.SIMPLE_CONSENSUS: diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index 28116bbc3451..a346362d5a07 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -170,6 +170,7 @@ private void setGlobalConfig(ConfigurationResp dataSet) { globalConfig.setTimestampPrecision(commonConfig.getTimestampPrecision()); globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode()); globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize()); + globalConfig.setIotConsensusV2Mode(configNodeConfig.getIotConsensusV2Mode()); dataSet.setGlobalConfig(globalConfig); } diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java index 8cab79e315e4..9b338dd74529 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java @@ -21,7 +21,6 @@ import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer; import org.apache.iotdb.consensus.config.ConsensusConfig; -import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode; import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager; import org.slf4j.Logger; @@ -29,8 +28,6 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; public class ConsensusFactory { @@ -41,16 +38,9 @@ public class ConsensusFactory { public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus"; public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus"; public static final String REAL_PIPE_CONSENSUS = "org.apache.iotdb.consensus.pipe.PipeConsensus"; - // Corresponding to streamConsensus public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2"; - // Corresponding to batchConsensus - public static final String FAST_IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.FastIoTConsensus"; - private static final Map PIPE_CONSENSUS_MODE_MAP = new HashMap<>(); - - static { - PIPE_CONSENSUS_MODE_MAP.put(IOT_CONSENSUS_V2, ReplicateMode.STREAM); - PIPE_CONSENSUS_MODE_MAP.put(FAST_IOT_CONSENSUS, ReplicateMode.BATCH); - } + public static final String IOT_CONSENSUS_V2_BATCH_MODE = "batch"; + public static final String IOT_CONSENSUS_V2_STREAM_MODE = "stream"; private static final Logger logger = LoggerFactory.getLogger(ConsensusFactory.class); @@ -61,9 +51,8 @@ private ConsensusFactory() { public static Optional getConsensusImpl( String className, ConsensusConfig config, IStateMachine.Registry registry) { try { - // special judge for PipeConsensus - if (className.equals(IOT_CONSENSUS_V2) || className.equals(FAST_IOT_CONSENSUS)) { - config.getPipeConsensusConfig().setReplicateMode(PIPE_CONSENSUS_MODE_MAP.get(className)); + // special judge for IoTConsensusV2 + if (className.equals(IOT_CONSENSUS_V2)) { className = REAL_PIPE_CONSENSUS; // initialize pipeConsensus' thrift component PipeConsensusClientMgrContainer.build(); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java index bec2c7d8e1e0..fa412442bf6a 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java @@ -34,9 +34,10 @@ public class PipeConsensusConfig { // Use stream mode by default. User can configure it private ReplicateMode replicateMode = ReplicateMode.STREAM; - public PipeConsensusConfig(RPC rpc, Pipe pipe) { + public PipeConsensusConfig(RPC rpc, Pipe pipe, ReplicateMode replicateMode) { this.rpc = rpc; this.pipe = pipe; + this.replicateMode = replicateMode; } public void setReplicateMode(ReplicateMode replicateMode) { @@ -62,6 +63,7 @@ public static Builder newBuilder() { public static class Builder { private RPC rpc; private Pipe pipe; + private ReplicateMode replicateMode; public Builder setPipe(Pipe pipe) { this.pipe = pipe; @@ -73,8 +75,13 @@ public Builder setRPC(RPC rpc) { return this; } + public Builder setReplicateMode(ReplicateMode replicateMode) { + this.replicateMode = replicateMode; + return this; + } + public PipeConsensusConfig build() { - return new PipeConsensusConfig(rpc, pipe); + return new PipeConsensusConfig(rpc, pipe, replicateMode); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 4c4b0e824d08..c26cbdb2c838 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -935,6 +935,13 @@ public class IoTDBConfig { */ private String dataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS; + /** + * The mode of iotConsensusV2 protocol. The Datanode should communicate with ConfigNode on startup + * and set this variable so that the correct class name can be obtained later when the data region + * consensus layer singleton is initialized + */ + private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; + /** * The consensus protocol class for schema region. The Datanode should communicate with ConfigNode * on startup and set this variable so that the correct class name can be obtained later when the @@ -3188,6 +3195,14 @@ public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtoc this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass; } + public String getIotConsensusV2Mode() { + return iotConsensusV2Mode; + } + + public void setIotConsensusV2Mode(String iotConsensusV2Mode) { + this.iotConsensusV2Mode = iotConsensusV2Mode; + } + public String getSchemaRegionConsensusProtocolClass() { return schemaRegionConsensusProtocolClass; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 58de8530dc4e..09db94994438 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1130,9 +1130,9 @@ private void loadPipeConsensusProps(Properties properties) throws IOException { conf.setPipeConsensusPipelineSize( Integer.parseInt( properties.getProperty( - "fast_iot_consensus_pipeline_size", + "iot_consensus_v2_pipeline_size", ConfigurationFileUtils.getConfigurationDefaultValue( - "fast_iot_consensus_pipeline_size")))); + "iot_consensus_v2_pipeline_size")))); if (conf.getPipeConsensusPipelineSize() <= 0) { conf.setPipeConsensusPipelineSize(5); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 515a9f54ddca..eb01ca7d374e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -30,6 +30,7 @@ import org.apache.iotdb.consensus.config.IoTConsensusConfig; import org.apache.iotdb.consensus.config.IoTConsensusConfig.RPC; import org.apache.iotdb.consensus.config.PipeConsensusConfig; +import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode; import org.apache.iotdb.consensus.config.RatisConfig; import org.apache.iotdb.consensus.config.RatisConfig.Snapshot; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -168,8 +169,9 @@ private static ConsensusConfig buildConsensusConfig() { () -> PipeDataNodeAgent.task().getAllConsensusPipe()) .setConsensusPipeReceiver(PipeDataNodeAgent.receiver().pipeConsensus()) .setProgressIndexManager(new ProgressIndexDataNodeManager()) - .setConsensusPipeGuardJobIntervalInSeconds(300) // TODO: move to config + .setConsensusPipeGuardJobIntervalInSeconds(300) .build()) + .setReplicateMode(ReplicateMode.valueOf(CONF.getIotConsensusV2Mode())) .build()) .setRatisConfig( RatisConfig.newBuilder() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index f18de9ef2b9d..5e49a1eeb38b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -210,7 +210,6 @@ public boolean isGeneratedByRemoteConsensusLeader() { switch (config.getDataRegionConsensusProtocolClass()) { case ConsensusFactory.IOT_CONSENSUS: case ConsensusFactory.IOT_CONSENSUS_V2: - case ConsensusFactory.FAST_IOT_CONSENSUS: case ConsensusFactory.RATIS_CONSENSUS: return isGeneratedByRemoteConsensusLeader; case ConsensusFactory.SIMPLE_CONSENSUS: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 44582e30e844..12126cc65d97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -398,6 +398,7 @@ private void pullAndCheckSystemConfigurations() throws StartupException { .checkConsensusProtocolExists(TConsensusGroupType.DataRegion)) { config.setDataRegionConsensusProtocolClass( configurationResp.globalConfig.getDataRegionConsensusProtocolClass()); + config.setIotConsensusV2Mode(configurationResp.globalConfig.getIotConsensusV2Mode()); } if (!IoTDBStartCheck.getInstance() @@ -839,8 +840,7 @@ public TDataNodeConfiguration generateDataNodeConfiguration() { } private boolean isUsingPipeConsensus() { - return config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2) - || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS); + return config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2); } private void registerUdfServices() throws StartupException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 11b2a1c6c632..a4ea80984688 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -757,9 +757,6 @@ public void deleteDataRegion(DataRegionId regionId) { region.syncDeleteDataFiles(); region.deleteFolder(systemDir); if (CONFIG.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) - || CONFIG - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS) || CONFIG .getDataRegionConsensusProtocolClass() .equals(ConsensusFactory.IOT_CONSENSUS_V2)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 09b30bf1bb6d..17ac5a299980 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3871,7 +3871,6 @@ public void markDeleted() { private void acquireDirectBufferMemory() throws DataRegionException { long acquireDirectBufferMemCost = 0; if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) - || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS) || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { acquireDirectBufferMemCost = config.getWalBufferSize(); } else if (config diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index bff78d50e5bf..e952dec11791 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -70,10 +70,7 @@ public class WALManager implements IService { private WALManager() { if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) - || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2) - || config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS)) { + || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { walNodesManager = new FirstCreateStrategy(); } else if (config.getMaxWalNodesNum() == 0) { walNodesManager = new ElasticStrategy(); @@ -87,9 +84,6 @@ public static String getApplicantUniqueId(String storageGroupName, boolean seque || config .getDataRegionConsensusProtocolClass() .equals(ConsensusFactory.IOT_CONSENSUS_V2) - || config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS) ? storageGroupName : storageGroupName + IoTDBConstant.FILE_NAME_SEPARATOR @@ -112,10 +106,7 @@ public void registerWALNode( || (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) && !config .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.IOT_CONSENSUS_V2) - && !config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS))) { + .equals(ConsensusFactory.IOT_CONSENSUS_V2))) { return; } @@ -130,10 +121,7 @@ public void deleteWALNode(String applicantUniqueId) { || (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) && !config .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.IOT_CONSENSUS_V2) - && !config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS))) { + .equals(ConsensusFactory.IOT_CONSENSUS_V2))) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java index 18bb293792af..bf67a39cbbc1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java @@ -134,11 +134,8 @@ public void run() { logDirectory); } - // PipeConsensus will not only delete WAL node folder, but also register WAL node. - if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS) - || config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.IOT_CONSENSUS_V2)) { + // IoTConsensusV2 will not only delete WAL node folder, but also register WAL node. + if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { // register wal node WALManager.getInstance() .registerWALNode( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 945199534fc9..8291c61dd2ef 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -154,10 +154,20 @@ data_replication_factor=1 # 1. org.apache.iotdb.consensus.simple.SimpleConsensus (The data_replication_factor can only be set to 1) # 2. org.apache.iotdb.consensus.iot.IoTConsensus # 3. org.apache.iotdb.consensus.ratis.RatisConsensus +# 4. org.apache.iotdb.consensus.ratis.IoTConsensusV2 # effectiveMode: first_start # Datatype: string data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus +# IoTConsensusV2 mode. +# This parameter is unmodifiable after ConfigNode starts for the first time. +# These consensus protocol modes are currently supported: +# 1. batch +# 2. stream +# effectiveMode: first_start +# Datatype: string +iot_consensus_v2_mode=batch + #################### ### Directory configuration #################### @@ -1804,12 +1814,12 @@ schema_region_ratis_periodic_snapshot_interval=86400 data_region_ratis_periodic_snapshot_interval=86400 #################### -### Fast IoTConsensus Configuration +### IoTConsensusV2 Configuration #################### # Default event buffer size for connector and receiver in pipe consensus # effectiveMode: hot_reload # DataType: int -fast_iot_consensus_pipeline_size=5 +iot_consensus_v2_pipeline_size=5 #################### ### Procedure Configuration diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index 05e995af8f9f..bbcb8fd20e5e 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -37,11 +37,12 @@ struct TGlobalConfig { 5: required i64 timePartitionInterval 6: required string readConsistencyLevel 7: required double diskSpaceWarningThreshold - 8: optional string timestampPrecision - 9: optional string schemaEngineMode - 10: optional i32 tagAttributeTotalSize - 11: optional bool isEnterprise - 12: optional i64 timePartitionOrigin + 8: optional string iotConsensusV2Mode + 9: optional string timestampPrecision + 10: optional string schemaEngineMode + 11: optional i32 tagAttributeTotalSize + 12: optional bool isEnterprise + 13: optional i64 timePartitionOrigin } struct TRatisConfig { From ce8f3f4aa7bedd80276c623ba9ad2e712a769ff7 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 12 Sep 2024 14:58:50 +0800 Subject: [PATCH 2/9] retrigger ci --- .../java/org/apache/iotdb/consensus/ConsensusFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java index 9b338dd74529..6196b37f1236 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java @@ -54,9 +54,9 @@ public static Optional getConsensusImpl( // special judge for IoTConsensusV2 if (className.equals(IOT_CONSENSUS_V2)) { className = REAL_PIPE_CONSENSUS; - // initialize pipeConsensus' thrift component + // initialize iotConsensusV2's thrift component PipeConsensusClientMgrContainer.build(); - // initialize pipeConsensus's metric component + // initialize iotConsensusV2's metric component PipeConsensusSyncLagManager.build(); } Class executor = Class.forName(className); From 98ed6c47e640bd60a88b86541f6e4476a4352917 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Fri, 13 Sep 2024 16:35:41 +0800 Subject: [PATCH 3/9] fix ut --- .../iotdb/consensus/config/PipeConsensusConfig.java | 10 ++++++++++ .../iotdb/db/consensus/DataRegionConsensusImpl.java | 2 +- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java index fa412442bf6a..cd0ea44145b3 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java @@ -361,5 +361,15 @@ public enum ReplicateMode { public String getValue() { return value; } + + public static ReplicateMode fromValue(String value) { + if (value.equalsIgnoreCase(STREAM.getValue())) { + return STREAM; + } else if (value.equalsIgnoreCase(BATCH.getValue())) { + return BATCH; + } + // return batch by default + return BATCH; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index eb01ca7d374e..06d95a75cee1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -171,7 +171,7 @@ private static ConsensusConfig buildConsensusConfig() { .setProgressIndexManager(new ProgressIndexDataNodeManager()) .setConsensusPipeGuardJobIntervalInSeconds(300) .build()) - .setReplicateMode(ReplicateMode.valueOf(CONF.getIotConsensusV2Mode())) + .setReplicateMode(ReplicateMode.fromValue(CONF.getIotConsensusV2Mode())) .build()) .setRatisConfig( RatisConfig.newBuilder() From 64b5752298ead90b58a08843fdeaa280e8bd9fc6 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Fri, 13 Sep 2024 18:53:21 +0800 Subject: [PATCH 4/9] fix ut --- .../apache/iotdb/confignode/conf/SystemPropertiesUtils.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index e1cbba983fff..19ff9613ceef 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -147,12 +147,6 @@ public static void checkSystemProperties() throws IOException { conf.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass); } - String iotConsensusV2Mode = systemProperties.getProperty(IOT_CONSENSUS_V2_MODE, null); - if (!iotConsensusV2Mode.equals(conf.getIotConsensusV2Mode())) { - LOGGER.warn(format, IOT_CONSENSUS_V2_MODE, conf.getIotConsensusV2Mode(), iotConsensusV2Mode); - conf.setIotConsensusV2Mode(iotConsensusV2Mode); - } - String schemaRegionConsensusProtocolClass = systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null); if (!schemaRegionConsensusProtocolClass.equals(conf.getSchemaRegionConsensusProtocolClass())) { From 6f6936e180b7353e4c4b10237ea19b3a7a8dc455 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Wed, 18 Sep 2024 14:53:14 +0800 Subject: [PATCH 5/9] fix review --- .../iotdb/confignode/conf/SystemPropertiesUtils.java | 1 - .../iotdb/consensus/config/PipeConsensusConfig.java | 6 +----- .../resources/conf/iotdb-system.properties.template | 2 +- .../src/main/thrift/confignode.thrift | 12 ++++++------ 4 files changed, 8 insertions(+), 13 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index 19ff9613ceef..edd478e053bd 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -53,7 +53,6 @@ public class SystemPropertiesUtils { private static final String TIMESTAMP_PRECISION = "timestamp_precision"; private static final String CN_CONSENSUS_PROTOCOL = "config_node_consensus_protocol_class"; private static final String DATA_CONSENSUS_PROTOCOL = "data_region_consensus_protocol_class"; - private static final String IOT_CONSENSUS_V2_MODE = "iot_consensus_v2_mode"; private static final String SCHEMA_CONSENSUS_PROTOCOL = "schema_region_consensus_protocol_class"; private static final String SERIES_PARTITION_SLOT_NUM = "series_partition_slot_num"; private static final String SERIES_PARTITION_EXECUTOR_CLASS = "series_partition_executor_class"; diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java index cd0ea44145b3..9295fa793422 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java @@ -32,7 +32,7 @@ public class PipeConsensusConfig { private final RPC rpc; private final Pipe pipe; // Use stream mode by default. User can configure it - private ReplicateMode replicateMode = ReplicateMode.STREAM; + private final ReplicateMode replicateMode; public PipeConsensusConfig(RPC rpc, Pipe pipe, ReplicateMode replicateMode) { this.rpc = rpc; @@ -40,10 +40,6 @@ public PipeConsensusConfig(RPC rpc, Pipe pipe, ReplicateMode replicateMode) { this.replicateMode = replicateMode; } - public void setReplicateMode(ReplicateMode replicateMode) { - this.replicateMode = replicateMode; - } - public ReplicateMode getReplicateMode() { return replicateMode; } diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 8291c61dd2ef..9ed28d079523 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -155,7 +155,7 @@ data_replication_factor=1 # 2. org.apache.iotdb.consensus.iot.IoTConsensus # 3. org.apache.iotdb.consensus.ratis.RatisConsensus # 4. org.apache.iotdb.consensus.ratis.IoTConsensusV2 -# effectiveMode: first_start +# effectiveMode: restart # Datatype: string data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index bbcb8fd20e5e..f62b9ec33a2d 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -37,12 +37,12 @@ struct TGlobalConfig { 5: required i64 timePartitionInterval 6: required string readConsistencyLevel 7: required double diskSpaceWarningThreshold - 8: optional string iotConsensusV2Mode - 9: optional string timestampPrecision - 10: optional string schemaEngineMode - 11: optional i32 tagAttributeTotalSize - 12: optional bool isEnterprise - 13: optional i64 timePartitionOrigin + 8: optional string timestampPrecision + 9: optional string schemaEngineMode + 10: optional i32 tagAttributeTotalSize + 11: optional bool isEnterprise + 12: optional i64 timePartitionOrigin + 13: optional string iotConsensusV2Mode } struct TRatisConfig { From 78cb32c475dc5f7390b90dd081cb42874a2a378e Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Fri, 20 Sep 2024 16:12:09 +0800 Subject: [PATCH 6/9] fix review --- .../conf/iotdb-system.properties.template | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 9ed28d079523..39e1687ecf92 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -155,18 +155,9 @@ data_replication_factor=1 # 2. org.apache.iotdb.consensus.iot.IoTConsensus # 3. org.apache.iotdb.consensus.ratis.RatisConsensus # 4. org.apache.iotdb.consensus.ratis.IoTConsensusV2 -# effectiveMode: restart -# Datatype: string -data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus - -# IoTConsensusV2 mode. -# This parameter is unmodifiable after ConfigNode starts for the first time. -# These consensus protocol modes are currently supported: -# 1. batch -# 2. stream # effectiveMode: first_start # Datatype: string -iot_consensus_v2_mode=batch +data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus #################### ### Directory configuration @@ -1816,11 +1807,19 @@ data_region_ratis_periodic_snapshot_interval=86400 #################### ### IoTConsensusV2 Configuration #################### -# Default event buffer size for connector and receiver in pipe consensus -# effectiveMode: hot_reload +# Default event buffer size for connector and receiver in iot consensus v2 +# effectiveMode: restart # DataType: int iot_consensus_v2_pipeline_size=5 +# IoTConsensusV2 mode. +# These consensus protocol modes are currently supported: +# 1. batch +# 2. stream +# effectiveMode: restart +# Datatype: string +iot_consensus_v2_mode=batch + #################### ### Procedure Configuration #################### From 457b7e312010028752acb814fb034af01da6a9cc Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Fri, 20 Sep 2024 16:28:26 +0800 Subject: [PATCH 7/9] fix review: restart for mode param --- .../confignode/conf/ConfigNodeConfig.java | 11 ---------- .../confignode/conf/ConfigNodeDescriptor.java | 3 --- .../confignode/manager/node/NodeManager.java | 1 - .../org/apache/iotdb/db/conf/IoTDBConfig.java | 20 +++++++------------ .../apache/iotdb/db/conf/IoTDBDescriptor.java | 14 +++++++------ .../PipeConsensusAsyncConnector.java | 4 ++-- .../pipeconsensus/PipeConsensusReceiver.java | 10 +++++----- .../org/apache/iotdb/db/service/DataNode.java | 1 - .../src/main/thrift/confignode.thrift | 1 - 9 files changed, 22 insertions(+), 43 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java index 5a187d7b9e22..86bc4e579ed8 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java @@ -68,9 +68,6 @@ public class ConfigNodeConfig { /** Data region consensus protocol. */ private String dataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS; - /** iotConsensusV2 protocol mode. */ - private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; - /** Default number of DataRegion replicas. */ private int dataReplicationFactor = 1; @@ -500,14 +497,6 @@ public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtoc this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass; } - public String getIotConsensusV2Mode() { - return iotConsensusV2Mode; - } - - public void setIotConsensusV2Mode(String iotConsensusV2Mode) { - this.iotConsensusV2Mode = iotConsensusV2Mode; - } - public double getDataRegionPerDataNode() { return dataRegionPerDataNode; } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index 1858e9d4e8ee..771905358760 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java @@ -216,9 +216,6 @@ private void loadProperties(Properties properties) throws BadNodeUrlException, I "data_region_consensus_protocol_class", conf.getDataRegionConsensusProtocolClass()) .trim()); - conf.setIotConsensusV2Mode( - properties.getProperty("iot_consensus_v2_mode", conf.getIotConsensusV2Mode()).trim()); - conf.setDataReplicationFactor( Integer.parseInt( properties diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java index a346362d5a07..28116bbc3451 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java @@ -170,7 +170,6 @@ private void setGlobalConfig(ConfigurationResp dataSet) { globalConfig.setTimestampPrecision(commonConfig.getTimestampPrecision()); globalConfig.setSchemaEngineMode(commonConfig.getSchemaEngineMode()); globalConfig.setTagAttributeTotalSize(commonConfig.getTagAttributeTotalSize()); - globalConfig.setIotConsensusV2Mode(configNodeConfig.getIotConsensusV2Mode()); dataSet.setGlobalConfig(globalConfig); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c26cbdb2c838..c3f0e4380e89 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -935,13 +935,6 @@ public class IoTDBConfig { */ private String dataRegionConsensusProtocolClass = ConsensusFactory.IOT_CONSENSUS; - /** - * The mode of iotConsensusV2 protocol. The Datanode should communicate with ConfigNode on startup - * and set this variable so that the correct class name can be obtained later when the data region - * consensus layer singleton is initialized - */ - private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; - /** * The consensus protocol class for schema region. The Datanode should communicate with ConfigNode * on startup and set this variable so that the correct class name can be obtained later when the @@ -1133,8 +1126,9 @@ public class IoTDBConfig { private double maxMemoryRatioForQueue = 0.6; private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L; - // PipeConsensus Config - private int pipeConsensusPipelineSize = 5; + // IoTConsensusV2 Config + private int iotConsensusV2PipelineSize = 5; + private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; /** Load related */ private double maxAllocateMemoryRatioForLoad = 0.8; @@ -1228,12 +1222,12 @@ public void setRegionMigrationSpeedLimitBytesPerSecond( this.regionMigrationSpeedLimitBytesPerSecond = regionMigrationSpeedLimitBytesPerSecond; } - public int getPipeConsensusPipelineSize() { - return pipeConsensusPipelineSize; + public int getIotConsensusV2PipelineSize() { + return iotConsensusV2PipelineSize; } - public void setPipeConsensusPipelineSize(int pipeConsensusPipelineSize) { - this.pipeConsensusPipelineSize = pipeConsensusPipelineSize; + public void setIotConsensusV2PipelineSize(int iotConsensusV2PipelineSize) { + this.iotConsensusV2PipelineSize = iotConsensusV2PipelineSize; } public void setMaxSizePerBatch(int maxSizePerBatch) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 09db94994438..e294143f461d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1074,12 +1074,12 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO "datanode_schema_cache_eviction_policy", conf.getDataNodeSchemaCacheEvictionPolicy())); loadIoTConsensusProps(properties); - loadPipeConsensusProps(properties); + loadIoTConsensusV2Props(properties); } private void reloadConsensusProps(Properties properties) throws IOException { loadIoTConsensusProps(properties); - loadPipeConsensusProps(properties); + loadIoTConsensusV2Props(properties); DataRegionConsensusImpl.reloadConsensusConfig(); } @@ -1126,16 +1126,18 @@ private void loadIoTConsensusProps(Properties properties) throws IOException { .trim())); } - private void loadPipeConsensusProps(Properties properties) throws IOException { - conf.setPipeConsensusPipelineSize( + private void loadIoTConsensusV2Props(Properties properties) throws IOException { + conf.setIotConsensusV2PipelineSize( Integer.parseInt( properties.getProperty( "iot_consensus_v2_pipeline_size", ConfigurationFileUtils.getConfigurationDefaultValue( "iot_consensus_v2_pipeline_size")))); - if (conf.getPipeConsensusPipelineSize() <= 0) { - conf.setPipeConsensusPipelineSize(5); + if (conf.getIotConsensusV2PipelineSize() <= 0) { + conf.setIotConsensusV2PipelineSize(5); } + conf.setIotConsensusV2Mode( + properties.getProperty("iot_consensus_v2_mode", conf.getIotConsensusV2Mode()).trim()); } private void loadAuthorCache(Properties properties) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java index e4489799652c..870c383cd7a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java @@ -92,7 +92,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse private final BlockingQueue retryEventQueue = new LinkedBlockingQueue<>(); // We use enrichedEvent here to make use of EnrichedEvent.equalsInPipeConsensus private final BlockingQueue transferBuffer = - new LinkedBlockingDeque<>(IOTDB_CONFIG.getPipeConsensusPipelineSize()); + new LinkedBlockingDeque<>(IOTDB_CONFIG.getIotConsensusV2PipelineSize()); private final AtomicBoolean isClosed = new AtomicBoolean(false); private final int thisDataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); private PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics; @@ -206,7 +206,7 @@ public synchronized void removeEventFromBuffer(EnrichedEvent event) { consensusGroupId, event, transferBuffer.size(), - IOTDB_CONFIG.getPipeConsensusPipelineSize()); + IOTDB_CONFIG.getIotConsensusV2PipelineSize()); } if (transferBuffer.isEmpty()) { LOGGER.info( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 6b17faf65240..588545f36514 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -92,7 +92,7 @@ public class PipeConsensusReceiver { private static final long PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS = (long) IOTDB_CONFIG.getConnectionTimeoutInMS() / 6 - * IOTDB_CONFIG.getPipeConsensusPipelineSize(); + * IOTDB_CONFIG.getIotConsensusV2PipelineSize(); private static final long CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS = 5000; private static final long RETRY_WAIT_TIME = 500; private final RequestExecutor requestExecutor; @@ -1041,7 +1041,7 @@ private static class PipeConsensusTsFileWriterPool { public PipeConsensusTsFileWriterPool( ConsensusPipeName consensusPipeName, String receiverBasePath) throws IOException { this.consensusPipeName = consensusPipeName; - for (int i = 0; i < IOTDB_CONFIG.getPipeConsensusPipelineSize(); i++) { + for (int i = 0; i < IOTDB_CONFIG.getIotConsensusV2PipelineSize(); i++) { PipeConsensusTsFileWriter tsFileWriter = new PipeConsensusTsFileWriter(i, consensusPipeName); tsFileWriter.setFilePath(receiverBasePath); @@ -1359,7 +1359,7 @@ private TPipeConsensusTransferResp onRequest( return null; } - if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getPipeConsensusPipelineSize() + if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize() && !reqExecutionOrderBuffer.first().equals(requestMeta)) { // If reqBuffer is full and current thread do not hold the reqBuffer's peek, this req // is not supposed to be processed. So current thread should notify the corresponding @@ -1389,7 +1389,7 @@ private TPipeConsensusTransferResp onRequest( return resp; } - if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getPipeConsensusPipelineSize() + if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize() && reqExecutionOrderBuffer.first().equals(requestMeta)) { long startApplyNanos = System.nanoTime(); metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos); @@ -1417,7 +1417,7 @@ private TPipeConsensusTransferResp onRequest( // not send any more events at this time, that is, the sender has sent all events. At // this point we apply the event at reqBuffer's peek if (timeout - && reqExecutionOrderBuffer.size() < IOTDB_CONFIG.getPipeConsensusPipelineSize() + && reqExecutionOrderBuffer.size() < IOTDB_CONFIG.getIotConsensusV2PipelineSize() && reqExecutionOrderBuffer.first() != null && reqExecutionOrderBuffer.first().equals(requestMeta)) { long startApplyNanos = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 12126cc65d97..be65e7e1aedf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -398,7 +398,6 @@ private void pullAndCheckSystemConfigurations() throws StartupException { .checkConsensusProtocolExists(TConsensusGroupType.DataRegion)) { config.setDataRegionConsensusProtocolClass( configurationResp.globalConfig.getDataRegionConsensusProtocolClass()); - config.setIotConsensusV2Mode(configurationResp.globalConfig.getIotConsensusV2Mode()); } if (!IoTDBStartCheck.getInstance() diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift index f62b9ec33a2d..05e995af8f9f 100644 --- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift +++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift @@ -42,7 +42,6 @@ struct TGlobalConfig { 10: optional i32 tagAttributeTotalSize 11: optional bool isEnterprise 12: optional i64 timePartitionOrigin - 13: optional string iotConsensusV2Mode } struct TRatisConfig { From f198fc9cc70bb90cc491b8c11dc47379cdbf6ff7 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Fri, 20 Sep 2024 16:56:18 +0800 Subject: [PATCH 8/9] fix review --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 19 +++++++++---------- .../apache/iotdb/db/conf/IoTDBDescriptor.java | 10 ++++++---- .../receiver/PipeDataNodeReceiverAgent.java | 2 +- .../pipeconsensus/PipeConsensusReceiver.java | 3 ++- .../conf/iotdb-system.properties.template | 8 ++++---- .../resources/sbin/destroy-datanode.bat | 12 ++++++------ .../resources/sbin/destroy-datanode.sh | 4 ++-- 7 files changed, 30 insertions(+), 28 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index c3f0e4380e89..d8d337d2502d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1129,6 +1129,7 @@ public class IoTDBConfig { // IoTConsensusV2 Config private int iotConsensusV2PipelineSize = 5; private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; + private String[] iotConsensusV2ReceiverFileDirs = new String[0]; /** Load related */ private double maxAllocateMemoryRatioForLoad = 0.8; @@ -1177,8 +1178,6 @@ public class IoTDBConfig { /** initialized as empty, updated based on the latest `systemDir` during querying */ private String[] pipeReceiverFileDirs = new String[0]; - private String[] pipeConsensusReceiverFileDirs = new String[0]; - /** Resource control */ private boolean quotaEnable = false; @@ -1352,8 +1351,8 @@ private void formulateFolders() { for (int i = 0; i < pipeReceiverFileDirs.length; i++) { pipeReceiverFileDirs[i] = addDataHomeDir(pipeReceiverFileDirs[i]); } - for (int i = 0; i < pipeConsensusReceiverFileDirs.length; i++) { - pipeConsensusReceiverFileDirs[i] = addDataHomeDir(pipeConsensusReceiverFileDirs[i]); + for (int i = 0; i < iotConsensusV2ReceiverFileDirs.length; i++) { + iotConsensusV2ReceiverFileDirs[i] = addDataHomeDir(iotConsensusV2ReceiverFileDirs[i]); } mqttDir = addDataHomeDir(mqttDir); extPipeDir = addDataHomeDir(extPipeDir); @@ -4073,13 +4072,13 @@ public String[] getPipeReceiverFileDirs() { : this.pipeReceiverFileDirs; } - public void setPipeConsensusReceiverFileDirs(String[] pipeConsensusReceiverFileDirs) { - this.pipeConsensusReceiverFileDirs = pipeConsensusReceiverFileDirs; + public void setIotConsensusV2ReceiverFileDirs(String[] iotConsensusV2ReceiverFileDirs) { + this.iotConsensusV2ReceiverFileDirs = iotConsensusV2ReceiverFileDirs; } - public String[] getPipeConsensusReceiverFileDirs() { - return (Objects.isNull(this.pipeConsensusReceiverFileDirs) - || this.pipeConsensusReceiverFileDirs.length == 0) + public String[] getIotConsensusV2ReceiverFileDirs() { + return (Objects.isNull(this.iotConsensusV2ReceiverFileDirs) + || this.iotConsensusV2ReceiverFileDirs.length == 0) ? new String[] { systemDir + File.separator @@ -4089,7 +4088,7 @@ public String[] getPipeConsensusReceiverFileDirs() { + File.separator + "receiver" } - : this.pipeConsensusReceiverFileDirs; + : this.iotConsensusV2ReceiverFileDirs; } public boolean isQuotaEnable() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index e294143f461d..e45c14ff243b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1137,7 +1137,9 @@ private void loadIoTConsensusV2Props(Properties properties) throws IOException { conf.setIotConsensusV2PipelineSize(5); } conf.setIotConsensusV2Mode( - properties.getProperty("iot_consensus_v2_mode", conf.getIotConsensusV2Mode()).trim()); + properties.getProperty( + "iot_consensus_v2_mode", + ConfigurationFileUtils.getConfigurationDefaultValue("iot_consensus_v2_mode"))); } private void loadAuthorCache(Properties properties) { @@ -2448,12 +2450,12 @@ private void loadPipeProps(Properties properties) { .filter(dir -> !dir.isEmpty()) .toArray(String[]::new)); - conf.setPipeConsensusReceiverFileDirs( + conf.setIotConsensusV2ReceiverFileDirs( Arrays.stream( properties .getProperty( - "pipe_consensus_receiver_file_dirs", - String.join(",", conf.getPipeConsensusReceiverFileDirs())) + "iot_consensus_v2_receiver_file_dirs", + String.join(",", conf.getIotConsensusV2ReceiverFileDirs())) .trim() .split(",")) .filter(dir -> !dir.isEmpty()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java index 7886ab22569c..a626a2eecb1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java @@ -68,7 +68,7 @@ public void cleanPipeReceiverDirs() { .forEach(IoTDBReceiverAgent::cleanPipeReceiverDir); // consensus String[] pipeConsensusReceiverFileDirs = - IoTDBDescriptor.getInstance().getConfig().getPipeConsensusReceiverFileDirs(); + IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2ReceiverFileDirs(); Arrays.stream(pipeConsensusReceiverFileDirs) .map(File::new) .forEach(IoTDBReceiverAgent::cleanPipeReceiverDir); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 588545f36514..c7f41d63800f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -119,7 +119,8 @@ public PipeConsensusReceiver( // Each pipeConsensusReceiver has its own base directories. for example, a default dir path is // data/datanode/system/pipe/consensus/receiver/__consensus.{consensusGroupId}_{leaderDataNodeId}_{followerDataNodeId} receiverBaseDirsName = - Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getPipeConsensusReceiverFileDirs()); + Arrays.asList( + IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2ReceiverFileDirs()); try { this.folderManager = diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 39e1687ecf92..c5d5a299cc54 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -319,19 +319,19 @@ sort_tmp_dir=data/datanode/tmp # If its prefix is "/", then the path is absolute. Otherwise, it is relative. dn_pipe_receiver_file_dirs=data/datanode/system/pipe/receiver -# pipe_consensus_receiver_file_dirs +# iot_consensus_v2_receiver_file_dirs # If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/${dn_system_dir}/pipe/consensus/receiver). # If it is absolute, system will save the data in the exact location it points to. # If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder. # If there are more than one directory, please separate them by commas ",". -# Note: If pipe_consensus_receiver_file_dirs is assigned an empty string(i.e.,zero-size), it will be handled as a relative path. +# Note: If iot_consensus_v2_receiver_file_dirs is assigned an empty string(i.e.,zero-size), it will be handled as a relative path. # effectiveMode: restart # For windows platform # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative. -# pipe_consensus_receiver_file_dirs=data\\datanode\\system\\pipe\\consensus\\receiver +# iot_consensus_v2_receiver_file_dirs=data\\datanode\\system\\pipe\\consensus\\receiver # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. -pipe_consensus_receiver_file_dirs=data/datanode/system/pipe/consensus/receiver +iot_consensus_v2_receiver_file_dirs=data/datanode/system/pipe/consensus/receiver #################### ### Metric Configuration diff --git a/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.bat b/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.bat index 46b6da01d97a..b64cd00dc499 100644 --- a/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.bat +++ b/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.bat @@ -182,16 +182,16 @@ for %%i in (%pipe_receiver_file_dirs%) do ( ) ) -for /f "eol=# tokens=2 delims==" %%i in ('findstr /i "^pipe_consensus_receiver_file_dirs" +for /f "eol=# tokens=2 delims==" %%i in ('findstr /i "^iot_consensus_v2_receiver_file_dirs" %IOTDB_DATANODE_CONFIG%') do ( - set pipe_consensus_receiver_file_dirs=%%i + set iot_consensus_v2_receiver_file_dirs=%%i ) -if "%pipe_consensus_receiver_file_dirs%"=="" ( - set "pipe_consensus_receiver_file_dirs=data\\datanode\\system\\pipe\\consensus\\receiver" +if "%iot_consensus_v2_receiver_file_dirs%"=="" ( + set "iot_consensus_v2_receiver_file_dirs=data\\datanode\\system\\pipe\\consensus\\receiver" ) -set "pipe_consensus_receiver_file_dirs=!pipe_consensus_receiver_file_dirs:%delimiter%= !" -for %%i in (%pipe_consensus_receiver_file_dirs%) do ( +set "iot_consensus_v2_receiver_file_dirs=!iot_consensus_v2_receiver_file_dirs:%delimiter%= !" +for %%i in (%iot_consensus_v2_receiver_file_dirs%) do ( set "var=%%i" if "!var:~0,2!"=="\\" ( rmdir /s /q "%%i" 2>nul diff --git a/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.sh b/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.sh index 39c0a1e612b5..1e8c74d62d7d 100644 --- a/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.sh +++ b/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.sh @@ -46,7 +46,7 @@ dn_wal_dirs=$(echo $(grep '^dn_wal_dirs=' ${IOTDB_DATANODE_CONFIG} || echo "data dn_tracing_dir=$(echo $(grep '^dn_tracing_dir=' ${IOTDB_DATANODE_CONFIG} || echo "datanode/tracing") | sed 's/.*=//') dn_sync_dir=$(echo $(grep '^dn_sync_dir=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/sync") | sed 's/.*=//') pipe_receiver_file_dirs=$(echo $(grep '^pipe_receiver_file_dirs=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/system/pipe/receiver") | sed 's/.*=//') -pipe_consensus_receiver_file_dirs=$(echo $(grep '^pipe_consensus_receiver_file_dirs=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/system/pipe/consensus/receiver") | sed 's/.*=//') +iot_consensus_v2_receiver_file_dirs=$(echo $(grep '^iot_consensus_v2_receiver_file_dirs=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/system/pipe/consensus/receiver") | sed 's/.*=//') sort_tmp_dir=$(echo $(grep '^sort_tmp_dir=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/tmp") | sed 's/.*=//') function clearPath { @@ -71,7 +71,7 @@ clearPath $dn_wal_dirs clearPath $dn_tracing_dir clearPath $dn_sync_dir clearPath $pipe_receiver_file_dirs -clearPath $pipe_consensus_receiver_file_dirs +clearPath $iot_consensus_v2_receiver_file_dirs clearPath $sort_tmp_dir echo "DataNode clean done ..." \ No newline at end of file From 88ca1b51d8bedf9c574c88ae5976a34c7be3ef75 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Sat, 21 Sep 2024 15:35:36 +0800 Subject: [PATCH 9/9] fix review --- .../assembly/resources/conf/iotdb-system.properties.template | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index c5d5a299cc54..7b268455b5ce 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -154,7 +154,7 @@ data_replication_factor=1 # 1. org.apache.iotdb.consensus.simple.SimpleConsensus (The data_replication_factor can only be set to 1) # 2. org.apache.iotdb.consensus.iot.IoTConsensus # 3. org.apache.iotdb.consensus.ratis.RatisConsensus -# 4. org.apache.iotdb.consensus.ratis.IoTConsensusV2 +# 4. org.apache.iotdb.consensus.iot.IoTConsensusV2 # effectiveMode: first_start # Datatype: string data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus