diff --git a/example/session/src/main/java/org/apache/iotdb/SessionExample.java b/example/session/src/main/java/org/apache/iotdb/SessionExample.java index 35423b06f7c9..c010a49ad280 100644 --- a/example/session/src/main/java/org/apache/iotdb/SessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/SessionExample.java @@ -644,4 +644,14 @@ private static void setTimeout() throws StatementExecutionException { Session tempSession = new Session(LOCAL_HOST, 6667, "root", "root", 10000, 20000); tempSession.setTimeout(60000); } + + private static void createClusterSession() throws IoTDBConnectionException { + ArrayList nodeList = new ArrayList<>(); + nodeList.add("127.0.0.1:6669"); + nodeList.add("127.0.0.1:6667"); + nodeList.add("127.0.0.1:6668"); + Session clusterSession = new Session(nodeList, "root", "root"); + clusterSession.open(); + clusterSession.close(); + } } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index 447c2b5b946e..aac0a5f620dd 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -75,6 +75,7 @@ public class Session { public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:"; public static final String MSG_DONOT_ENABLE_REDIRECT = "Query do not enable redirect," + " please confirm the session and server conf."; + protected List nodeUrls; protected String username; protected String password; protected int fetchSize; @@ -240,6 +241,66 @@ public Session( this.enableCacheLeader = enableCacheLeader; } + public Session(List nodeUrls, String username, String password) { + this( + nodeUrls, + username, + password, + Config.DEFAULT_FETCH_SIZE, + null, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, + Config.DEFAULT_MAX_FRAME_SIZE, + Config.DEFAULT_CACHE_LEADER_MODE); + } + + /** + * Multiple nodeUrl,If one node down, connect to the next one + * + * @param nodeUrls List Multiple ip:rpcPort eg.127.0.0.1:9001 + */ + public Session(List nodeUrls, String username, String password, int fetchSize) { + this( + nodeUrls, + username, + password, + fetchSize, + null, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, + Config.DEFAULT_MAX_FRAME_SIZE, + Config.DEFAULT_CACHE_LEADER_MODE); + } + + public Session(List nodeUrls, String username, String password, ZoneId zoneId) { + this( + nodeUrls, + username, + password, + Config.DEFAULT_FETCH_SIZE, + zoneId, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, + Config.DEFAULT_MAX_FRAME_SIZE, + Config.DEFAULT_CACHE_LEADER_MODE); + } + + public Session( + List nodeUrls, + String username, + String password, + int fetchSize, + ZoneId zoneId, + int thriftDefaultBufferSize, + int thriftMaxFrameSize, + boolean enableCacheLeader) { + this.nodeUrls = nodeUrls; + this.username = username; + this.password = password; + this.fetchSize = fetchSize; + this.zoneId = zoneId; + this.thriftDefaultBufferSize = thriftDefaultBufferSize; + this.thriftMaxFrameSize = thriftMaxFrameSize; + this.enableCacheLeader = enableCacheLeader; + } + public void setFetchSize(int fetchSize) { this.fetchSize = fetchSize; } @@ -294,6 +355,9 @@ public synchronized void close() throws IoTDBConnectionException { public SessionConnection constructSessionConnection( Session session, EndPoint endpoint, ZoneId zoneId) throws IoTDBConnectionException { + if (endpoint == null) { + return new SessionConnection(session, zoneId); + } return new SessionConnection(session, endpoint, zoneId); } diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 387902b38dab..79b99677c98b 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -60,7 +60,9 @@ import org.slf4j.LoggerFactory; import java.time.ZoneId; +import java.util.ArrayList; import java.util.List; +import java.util.Random; public class SessionConnection { @@ -74,6 +76,7 @@ public class SessionConnection { private long statementId; private ZoneId zoneId; private EndPoint endPoint; + private List endPointList = new ArrayList<>(); private boolean enableRedirect = false; // TestOnly @@ -83,10 +86,18 @@ public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId) throws IoTDBConnectionException { this.session = session; this.endPoint = endPoint; + endPointList.add(endPoint); this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId; init(endPoint); } + public SessionConnection(Session session, ZoneId zoneId) throws IoTDBConnectionException { + this.session = session; + this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId; + this.endPointList = SessionUtils.parseSeedNodeUrls(session.nodeUrls); + initClusterConn(); + } + private void init(EndPoint endPoint) throws IoTDBConnectionException { RpcTransportFactory.setDefaultBufferCapacity(session.thriftDefaultBufferSize); RpcTransportFactory.setThriftMaxFrameSize(session.thriftMaxFrameSize); @@ -145,6 +156,21 @@ private void init(EndPoint endPoint) throws IoTDBConnectionException { } } + private void initClusterConn() throws IoTDBConnectionException { + for (EndPoint endPoint : endPointList) { + try { + session.defaultEndPoint = endPoint; + init(endPoint); + } catch (IoTDBConnectionException e) { + if (!reconnect()) { + logger.error("Cluster has no nodes to connect"); + throw new IoTDBConnectionException(e); + } + } + break; + } + } + public void close() throws IoTDBConnectionException { TSCloseSessionReq req = new TSCloseSessionReq(sessionId); try { @@ -720,24 +746,38 @@ protected void testInsertTablets(TSInsertTabletsReq request) } private boolean reconnect() { - boolean flag = false; + boolean connectedSuccess = false; + Random random = new Random(); for (int i = 1; i <= Config.RETRY_NUM; i++) { - try { - if (transport != null) { - close(); - init(endPoint); - flag = true; - } - } catch (Exception e) { - try { - Thread.sleep(Config.RETRY_INTERVAL_MS); - } catch (InterruptedException e1) { - logger.error("reconnect is interrupted.", e1); - Thread.currentThread().interrupt(); + if (transport != null) { + transport.close(); + int currHostIndex = random.nextInt(endPointList.size()); + int tryHostNum = 0; + for (int j = currHostIndex; j < endPointList.size(); j++) { + if (tryHostNum == endPointList.size()) { + break; + } + session.defaultEndPoint = endPointList.get(j); + this.endPoint = endPointList.get(j); + if (j == endPointList.size() - 1) { + j = -1; + } + tryHostNum++; + try { + init(endPoint); + connectedSuccess = true; + } catch (IoTDBConnectionException e) { + logger.error("The current node may have been down {},try next node", endPoint); + continue; + } + break; } } + if (connectedSuccess) { + break; + } } - return flag; + return connectedSuccess; } protected void createSchemaTemplate(TSCreateSchemaTemplateReq request) diff --git a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java b/session/src/main/java/org/apache/iotdb/session/SessionUtils.java index f0289903ba1e..f50ad52e465d 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionUtils.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionUtils.java @@ -18,6 +18,7 @@ */ package org.apache.iotdb.session; +import org.apache.iotdb.service.rpc.thrift.EndPoint; import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.utils.Binary; @@ -27,10 +28,17 @@ import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; public class SessionUtils { + private static final Logger logger = LoggerFactory.getLogger(SessionUtils.class); + public static ByteBuffer getTimeBuffer(Tablet tablet) { ByteBuffer timeBuffer = ByteBuffer.allocate(tablet.getTimeBytesSize()); for (int i = 0; i < tablet.rowSize; i++) { @@ -149,4 +157,31 @@ private static void getValueBufferOfDataType( String.format("Data type %s is not supported.", dataType)); } } + + public static List parseSeedNodeUrls(List nodeUrls) { + if (nodeUrls == null) { + throw new NumberFormatException("nodeUrls is null"); + } + List endPointsList = new ArrayList<>(); + for (String nodeUrl : nodeUrls) { + EndPoint endPoint = parseNodeUrl(nodeUrl); + endPointsList.add(endPoint); + } + return endPointsList; + } + + private static EndPoint parseNodeUrl(String nodeUrl) { + EndPoint endPoint = new EndPoint(); + String[] split = nodeUrl.split(":"); + if (split.length != 2) { + throw new NumberFormatException("NodeUrl Incorrect format"); + } + String ip = split[0]; + try { + int rpcPort = Integer.parseInt(split[1]); + return endPoint.setIp(ip).setPort(rpcPort); + } catch (Exception e) { + throw new NumberFormatException("NodeUrl Incorrect format"); + } + } } diff --git a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java index 6b1234c98eba..760392176bcc 100644 --- a/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java +++ b/session/src/test/java/org/apache/iotdb/session/IoTDBSessionComplexIT.java @@ -889,4 +889,39 @@ public void testInsertTabletWithTriggers() session.close(); } + + @Test + public void testSessionCluster() throws IoTDBConnectionException, StatementExecutionException { + ArrayList nodeList = new ArrayList<>(); + nodeList.add("127.0.0.1:6669"); + nodeList.add("127.0.0.1:6667"); + nodeList.add("127.0.0.1:6668"); + session = new Session(nodeList, "root", "root"); + session.open(); + + session.setStorageGroup("root.sg1"); + + createTimeseries(); + insertByStr(); + + insertViaSQL(); + queryByDevice("root.sg1.d1"); + + session.close(); + } + + @Test + public void testErrorSessionCluster() throws IoTDBConnectionException { + ArrayList nodeList = new ArrayList<>(); + // test Format error + nodeList.add("127.0.0.16669"); + nodeList.add("127.0.0.1:6667"); + session = new Session(nodeList, "root", "root"); + try { + session.open(); + } catch (Exception e) { + Assert.assertEquals("NodeUrl Incorrect format", e.getMessage()); + } + session.close(); + } } diff --git a/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterSessionSimpleIT.java b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterSessionSimpleIT.java new file mode 100644 index 000000000000..6f9189fd5edf --- /dev/null +++ b/testcontainer/src/test/java/org/apache/iotdb/db/sql/ClusterSessionSimpleIT.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.sql; + +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.session.Session; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; + +import org.junit.Rule; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.DockerComposeContainer; +import org.testcontainers.containers.NoProjectNameDockerComposeContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +public class ClusterSessionSimpleIT { + + private static Logger node1Logger = LoggerFactory.getLogger("iotdb-server_1"); + private static Logger node2Logger = LoggerFactory.getLogger("iotdb-server_2"); + private static Logger node3Logger = LoggerFactory.getLogger("iotdb-server_3"); + + private Session session; + + @Rule + public DockerComposeContainer environment = + new NoProjectNameDockerComposeContainer( + "3nodes", new File("src/test/resources/3nodes/docker-compose.yaml")) + .withExposedService("iotdb-server_1", 6667, Wait.forListeningPort()) + .withLogConsumer("iotdb-server_1", new Slf4jLogConsumer(node1Logger)) + .withExposedService("iotdb-server_2", 6667, Wait.forListeningPort()) + .withLogConsumer("iotdb-server_2", new Slf4jLogConsumer(node2Logger)) + .withExposedService("iotdb-server_3", 6667, Wait.forListeningPort()) + .withLogConsumer("iotdb-server_3", new Slf4jLogConsumer(node3Logger)) + .withLocalCompose(true); + + protected DockerComposeContainer getContainer() { + return environment; + } + + @Test + public void testSessionCluster() throws IoTDBConnectionException, StatementExecutionException { + List stringList = new ArrayList<>(); + Integer service1Port = getContainer().getServicePort("iotdb-server_1", 6667); + Integer service2Port = getContainer().getServicePort("iotdb-server_2", 6667); + Integer service3Port = getContainer().getServicePort("iotdb-server_3", 6667); + stringList.add("localhost:" + service1Port); + stringList.add("localhost:" + service2Port); + stringList.add("localhost:" + service3Port); + session = new Session(stringList, "root", "root"); + session.open(); + session.setStorageGroup("root.sg1"); + session.createTimeseries( + "root.sg1.d1.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + + session.createTimeseries( + "root.sg1.d2.s1", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY); + session.close(); + } +}