Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IoTConsensusV2: Introduce IoTConsensusV2 and corresponding mode option #13440

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,7 @@ public class ClusterConstant {
public static final String SIMPLE_CONSENSUS_STR = "Simple";
public static final String RATIS_CONSENSUS_STR = "Ratis";
public static final String IOT_CONSENSUS_STR = "IoT";
public static final String PIPE_CONSENSUS_STR = "Pipe";
public static final String STREAM_CONSENSUS_STR = "Stream";
public static final String BATCH_CONSENSUS_STR = "Batch";
public static final String IOT_CONSENSUS_V2_STR = "IoTV2";

public static final String JAVA_CMD =
System.getProperty("java.home")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,11 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.iotdb.consensus.ConsensusFactory.FAST_IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.REAL_PIPE_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
import static org.apache.iotdb.db.utils.DateTimeUtils.convertLongToDate;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.BATCH_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.CLUSTER_CONFIGURATIONS;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.DEFAULT_DATA_NODE_NUM;
Expand All @@ -47,17 +44,16 @@
import static org.apache.iotdb.it.env.cluster.ClusterConstant.HIGH_PERFORMANCE_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.HIGH_PERFORMANCE_MODE_DATA_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.IOT_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.IOT_CONSENSUS_V2_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LIGHT_WEIGHT_STANDALONE_MODE_DATA_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.LOCK_FILE_PATH;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.PIPE_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.RATIS_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SCALABLE_SINGLE_NODE_MODE_DATA_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.SIMPLE_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STREAM_CONSENSUS_STR;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_CONFIG_NODE_NUM;
import static org.apache.iotdb.it.env.cluster.ClusterConstant.STRONG_CONSISTENCY_CLUSTER_MODE_DATA_NODE_NUM;
Expand Down Expand Up @@ -222,12 +218,8 @@ public static String fromConsensusFullNameToAbbr(String consensus) {
return RATIS_CONSENSUS_STR;
case IOT_CONSENSUS:
return IOT_CONSENSUS_STR;
case REAL_PIPE_CONSENSUS:
return PIPE_CONSENSUS_STR;
case IOT_CONSENSUS_V2:
return STREAM_CONSENSUS_STR;
case FAST_IOT_CONSENSUS:
return BATCH_CONSENSUS_STR;
return IOT_CONSENSUS_V2_STR;
default:
throw new IllegalArgumentException("Unknown consensus type: " + consensus);
}
Expand All @@ -241,12 +233,8 @@ public static String fromConsensusAbbrToFullName(String consensus) {
return RATIS_CONSENSUS;
case IOT_CONSENSUS_STR:
return IOT_CONSENSUS;
case PIPE_CONSENSUS_STR:
return REAL_PIPE_CONSENSUS;
case STREAM_CONSENSUS_STR:
case IOT_CONSENSUS_V2_STR:
return IOT_CONSENSUS_V2;
case BATCH_CONSENSUS_STR:
return FAST_IOT_CONSENSUS;
default:
throw new IllegalArgumentException("Unknown consensus type: " + consensus);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,9 @@ private void checkGlobalConfig() throws ConfigurationException {
"the SchemaRegion doesn't support org.apache.iotdb.consensus.iot.IoTConsensus");
}

// When the schemaengine region consensus protocol is set to PipeConsensus,
// When the schemaengine region consensus protocol is set to IoTConsensusV2,
// we should report an error
if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.FAST_IOT_CONSENSUS)
|| CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
if (CONF.getSchemaRegionConsensusProtocolClass().equals(ConsensusFactory.IOT_CONSENSUS_V2)) {
throw new ConfigurationException(
"schema_region_consensus_protocol_class",
String.valueOf(CONF.getSchemaRegionConsensusProtocolClass()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ public class RouteBalancer implements IClusterStatusSubscriber {
&& ConsensusFactory.RATIS_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.FAST_IOT_CONSENSUS.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
|| (CONF.isEnableAutoLeaderBalanceForIoTConsensus()
&& ConsensusFactory.IOT_CONSENSUS_V2.equals(DATA_REGION_CONSENSUS_PROTOCOL_CLASS))
// The simple consensus protocol will always automatically designate itself as the leader
Expand Down Expand Up @@ -185,7 +183,6 @@ private void balanceRegionLeader(
regionGroupId,
newLeaderId);
switch (consensusProtocolClass) {
case ConsensusFactory.FAST_IOT_CONSENSUS:
case ConsensusFactory.IOT_CONSENSUS_V2:
case ConsensusFactory.IOT_CONSENSUS:
case ConsensusFactory.SIMPLE_CONSENSUS:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@

import org.apache.iotdb.commons.client.container.PipeConsensusClientMgrContainer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
import org.apache.iotdb.consensus.pipe.metric.PipeConsensusSyncLagManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ConsensusFactory {
Expand All @@ -41,16 +38,9 @@ public class ConsensusFactory {
public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus";
public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus";
public static final String REAL_PIPE_CONSENSUS = "org.apache.iotdb.consensus.pipe.PipeConsensus";
// Corresponding to streamConsensus
public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2";
// Corresponding to batchConsensus
public static final String FAST_IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.FastIoTConsensus";
private static final Map<String, ReplicateMode> PIPE_CONSENSUS_MODE_MAP = new HashMap<>();

static {
PIPE_CONSENSUS_MODE_MAP.put(IOT_CONSENSUS_V2, ReplicateMode.STREAM);
PIPE_CONSENSUS_MODE_MAP.put(FAST_IOT_CONSENSUS, ReplicateMode.BATCH);
}
public static final String IOT_CONSENSUS_V2_BATCH_MODE = "batch";
public static final String IOT_CONSENSUS_V2_STREAM_MODE = "stream";

private static final Logger logger = LoggerFactory.getLogger(ConsensusFactory.class);

Expand All @@ -61,13 +51,12 @@ private ConsensusFactory() {
public static Optional<IConsensus> getConsensusImpl(
String className, ConsensusConfig config, IStateMachine.Registry registry) {
try {
// special judge for PipeConsensus
if (className.equals(IOT_CONSENSUS_V2) || className.equals(FAST_IOT_CONSENSUS)) {
config.getPipeConsensusConfig().setReplicateMode(PIPE_CONSENSUS_MODE_MAP.get(className));
// special judge for IoTConsensusV2
if (className.equals(IOT_CONSENSUS_V2)) {
className = REAL_PIPE_CONSENSUS;
// initialize pipeConsensus' thrift component
// initialize iotConsensusV2's thrift component
PipeConsensusClientMgrContainer.build();
// initialize pipeConsensus's metric component
// initialize iotConsensusV2's metric component
PipeConsensusSyncLagManager.build();
}
Class<?> executor = Class.forName(className);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@ public class PipeConsensusConfig {
private final RPC rpc;
private final Pipe pipe;
// Use stream mode by default. User can configure it
private ReplicateMode replicateMode = ReplicateMode.STREAM;
private final ReplicateMode replicateMode;

public PipeConsensusConfig(RPC rpc, Pipe pipe) {
public PipeConsensusConfig(RPC rpc, Pipe pipe, ReplicateMode replicateMode) {
this.rpc = rpc;
this.pipe = pipe;
}

public void setReplicateMode(ReplicateMode replicateMode) {
this.replicateMode = replicateMode;
}

Expand All @@ -62,6 +59,7 @@ public static Builder newBuilder() {
public static class Builder {
private RPC rpc;
private Pipe pipe;
private ReplicateMode replicateMode;

public Builder setPipe(Pipe pipe) {
this.pipe = pipe;
Expand All @@ -73,8 +71,13 @@ public Builder setRPC(RPC rpc) {
return this;
}

public Builder setReplicateMode(ReplicateMode replicateMode) {
this.replicateMode = replicateMode;
return this;
}

public PipeConsensusConfig build() {
return new PipeConsensusConfig(rpc, pipe);
return new PipeConsensusConfig(rpc, pipe, replicateMode);
}
}

Expand Down Expand Up @@ -354,5 +357,15 @@ public enum ReplicateMode {
public String getValue() {
return value;
}

public static ReplicateMode fromValue(String value) {
if (value.equalsIgnoreCase(STREAM.getValue())) {
return STREAM;
} else if (value.equalsIgnoreCase(BATCH.getValue())) {
return BATCH;
}
// return batch by default
return BATCH;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1126,8 +1126,10 @@ public class IoTDBConfig {
private double maxMemoryRatioForQueue = 0.6;
private long regionMigrationSpeedLimitBytesPerSecond = 32 * 1024 * 1024L;

// PipeConsensus Config
private int pipeConsensusPipelineSize = 5;
// IoTConsensusV2 Config
private int iotConsensusV2PipelineSize = 5;
private String iotConsensusV2Mode = ConsensusFactory.IOT_CONSENSUS_V2_BATCH_MODE;
private String[] iotConsensusV2ReceiverFileDirs = new String[0];

/** Load related */
private double maxAllocateMemoryRatioForLoad = 0.8;
Expand Down Expand Up @@ -1176,8 +1178,6 @@ public class IoTDBConfig {
/** initialized as empty, updated based on the latest `systemDir` during querying */
private String[] pipeReceiverFileDirs = new String[0];

private String[] pipeConsensusReceiverFileDirs = new String[0];

/** Resource control */
private boolean quotaEnable = false;

Expand Down Expand Up @@ -1221,12 +1221,12 @@ public void setRegionMigrationSpeedLimitBytesPerSecond(
this.regionMigrationSpeedLimitBytesPerSecond = regionMigrationSpeedLimitBytesPerSecond;
}

public int getPipeConsensusPipelineSize() {
return pipeConsensusPipelineSize;
public int getIotConsensusV2PipelineSize() {
return iotConsensusV2PipelineSize;
}

public void setPipeConsensusPipelineSize(int pipeConsensusPipelineSize) {
this.pipeConsensusPipelineSize = pipeConsensusPipelineSize;
public void setIotConsensusV2PipelineSize(int iotConsensusV2PipelineSize) {
this.iotConsensusV2PipelineSize = iotConsensusV2PipelineSize;
}

public void setMaxSizePerBatch(int maxSizePerBatch) {
Expand Down Expand Up @@ -1351,8 +1351,8 @@ private void formulateFolders() {
for (int i = 0; i < pipeReceiverFileDirs.length; i++) {
pipeReceiverFileDirs[i] = addDataHomeDir(pipeReceiverFileDirs[i]);
}
for (int i = 0; i < pipeConsensusReceiverFileDirs.length; i++) {
pipeConsensusReceiverFileDirs[i] = addDataHomeDir(pipeConsensusReceiverFileDirs[i]);
for (int i = 0; i < iotConsensusV2ReceiverFileDirs.length; i++) {
iotConsensusV2ReceiverFileDirs[i] = addDataHomeDir(iotConsensusV2ReceiverFileDirs[i]);
}
mqttDir = addDataHomeDir(mqttDir);
extPipeDir = addDataHomeDir(extPipeDir);
Expand Down Expand Up @@ -3188,6 +3188,14 @@ public void setDataRegionConsensusProtocolClass(String dataRegionConsensusProtoc
this.dataRegionConsensusProtocolClass = dataRegionConsensusProtocolClass;
}

public String getIotConsensusV2Mode() {
return iotConsensusV2Mode;
}

public void setIotConsensusV2Mode(String iotConsensusV2Mode) {
this.iotConsensusV2Mode = iotConsensusV2Mode;
}

public String getSchemaRegionConsensusProtocolClass() {
return schemaRegionConsensusProtocolClass;
}
Expand Down Expand Up @@ -4064,13 +4072,13 @@ public String[] getPipeReceiverFileDirs() {
: this.pipeReceiverFileDirs;
}

public void setPipeConsensusReceiverFileDirs(String[] pipeConsensusReceiverFileDirs) {
this.pipeConsensusReceiverFileDirs = pipeConsensusReceiverFileDirs;
public void setIotConsensusV2ReceiverFileDirs(String[] iotConsensusV2ReceiverFileDirs) {
this.iotConsensusV2ReceiverFileDirs = iotConsensusV2ReceiverFileDirs;
}

public String[] getPipeConsensusReceiverFileDirs() {
return (Objects.isNull(this.pipeConsensusReceiverFileDirs)
|| this.pipeConsensusReceiverFileDirs.length == 0)
public String[] getIotConsensusV2ReceiverFileDirs() {
return (Objects.isNull(this.iotConsensusV2ReceiverFileDirs)
|| this.iotConsensusV2ReceiverFileDirs.length == 0)
? new String[] {
systemDir
+ File.separator
Expand All @@ -4080,7 +4088,7 @@ public String[] getPipeConsensusReceiverFileDirs() {
+ File.separator
+ "receiver"
}
: this.pipeConsensusReceiverFileDirs;
: this.iotConsensusV2ReceiverFileDirs;
}

public boolean isQuotaEnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1074,12 +1074,12 @@ public void loadProperties(Properties properties) throws BadNodeUrlException, IO
"datanode_schema_cache_eviction_policy", conf.getDataNodeSchemaCacheEvictionPolicy()));

loadIoTConsensusProps(properties);
loadPipeConsensusProps(properties);
loadIoTConsensusV2Props(properties);
}

private void reloadConsensusProps(Properties properties) throws IOException {
loadIoTConsensusProps(properties);
loadPipeConsensusProps(properties);
loadIoTConsensusV2Props(properties);
DataRegionConsensusImpl.reloadConsensusConfig();
}

Expand Down Expand Up @@ -1126,16 +1126,20 @@ private void loadIoTConsensusProps(Properties properties) throws IOException {
.trim()));
}

private void loadPipeConsensusProps(Properties properties) throws IOException {
conf.setPipeConsensusPipelineSize(
private void loadIoTConsensusV2Props(Properties properties) throws IOException {
conf.setIotConsensusV2PipelineSize(
Integer.parseInt(
properties.getProperty(
"fast_iot_consensus_pipeline_size",
"iot_consensus_v2_pipeline_size",
ConfigurationFileUtils.getConfigurationDefaultValue(
"fast_iot_consensus_pipeline_size"))));
if (conf.getPipeConsensusPipelineSize() <= 0) {
conf.setPipeConsensusPipelineSize(5);
"iot_consensus_v2_pipeline_size"))));
if (conf.getIotConsensusV2PipelineSize() <= 0) {
conf.setIotConsensusV2PipelineSize(5);
}
conf.setIotConsensusV2Mode(
Pengzna marked this conversation as resolved.
Show resolved Hide resolved
properties.getProperty(
"iot_consensus_v2_mode",
ConfigurationFileUtils.getConfigurationDefaultValue("iot_consensus_v2_mode")));
}

private void loadAuthorCache(Properties properties) {
Expand Down Expand Up @@ -2446,12 +2450,12 @@ private void loadPipeProps(Properties properties) {
.filter(dir -> !dir.isEmpty())
.toArray(String[]::new));

conf.setPipeConsensusReceiverFileDirs(
conf.setIotConsensusV2ReceiverFileDirs(
Arrays.stream(
properties
.getProperty(
"pipe_consensus_receiver_file_dirs",
String.join(",", conf.getPipeConsensusReceiverFileDirs()))
"iot_consensus_v2_receiver_file_dirs",
String.join(",", conf.getIotConsensusV2ReceiverFileDirs()))
.trim()
.split(","))
.filter(dir -> !dir.isEmpty())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.config.IoTConsensusConfig.RPC;
import org.apache.iotdb.consensus.config.PipeConsensusConfig;
import org.apache.iotdb.consensus.config.PipeConsensusConfig.ReplicateMode;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.config.RatisConfig.Snapshot;
import org.apache.iotdb.db.conf.IoTDBConfig;
Expand Down Expand Up @@ -168,8 +169,9 @@ private static ConsensusConfig buildConsensusConfig() {
() -> PipeDataNodeAgent.task().getAllConsensusPipe())
.setConsensusPipeReceiver(PipeDataNodeAgent.receiver().pipeConsensus())
.setProgressIndexManager(new ProgressIndexDataNodeManager())
.setConsensusPipeGuardJobIntervalInSeconds(300) // TODO: move to config
.setConsensusPipeGuardJobIntervalInSeconds(300)
.build())
.setReplicateMode(ReplicateMode.fromValue(CONF.getIotConsensusV2Mode()))
.build())
.setRatisConfig(
RatisConfig.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void cleanPipeReceiverDirs() {
.forEach(IoTDBReceiverAgent::cleanPipeReceiverDir);
// consensus
String[] pipeConsensusReceiverFileDirs =
IoTDBDescriptor.getInstance().getConfig().getPipeConsensusReceiverFileDirs();
IoTDBDescriptor.getInstance().getConfig().getIotConsensusV2ReceiverFileDirs();
Arrays.stream(pipeConsensusReceiverFileDirs)
.map(File::new)
.forEach(IoTDBReceiverAgent::cleanPipeReceiverDir);
Expand Down
Loading
Loading