diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java index b3802f3ea8d8..3c222c4bfe34 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java @@ -217,9 +217,7 @@ public class ClusterConstant { public static final String SIMPLE_CONSENSUS_STR = "Simple"; public static final String RATIS_CONSENSUS_STR = "Ratis"; public static final String IOT_CONSENSUS_STR = "IoT"; - public static final String PIPE_CONSENSUS_STR = "Pipe"; - public static final String STREAM_CONSENSUS_STR = "Stream"; - public static final String BATCH_CONSENSUS_STR = "Batch"; + public static final String IOT_CONSENSUS_V2_STR = "IoTV2"; public static final String JAVA_CMD = System.getProperty("java.home") diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index f3c9527e5952..f6156a3331c1 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -30,14 +30,11 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import static org.apache.iotdb.consensus.ConsensusFactory.FAST_IOT_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2; import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS; -import static org.apache.iotdb.consensus.ConsensusFactory.REAL_PIPE_CONSENSUS; import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS; import static org.apache.iotdb.db.utils.DateTimeUtils.convertLongToDate; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.BATCH_CONSENSUS_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.CLUSTER_CONFIGURATIONS; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_NUM; @@ -47,17 +44,16 @@ import static org.apache.iotdb.it.env.cluster.ClusterConstant.HIGH_PERFORMANCE_MODE_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.HIGH_PERFORMANCE_MODE_DATA_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.IOT_CONSENSUS_STR; +import static org.apache.iotdb.it.env.cluster.ClusterConstant.IOT_CONSENSUS_V2_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_DATA_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.LOCK_FILE_PATH; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.RATIS_CONSENSUS_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_DATA_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.SIMPLE_CONSENSUS_STR; -import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAM_CONSENSUS_STR; import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE; import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_CONFIG_NODE_NUM; import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_DATA_NODE_NUM; @@ -222,12 +218,8 @@ public static String fromConsensusFullNameToAbbr(String consensus) { return RATIS_CONSENSUS_STR; case IOT_CONSENSUS: return IOT_CONSENSUS_STR; - case REAL_PIPE_CONSENSUS: - return PIPE_CONSENSUS_STR; case IOT_CONSENSUS_V2: - return STREAM_CONSENSUS_STR; - case FAST_IOT_CONSENSUS: - return BATCH_CONSENSUS_STR; + return IOT_CONSENSUS_V2_STR; default: throw new IllegalArgumentException("Unknown consensus type: " + consensus); } @@ -241,12 +233,8 @@ public static String fromConsensusAbbrToFullName(String consensus) { return RATIS_CONSENSUS; case IOT_CONSENSUS_STR: return IOT_CONSENSUS; - case PIPE_CONSENSUS_STR: - return REAL_PIPE_CONSENSUS; - case STREAM_CONSENSUS_STR: + case IOT_CONSENSUS_V2_STR: return IOT_CONSENSUS_V2; - case BATCH_CONSENSUS_STR: - return FAST_IOT_CONSENSUS; default: throw new IllegalArgumentException("Unknown consensus type: " + consensus); } diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java index ced2964ec5cb..42fba66eb3bf 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeStartupCheck.java @@ -142,10 +142,9 @@ private void checkGlobalConfig() throws ConfigurationException { "the SchemaRegion doesn't support org.apache.iotdb.consensus.iot.IoTConsensus"); } - // When the schemaengine region consensus protocol is set to PipeConsensus, + // When the schemaengine region consensus protocol is set to IoTConsensusV2, // we should report an error - if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS) - || CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { + if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { throw new ConfigurationException( "schema_region_consensus_protocol_class", String.valueOf(CONF.getSchemaRegionConsensusProtocolClass()), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 94e1e5c0eb66..13d58397c9f1 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java @@ -80,8 +80,6 @@ public class RouteBalancer implements IClusterStatusSubscriber { && ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) || (CONF.isEnableAutoLeaderBalanceForIoTConsensus() && ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) - || (CONF.isEnableAutoLeaderBalanceForIoTConsensus() - && ConsensusFactory.FAST_IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) || (CONF.isEnableAutoLeaderBalanceForIoTConsensus() && ConsensusFactory.IOT_CONSENSUS_V2.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS)) // The simple consensus protocol will always automatically designate itself as the leader @@ -185,7 +183,6 @@ private void balanceRegionLeader( regionGroupId, newLeaderId); switch (consensusProtocolClass) { - case ConsensusFactory.FAST_IOT_CONSENSUS: case ConsensusFactory.IOT_CONSENSUS_V2: case ConsensusFactory.IOT_CONSENSUS: case ConsensusFactory.SIMPLE_CONSENSUS: diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java index 8cab79e315e4..6196b37f1236 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java @@ -21,7 +21,6 @@ import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer; import org.apache.iotdb.consensus.config.ConsensusConfig; -import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode; import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager; import org.slf4j.Logger; @@ -29,8 +28,6 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; public class ConsensusFactory { @@ -41,16 +38,9 @@ public class ConsensusFactory { public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus"; public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus"; public static final String REAL_PIPE_CONSENSUS = "org.apache.iotdb.consensus.pipe.PipeConsensus"; - // Corresponding to streamConsensus public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2"; - // Corresponding to batchConsensus - public static final String FAST_IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.FastIoTConsensus"; - private static final Map PIPE_CONSENSUS_MODE_MAP = new HashMap<>(); - - static { - PIPE_CONSENSUS_MODE_MAP.put(IOT_CONSENSUS_V2, ReplicateMode.STREAM); - PIPE_CONSENSUS_MODE_MAP.put(FAST_IOT_CONSENSUS, ReplicateMode.BATCH); - } + public static final String IOT_CONSENSUS_V2_BATCH_MODE = "batch"; + public static final String IOT_CONSENSUS_V2_STREAM_MODE = "stream"; private static final Logger logger = LoggerFactory.getLogger(ConsensusFactory.class); @@ -61,13 +51,12 @@ private ConsensusFactory() { public static Optional getConsensusImpl( String className, ConsensusConfig config, IStateMachine.Registry registry) { try { - // special judge for PipeConsensus - if (className.equals(IOT_CONSENSUS_V2) || className.equals(FAST_IOT_CONSENSUS)) { - config.getPipeConsensusConfig().setReplicateMode(PIPE_CONSENSUS_MODE_MAP.get(className)); + // special judge for IoTConsensusV2 + if (className.equals(IOT_CONSENSUS_V2)) { className = REAL_PIPE_CONSENSUS; - // initialize pipeConsensus' thrift component + // initialize iotConsensusV2's thrift component PipeConsensusClientMgrContainer.build(); - // initialize pipeConsensus's metric component + // initialize iotConsensusV2's metric component PipeConsensusSyncLagManager.build(); } Class executor = Class.forName(className); diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java index bec2c7d8e1e0..9295fa793422 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/config/PipeConsensusConfig.java @@ -32,14 +32,11 @@ public class PipeConsensusConfig { private final RPC rpc; private final Pipe pipe; // Use stream mode by default. User can configure it - private ReplicateMode replicateMode = ReplicateMode.STREAM; + private final ReplicateMode replicateMode; - public PipeConsensusConfig(RPC rpc, Pipe pipe) { + public PipeConsensusConfig(RPC rpc, Pipe pipe, ReplicateMode replicateMode) { this.rpc = rpc; this.pipe = pipe; - } - - public void setReplicateMode(ReplicateMode replicateMode) { this.replicateMode = replicateMode; } @@ -62,6 +59,7 @@ public static Builder newBuilder() { public static class Builder { private RPC rpc; private Pipe pipe; + private ReplicateMode replicateMode; public Builder setPipe(Pipe pipe) { this.pipe = pipe; @@ -73,8 +71,13 @@ public Builder setRPC(RPC rpc) { return this; } + public Builder setReplicateMode(ReplicateMode replicateMode) { + this.replicateMode = replicateMode; + return this; + } + public PipeConsensusConfig build() { - return new PipeConsensusConfig(rpc, pipe); + return new PipeConsensusConfig(rpc, pipe, replicateMode); } } @@ -354,5 +357,15 @@ public enum ReplicateMode { public String getValue() { return value; } + + public static ReplicateMode fromValue(String value) { + if (value.equalsIgnoreCase(STREAM.getValue())) { + return STREAM; + } else if (value.equalsIgnoreCase(BATCH.getValue())) { + return BATCH; + } + // return batch by default + return BATCH; + } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 4c4b0e824d08..d8d337d2502d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1126,8 +1126,10 @@ public class IoTDBConfig { private double maxMemoryRatioForQueue = 0.6; private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L; - // PipeConsensus Config - private int pipeConsensusPipelineSize = 5; + // IoTConsensusV2 Config + private int iotConsensusV2PipelineSize = 5; + private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE; + private String[] iotConsensusV2ReceiverFileDirs = new String[0]; /** Load related */ private double maxAllocateMemoryRatioForLoad = 0.8; @@ -1176,8 +1178,6 @@ public class IoTDBConfig { /** initialized as empty, updated based on the latest `systemDir` during querying */ private String[] pipeReceiverFileDirs = new String[0]; - private String[] pipeConsensusReceiverFileDirs = new String[0]; - /** Resource control */ private boolean quotaEnable = false; @@ -1221,12 +1221,12 @@ public void setRegionMigrationSpeedLimitBytesPerSecond( this.regionMigrationSpeedLimitBytesPerSecond = regionMigrationSpeedLimitBytesPerSecond; } - public int getPipeConsensusPipelineSize() { - return pipeConsensusPipelineSize; + public int getIotConsensusV2PipelineSize() { + return iotConsensusV2PipelineSize; } - public void setPipeConsensusPipelineSize(int pipeConsensusPipelineSize) { - this.pipeConsensusPipelineSize = pipeConsensusPipelineSize; + public void setIotConsensusV2PipelineSize(int iotConsensusV2PipelineSize) { + this.iotConsensusV2PipelineSize = iotConsensusV2PipelineSize; } public void setMaxSizePerBatch(int maxSizePerBatch) { @@ -1351,8 +1351,8 @@ private void formulateFolders() { for (int i = 0; i < pipeReceiverFileDirs.length; i++) { pipeReceiverFileDirs[i] = addDataHomeDir(pipeReceiverFileDirs[i]); } - for (int i = 0; i < pipeConsensusReceiverFileDirs.length; i++) { - pipeConsensusReceiverFileDirs[i] = addDataHomeDir(pipeConsensusReceiverFileDirs[i]); + for (int i = 0; i < iotConsensusV2ReceiverFileDirs.length; i++) { + iotConsensusV2ReceiverFileDirs[i] = addDataHomeDir(iotConsensusV2ReceiverFileDirs[i]); } mqttDir = addDataHomeDir(mqttDir); extPipeDir = addDataHomeDir(extPipeDir); @@ -3188,6 +3188,14 @@ public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtoc this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass; } + public String getIotConsensusV2Mode() { + return iotConsensusV2Mode; + } + + public void setIotConsensusV2Mode(String iotConsensusV2Mode) { + this.iotConsensusV2Mode = iotConsensusV2Mode; + } + public String getSchemaRegionConsensusProtocolClass() { return schemaRegionConsensusProtocolClass; } @@ -4064,13 +4072,13 @@ public String[] getPipeReceiverFileDirs() { : this.pipeReceiverFileDirs; } - public void setPipeConsensusReceiverFileDirs(String[] pipeConsensusReceiverFileDirs) { - this.pipeConsensusReceiverFileDirs = pipeConsensusReceiverFileDirs; + public void setIotConsensusV2ReceiverFileDirs(String[] iotConsensusV2ReceiverFileDirs) { + this.iotConsensusV2ReceiverFileDirs = iotConsensusV2ReceiverFileDirs; } - public String[] getPipeConsensusReceiverFileDirs() { - return (Objects.isNull(this.pipeConsensusReceiverFileDirs) - || this.pipeConsensusReceiverFileDirs.length == 0) + public String[] getIotConsensusV2ReceiverFileDirs() { + return (Objects.isNull(this.iotConsensusV2ReceiverFileDirs) + || this.iotConsensusV2ReceiverFileDirs.length == 0) ? new String[] { systemDir + File.separator @@ -4080,7 +4088,7 @@ public String[] getPipeConsensusReceiverFileDirs() { + File.separator + "receiver" } - : this.pipeConsensusReceiverFileDirs; + : this.iotConsensusV2ReceiverFileDirs; } public boolean isQuotaEnable() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 58de8530dc4e..e45c14ff243b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -1074,12 +1074,12 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO "datanode_schema_cache_eviction_policy", conf.getDataNodeSchemaCacheEvictionPolicy())); loadIoTConsensusProps(properties); - loadPipeConsensusProps(properties); + loadIoTConsensusV2Props(properties); } private void reloadConsensusProps(Properties properties) throws IOException { loadIoTConsensusProps(properties); - loadPipeConsensusProps(properties); + loadIoTConsensusV2Props(properties); DataRegionConsensusImpl.reloadConsensusConfig(); } @@ -1126,16 +1126,20 @@ private void loadIoTConsensusProps(Properties properties) throws IOException { .trim())); } - private void loadPipeConsensusProps(Properties properties) throws IOException { - conf.setPipeConsensusPipelineSize( + private void loadIoTConsensusV2Props(Properties properties) throws IOException { + conf.setIotConsensusV2PipelineSize( Integer.parseInt( properties.getProperty( - "fast_iot_consensus_pipeline_size", + "iot_consensus_v2_pipeline_size", ConfigurationFileUtils.getConfigurationDefaultValue( - "fast_iot_consensus_pipeline_size")))); - if (conf.getPipeConsensusPipelineSize() <= 0) { - conf.setPipeConsensusPipelineSize(5); + "iot_consensus_v2_pipeline_size")))); + if (conf.getIotConsensusV2PipelineSize() <= 0) { + conf.setIotConsensusV2PipelineSize(5); } + conf.setIotConsensusV2Mode( + properties.getProperty( + "iot_consensus_v2_mode", + ConfigurationFileUtils.getConfigurationDefaultValue("iot_consensus_v2_mode"))); } private void loadAuthorCache(Properties properties) { @@ -2446,12 +2450,12 @@ private void loadPipeProps(Properties properties) { .filter(dir -> !dir.isEmpty()) .toArray(String[]::new)); - conf.setPipeConsensusReceiverFileDirs( + conf.setIotConsensusV2ReceiverFileDirs( Arrays.stream( properties .getProperty( - "pipe_consensus_receiver_file_dirs", - String.join(",", conf.getPipeConsensusReceiverFileDirs())) + "iot_consensus_v2_receiver_file_dirs", + String.join(",", conf.getIotConsensusV2ReceiverFileDirs())) .trim() .split(",")) .filter(dir -> !dir.isEmpty()) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java index 515a9f54ddca..06d95a75cee1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/DataRegionConsensusImpl.java @@ -30,6 +30,7 @@ import org.apache.iotdb.consensus.config.IoTConsensusConfig; import org.apache.iotdb.consensus.config.IoTConsensusConfig.RPC; import org.apache.iotdb.consensus.config.PipeConsensusConfig; +import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode; import org.apache.iotdb.consensus.config.RatisConfig; import org.apache.iotdb.consensus.config.RatisConfig.Snapshot; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -168,8 +169,9 @@ private static ConsensusConfig buildConsensusConfig() { () -> PipeDataNodeAgent.task().getAllConsensusPipe()) .setConsensusPipeReceiver(PipeDataNodeAgent.receiver().pipeConsensus()) .setProgressIndexManager(new ProgressIndexDataNodeManager()) - .setConsensusPipeGuardJobIntervalInSeconds(300) // TODO: move to config + .setConsensusPipeGuardJobIntervalInSeconds(300) .build()) + .setReplicateMode(ReplicateMode.fromValue(CONF.getIotConsensusV2Mode())) .build()) .setRatisConfig( RatisConfig.newBuilder() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java index 7886ab22569c..a626a2eecb1e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/receiver/PipeDataNodeReceiverAgent.java @@ -68,7 +68,7 @@ public void cleanPipeReceiverDirs() { .forEach(IoTDBReceiverAgent::cleanPipeReceiverDir); // consensus String[] pipeConsensusReceiverFileDirs = - IoTDBDescriptor.getInstance().getConfig().getPipeConsensusReceiverFileDirs(); + IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2ReceiverFileDirs(); Arrays.stream(pipeConsensusReceiverFileDirs) .map(File::new) .forEach(IoTDBReceiverAgent::cleanPipeReceiverDir); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java index e4489799652c..870c383cd7a1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java @@ -92,7 +92,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse private final BlockingQueue retryEventQueue = new LinkedBlockingQueue<>(); // We use enrichedEvent here to make use of EnrichedEvent.equalsInPipeConsensus private final BlockingQueue transferBuffer = - new LinkedBlockingDeque<>(IOTDB_CONFIG.getPipeConsensusPipelineSize()); + new LinkedBlockingDeque<>(IOTDB_CONFIG.getIotConsensusV2PipelineSize()); private final AtomicBoolean isClosed = new AtomicBoolean(false); private final int thisDataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); private PipeConsensusConnectorMetrics pipeConsensusConnectorMetrics; @@ -206,7 +206,7 @@ public synchronized void removeEventFromBuffer(EnrichedEvent event) { consensusGroupId, event, transferBuffer.size(), - IOTDB_CONFIG.getPipeConsensusPipelineSize()); + IOTDB_CONFIG.getIotConsensusV2PipelineSize()); } if (transferBuffer.isEmpty()) { LOGGER.info( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java index 6b17faf65240..c7f41d63800f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java @@ -92,7 +92,7 @@ public class PipeConsensusReceiver { private static final long PIPE_CONSENSUS_RECEIVER_MAX_WAITING_TIME_IN_MS = (long) IOTDB_CONFIG.getConnectionTimeoutInMS() / 6 - * IOTDB_CONFIG.getPipeConsensusPipelineSize(); + * IOTDB_CONFIG.getIotConsensusV2PipelineSize(); private static final long CLOSE_TSFILE_WRITER_MAX_WAIT_TIME_IN_MS = 5000; private static final long RETRY_WAIT_TIME = 500; private final RequestExecutor requestExecutor; @@ -119,7 +119,8 @@ public PipeConsensusReceiver( // Each pipeConsensusReceiver has its own base directories. for example, a default dir path is // data/datanode/system/pipe/consensus/receiver/__consensus.{consensusGroupId}_{leaderDataNodeId}_{followerDataNodeId} receiverBaseDirsName = - Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getPipeConsensusReceiverFileDirs()); + Arrays.asList( + IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2ReceiverFileDirs()); try { this.folderManager = @@ -1041,7 +1042,7 @@ private static class PipeConsensusTsFileWriterPool { public PipeConsensusTsFileWriterPool( ConsensusPipeName consensusPipeName, String receiverBasePath) throws IOException { this.consensusPipeName = consensusPipeName; - for (int i = 0; i < IOTDB_CONFIG.getPipeConsensusPipelineSize(); i++) { + for (int i = 0; i < IOTDB_CONFIG.getIotConsensusV2PipelineSize(); i++) { PipeConsensusTsFileWriter tsFileWriter = new PipeConsensusTsFileWriter(i, consensusPipeName); tsFileWriter.setFilePath(receiverBasePath); @@ -1359,7 +1360,7 @@ private TPipeConsensusTransferResp onRequest( return null; } - if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getPipeConsensusPipelineSize() + if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize() && !reqExecutionOrderBuffer.first().equals(requestMeta)) { // If reqBuffer is full and current thread do not hold the reqBuffer's peek, this req // is not supposed to be processed. So current thread should notify the corresponding @@ -1389,7 +1390,7 @@ private TPipeConsensusTransferResp onRequest( return resp; } - if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getPipeConsensusPipelineSize() + if (reqExecutionOrderBuffer.size() >= IOTDB_CONFIG.getIotConsensusV2PipelineSize() && reqExecutionOrderBuffer.first().equals(requestMeta)) { long startApplyNanos = System.nanoTime(); metric.recordDispatchWaitingTimer(startApplyNanos - startDispatchNanos); @@ -1417,7 +1418,7 @@ private TPipeConsensusTransferResp onRequest( // not send any more events at this time, that is, the sender has sent all events. At // this point we apply the event at reqBuffer's peek if (timeout - && reqExecutionOrderBuffer.size() < IOTDB_CONFIG.getPipeConsensusPipelineSize() + && reqExecutionOrderBuffer.size() < IOTDB_CONFIG.getIotConsensusV2PipelineSize() && reqExecutionOrderBuffer.first() != null && reqExecutionOrderBuffer.first().equals(requestMeta)) { long startApplyNanos = System.nanoTime(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index f18de9ef2b9d..5e49a1eeb38b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -210,7 +210,6 @@ public boolean isGeneratedByRemoteConsensusLeader() { switch (config.getDataRegionConsensusProtocolClass()) { case ConsensusFactory.IOT_CONSENSUS: case ConsensusFactory.IOT_CONSENSUS_V2: - case ConsensusFactory.FAST_IOT_CONSENSUS: case ConsensusFactory.RATIS_CONSENSUS: return isGeneratedByRemoteConsensusLeader; case ConsensusFactory.SIMPLE_CONSENSUS: diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 44582e30e844..be65e7e1aedf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -839,8 +839,7 @@ public TDataNodeConfiguration generateDataNodeConfiguration() { } private boolean isUsingPipeConsensus() { - return config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2) - || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS); + return config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2); } private void registerUdfServices() throws StartupException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java index 11b2a1c6c632..a4ea80984688 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java @@ -757,9 +757,6 @@ public void deleteDataRegion(DataRegionId regionId) { region.syncDeleteDataFiles(); region.deleteFolder(systemDir); if (CONFIG.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) - || CONFIG - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS) || CONFIG .getDataRegionConsensusProtocolClass() .equals(ConsensusFactory.IOT_CONSENSUS_V2)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 09b30bf1bb6d..17ac5a299980 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3871,7 +3871,6 @@ public void markDeleted() { private void acquireDirectBufferMemory() throws DataRegionException { long acquireDirectBufferMemCost = 0; if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) - || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS) || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { acquireDirectBufferMemCost = config.getWalBufferSize(); } else if (config diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java index bff78d50e5bf..e952dec11791 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java @@ -70,10 +70,7 @@ public class WALManager implements IService { private WALManager() { if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) - || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2) - || config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS)) { + || config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { walNodesManager = new FirstCreateStrategy(); } else if (config.getMaxWalNodesNum() == 0) { walNodesManager = new ElasticStrategy(); @@ -87,9 +84,6 @@ public static String getApplicantUniqueId(String storageGroupName, boolean seque || config .getDataRegionConsensusProtocolClass() .equals(ConsensusFactory.IOT_CONSENSUS_V2) - || config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS) ? storageGroupName : storageGroupName + IoTDBConstant.FILE_NAME_SEPARATOR @@ -112,10 +106,7 @@ public void registerWALNode( || (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) && !config .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.IOT_CONSENSUS_V2) - && !config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS))) { + .equals(ConsensusFactory.IOT_CONSENSUS_V2))) { return; } @@ -130,10 +121,7 @@ public void deleteWALNode(String applicantUniqueId) { || (!config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS) && !config .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.IOT_CONSENSUS_V2) - && !config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.FAST_IOT_CONSENSUS))) { + .equals(ConsensusFactory.IOT_CONSENSUS_V2))) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java index 18bb293792af..bf67a39cbbc1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/WALNodeRecoverTask.java @@ -134,11 +134,8 @@ public void run() { logDirectory); } - // PipeConsensus will not only delete WAL node folder, but also register WAL node. - if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS) - || config - .getDataRegionConsensusProtocolClass() - .equals(ConsensusFactory.IOT_CONSENSUS_V2)) { + // IoTConsensusV2 will not only delete WAL node folder, but also register WAL node. + if (config.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) { // register wal node WALManager.getInstance() .registerWALNode( diff --git a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template index 945199534fc9..7b268455b5ce 100644 --- a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template +++ b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template @@ -154,6 +154,7 @@ data_replication_factor=1 # 1. org.apache.iotdb.consensus.simple.SimpleConsensus (The data_replication_factor can only be set to 1) # 2. org.apache.iotdb.consensus.iot.IoTConsensus # 3. org.apache.iotdb.consensus.ratis.RatisConsensus +# 4. org.apache.iotdb.consensus.iot.IoTConsensusV2 # effectiveMode: first_start # Datatype: string data_region_consensus_protocol_class=org.apache.iotdb.consensus.iot.IoTConsensus @@ -318,19 +319,19 @@ sort_tmp_dir=data/datanode/tmp # If its prefix is "/", then the path is absolute. Otherwise, it is relative. dn_pipe_receiver_file_dirs=data/datanode/system/pipe/receiver -# pipe_consensus_receiver_file_dirs +# iot_consensus_v2_receiver_file_dirs # If this property is unset, system will save the data in the default relative path directory under the IoTDB folder(i.e., %IOTDB_HOME%/${dn_system_dir}/pipe/consensus/receiver). # If it is absolute, system will save the data in the exact location it points to. # If it is relative, system will save the data in the relative path directory it indicates under the IoTDB folder. # If there are more than one directory, please separate them by commas ",". -# Note: If pipe_consensus_receiver_file_dirs is assigned an empty string(i.e.,zero-size), it will be handled as a relative path. +# Note: If iot_consensus_v2_receiver_file_dirs is assigned an empty string(i.e.,zero-size), it will be handled as a relative path. # effectiveMode: restart # For windows platform # If its prefix is a drive specifier followed by "\\", or if its prefix is "\\\\", then the path is absolute. Otherwise, it is relative. -# pipe_consensus_receiver_file_dirs=data\\datanode\\system\\pipe\\consensus\\receiver +# iot_consensus_v2_receiver_file_dirs=data\\datanode\\system\\pipe\\consensus\\receiver # For Linux platform # If its prefix is "/", then the path is absolute. Otherwise, it is relative. -pipe_consensus_receiver_file_dirs=data/datanode/system/pipe/consensus/receiver +iot_consensus_v2_receiver_file_dirs=data/datanode/system/pipe/consensus/receiver #################### ### Metric Configuration @@ -1804,12 +1805,20 @@ schema_region_ratis_periodic_snapshot_interval=86400 data_region_ratis_periodic_snapshot_interval=86400 #################### -### Fast IoTConsensus Configuration +### IoTConsensusV2 Configuration #################### -# Default event buffer size for connector and receiver in pipe consensus -# effectiveMode: hot_reload +# Default event buffer size for connector and receiver in iot consensus v2 +# effectiveMode: restart # DataType: int -fast_iot_consensus_pipeline_size=5 +iot_consensus_v2_pipeline_size=5 + +# IoTConsensusV2 mode. +# These consensus protocol modes are currently supported: +# 1. batch +# 2. stream +# effectiveMode: restart +# Datatype: string +iot_consensus_v2_mode=batch #################### ### Procedure Configuration diff --git a/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.bat b/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.bat index 46b6da01d97a..b64cd00dc499 100644 --- a/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.bat +++ b/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.bat @@ -182,16 +182,16 @@ for %%i in (%pipe_receiver_file_dirs%) do ( ) ) -for /f "eol=# tokens=2 delims==" %%i in ('findstr /i "^pipe_consensus_receiver_file_dirs" +for /f "eol=# tokens=2 delims==" %%i in ('findstr /i "^iot_consensus_v2_receiver_file_dirs" %IOTDB_DATANODE_CONFIG%') do ( - set pipe_consensus_receiver_file_dirs=%%i + set iot_consensus_v2_receiver_file_dirs=%%i ) -if "%pipe_consensus_receiver_file_dirs%"=="" ( - set "pipe_consensus_receiver_file_dirs=data\\datanode\\system\\pipe\\consensus\\receiver" +if "%iot_consensus_v2_receiver_file_dirs%"=="" ( + set "iot_consensus_v2_receiver_file_dirs=data\\datanode\\system\\pipe\\consensus\\receiver" ) -set "pipe_consensus_receiver_file_dirs=!pipe_consensus_receiver_file_dirs:%delimiter%= !" -for %%i in (%pipe_consensus_receiver_file_dirs%) do ( +set "iot_consensus_v2_receiver_file_dirs=!iot_consensus_v2_receiver_file_dirs:%delimiter%= !" +for %%i in (%iot_consensus_v2_receiver_file_dirs%) do ( set "var=%%i" if "!var:~0,2!"=="\\" ( rmdir /s /q "%%i" 2>nul diff --git a/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.sh b/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.sh index 39c0a1e612b5..1e8c74d62d7d 100644 --- a/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.sh +++ b/iotdb-core/node-commons/src/assembly/resources/sbin/destroy-datanode.sh @@ -46,7 +46,7 @@ dn_wal_dirs=$(echo $(grep '^dn_wal_dirs=' ${IOTDB_DATANODE_CONFIG} || echo "data dn_tracing_dir=$(echo $(grep '^dn_tracing_dir=' ${IOTDB_DATANODE_CONFIG} || echo "datanode/tracing") | sed 's/.*=//') dn_sync_dir=$(echo $(grep '^dn_sync_dir=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/sync") | sed 's/.*=//') pipe_receiver_file_dirs=$(echo $(grep '^pipe_receiver_file_dirs=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/system/pipe/receiver") | sed 's/.*=//') -pipe_consensus_receiver_file_dirs=$(echo $(grep '^pipe_consensus_receiver_file_dirs=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/system/pipe/consensus/receiver") | sed 's/.*=//') +iot_consensus_v2_receiver_file_dirs=$(echo $(grep '^iot_consensus_v2_receiver_file_dirs=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/system/pipe/consensus/receiver") | sed 's/.*=//') sort_tmp_dir=$(echo $(grep '^sort_tmp_dir=' ${IOTDB_DATANODE_CONFIG} || echo "data/datanode/tmp") | sed 's/.*=//') function clearPath { @@ -71,7 +71,7 @@ clearPath $dn_wal_dirs clearPath $dn_tracing_dir clearPath $dn_sync_dir clearPath $pipe_receiver_file_dirs -clearPath $pipe_consensus_receiver_file_dirs +clearPath $iot_consensus_v2_receiver_file_dirs clearPath $sort_tmp_dir echo "DataNode clean done ..." \ No newline at end of file