diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java index 75997e483e26..1f2eca0e5e89 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeRuntimeAgent.java @@ -32,6 +32,9 @@ import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.progress.assigner.SimpleConsensusProgressIndexAssigner; import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner; +import org.apache.iotdb.db.protocol.client.ConfigNodeClient; +import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager; +import org.apache.iotdb.db.protocol.client.ConfigNodeInfo; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.service.ResourcesInformationHolder; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -41,6 +44,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; public class PipeRuntimeAgent implements IService { @@ -48,13 +52,14 @@ public class PipeRuntimeAgent implements IService { private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId(); private final AtomicBoolean isShutdown = new AtomicBoolean(false); - - private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor = - new PipePeriodicalJobExecutor(); + private final AtomicReference clusterId = new AtomicReference<>(null); private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner = new SimpleConsensusProgressIndexAssigner(); + private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor = + new PipePeriodicalJobExecutor(); + //////////////////////////// System Service Interface //////////////////////////// public synchronized void preparePipeResources( @@ -103,6 +108,22 @@ public ServiceType getID() { return ServiceType.PIPE_RUNTIME_AGENT; } + public String getClusterIdIfPossible() { + if (clusterId.get() == null) { + synchronized (clusterId) { + if (clusterId.get() == null) { + try (final ConfigNodeClient configNodeClient = + ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) { + clusterId.set(configNodeClient.getClusterId().getClusterId()); + } catch (Exception e) { + LOGGER.warn("Unable to get clusterId, because: {}", e.getMessage(), e); + } + } + } + } + return clusterId.get(); + } + ////////////////////// SimpleConsensus ProgressIndex Assigner ////////////////////// public void assignSimpleProgressIndexIfNeeded(InsertNode insertNode) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java new file mode 100644 index 000000000000..3d2bf1cbb0f4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/common/PipeTransferHandshakeConstant.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.payload.evolvable.common; + +public class PipeTransferHandshakeConstant { + + public static final String HANDSHAKE_KEY_TIME_PRECISION = "timestampPrecision"; + public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID"; + + private PipeTransferHandshakeConstant() { + // Utility class + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java similarity index 83% rename from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java rename to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java index 80f9d1217eb5..c829f5310289 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeReq.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV1Req.java @@ -30,11 +30,11 @@ import java.nio.ByteBuffer; import java.util.Objects; -public class PipeTransferHandshakeReq extends TPipeTransferReq { +public class PipeTransferHandshakeV1Req extends TPipeTransferReq { private transient String timestampPrecision; - private PipeTransferHandshakeReq() { + private PipeTransferHandshakeV1Req() { // Empty constructor } @@ -44,14 +44,14 @@ public String getTimestampPrecision() { /////////////////////////////// Thrift /////////////////////////////// - public static PipeTransferHandshakeReq toTPipeTransferReq(String timestampPrecision) + public static PipeTransferHandshakeV1Req toTPipeTransferReq(String timestampPrecision) throws IOException { - final PipeTransferHandshakeReq handshakeReq = new PipeTransferHandshakeReq(); + final PipeTransferHandshakeV1Req handshakeReq = new PipeTransferHandshakeV1Req(); handshakeReq.timestampPrecision = timestampPrecision; handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion(); - handshakeReq.type = PipeRequestType.HANDSHAKE.getType(); + handshakeReq.type = PipeRequestType.HANDSHAKE_V1.getType(); try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { ReadWriteIOUtils.write(timestampPrecision, outputStream); @@ -62,8 +62,8 @@ public static PipeTransferHandshakeReq toTPipeTransferReq(String timestampPrecis return handshakeReq; } - public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq transferReq) { - final PipeTransferHandshakeReq handshakeReq = new PipeTransferHandshakeReq(); + public static PipeTransferHandshakeV1Req fromTPipeTransferReq(TPipeTransferReq transferReq) { + final PipeTransferHandshakeV1Req handshakeReq = new PipeTransferHandshakeV1Req(); handshakeReq.timestampPrecision = ReadWriteIOUtils.readString(transferReq.body); @@ -80,7 +80,7 @@ public static byte[] toTransferHandshakeBytes(String timestampPrecision) throws try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream); - ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(), outputStream); + ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V1.getType(), outputStream); ReadWriteIOUtils.write(timestampPrecision, outputStream); return byteArrayOutputStream.toByteArray(); } @@ -96,7 +96,7 @@ public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) { return false; } - PipeTransferHandshakeReq that = (PipeTransferHandshakeReq) obj; + PipeTransferHandshakeV1Req that = (PipeTransferHandshakeV1Req) obj; return timestampPrecision.equals(that.timestampPrecision) && version == that.version && type == that.type diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java new file mode 100644 index 000000000000..f2c8a9233a83 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferHandshakeV2Req.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.payload.evolvable.request; + +import org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion; +import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType; +import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq; +import org.apache.iotdb.tsfile.utils.PublicBAOS; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; + +import org.apache.thrift.TException; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +public class PipeTransferHandshakeV2Req extends TPipeTransferReq { + + private transient Map params; + + private PipeTransferHandshakeV2Req() { + // Empty constructor + } + + public Map getParams() { + return params; + } + + /////////////////////////////// Thrift /////////////////////////////// + + public static PipeTransferHandshakeV2Req toTPipeTransferReq(Map params) + throws TException, IOException { + final PipeTransferHandshakeV2Req handshakeReq = new PipeTransferHandshakeV2Req(); + + handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion(); + handshakeReq.type = PipeRequestType.HANDSHAKE_V2.getType(); + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(params.size(), outputStream); + for (final Map.Entry entry : params.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + handshakeReq.body = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + } + + handshakeReq.params = params; + + return handshakeReq; + } + + public static PipeTransferHandshakeV2Req fromTPipeTransferReq(TPipeTransferReq transferReq) { + final PipeTransferHandshakeV2Req handshakeReq = new PipeTransferHandshakeV2Req(); + + Map params = new HashMap<>(); + final int size = ReadWriteIOUtils.readInt(transferReq.body); + for (int i = 0; i < size; ++i) { + final String key = ReadWriteIOUtils.readString(transferReq.body); + final String value = ReadWriteIOUtils.readString(transferReq.body); + params.put(key, value); + } + handshakeReq.params = params; + + handshakeReq.version = transferReq.version; + handshakeReq.type = transferReq.type; + handshakeReq.body = transferReq.body; + + return handshakeReq; + } + + /////////////////////////////// Air Gap /////////////////////////////// + + public static byte[] toTransferHandshakeBytes(HashMap params) throws IOException { + try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { + ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream); + ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V2.getType(), outputStream); + ReadWriteIOUtils.write(params.size(), outputStream); + for (final Map.Entry entry : params.entrySet()) { + ReadWriteIOUtils.write(entry.getKey(), outputStream); + ReadWriteIOUtils.write(entry.getValue(), outputStream); + } + return byteArrayOutputStream.toByteArray(); + } + } + + /////////////////////////////// Object /////////////////////////////// + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + PipeTransferHandshakeV2Req that = (PipeTransferHandshakeV2Req) obj; + return Objects.equals(params, that.params) + && version == that.version + && type == that.type + && Objects.equals(body, that.body); + } + + @Override + public int hashCode() { + return Objects.hash(params, version, type, body); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java index b9ffaa94a261..854f527222c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBAirGapConnector.java @@ -26,11 +26,14 @@ import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant; import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; @@ -63,6 +66,7 @@ import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Objects; import java.util.Set; @@ -194,11 +198,23 @@ public void handshake() throws Exception { continue; } - if (!send( - socket, - PipeTransferHandshakeReq.toTransferHandshakeBytes( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()))) { - throw new PipeException("Handshake error with target server ip: " + ip + ", port: " + port); + final HashMap params = new HashMap<>(); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, + PipeAgent.runtime().getClusterIdIfPossible()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + + // Try to handshake by PipeTransferHandshakeV2Req. If failed, retry to handshake by + // PipeTransferHandshakeV1Req. If failed again, throw PipeConnectionException. + if (!send(socket, PipeTransferHandshakeV2Req.toTransferHandshakeBytes(params)) + && !send( + socket, + PipeTransferHandshakeV1Req.toTransferHandshakeBytes( + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()))) { + throw new PipeConnectionException( + "Handshake error with target server ip: " + ip + ", port: " + port); } else { isSocketAlive.set(i, true); socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTransferTimeoutMs()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java index 477165d2bd87..b70794667dd3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncClientManager.java @@ -24,7 +24,10 @@ import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.async.AsyncPipeDataTransferServiceClient; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq; +import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req; import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.pipe.api.exception.PipeException; @@ -35,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -122,14 +126,14 @@ private boolean handshakeIfNecessary( } final AtomicBoolean isHandshakeFinished = new AtomicBoolean(false); + final AtomicReference resp = new AtomicReference<>(); final AtomicReference exception = new AtomicReference<>(); - - client.pipeTransfer( - PipeTransferHandshakeReq.toTPipeTransferReq( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), + final AsyncMethodCallback callback = new AsyncMethodCallback() { @Override public void onComplete(TPipeTransferResp response) { + resp.set(response); + if (response.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn( "Handshake error with receiver {}:{}, code: {}, message: {}.", @@ -167,8 +171,47 @@ public void onError(Exception e) { isHandshakeFinished.set(true); } - }); + }; + + // Try to handshake by PipeTransferHandshakeV2Req. + final HashMap params = new HashMap<>(); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, + PipeAgent.runtime().getClusterIdIfPossible()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + client.pipeTransfer(PipeTransferHandshakeV2Req.toTPipeTransferReq(params), callback); + waitHandshakeFinished(isHandshakeFinished); + + // Retry to handshake by PipeTransferHandshakeV1Req. + if (resp.get() != null + && resp.get().getStatus().getCode() == TSStatusCode.PIPE_VERSION_ERROR.getStatusCode()) { + LOGGER.info( + "Handshake error by PipeTransferHandshakeV2Req with receiver {}:{} " + + "retry to handshake by PipeTransferHandshakeV1Req.", + targetNodeUrl.getIp(), + targetNodeUrl.getPort()); + + isHandshakeFinished.set(false); + resp.set(null); + exception.set(null); + + client.pipeTransfer( + PipeTransferHandshakeV1Req.toTPipeTransferReq( + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), + callback); + waitHandshakeFinished(isHandshakeFinished); + } + + if (exception.get() != null) { + throw new PipeConnectionException("Failed to handshake.", exception.get()); + } + + return false; + } + private void waitHandshakeFinished(AtomicBoolean isHandshakeFinished) { try { while (!isHandshakeFinished.get()) { Thread.sleep(10); @@ -177,12 +220,6 @@ public void onError(Exception e) { Thread.currentThread().interrupt(); throw new PipeException("Interrupted while waiting for handshake response.", e); } - - if (exception.get() != null) { - throw new PipeConnectionException("Failed to handshake.", exception.get()); - } - - return false; } public void updateLeaderCache(String deviceId, TEndPoint endPoint) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java index f3f832377680..6cc5c5ac6831 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBThriftSyncClientManager.java @@ -24,20 +24,22 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.client.IoTDBThriftSyncConnectorClient; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq; +import org.apache.iotdb.db.pipe.agent.PipeAgent; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req; import org.apache.iotdb.db.pipe.connector.protocol.thrift.IoTDBThriftClientManager; import org.apache.iotdb.pipe.api.exception.PipeConnectionException; import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp; import org.apache.iotdb.tsfile.utils.Pair; -import org.apache.thrift.TException; -import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -95,7 +97,7 @@ public void checkClientStatusAndTryReconstructIfNecessary() throws IOException { "All target servers %s are not available.", endPoint2ClientAndStatus.keySet())); } - private void reconstructClient(TEndPoint endPoint) throws IOException { + private void reconstructClient(TEndPoint endPoint) { final Pair clientAndStatus = endPoint2ClientAndStatus.get(endPoint); @@ -111,6 +113,12 @@ private void reconstructClient(TEndPoint endPoint) throws IOException { } } + initClientAndStatus(clientAndStatus, endPoint); + sendHandshakeReq(clientAndStatus, endPoint); + } + + private void initClientAndStatus( + Pair clientAndStatus, TEndPoint endPoint) { try { clientAndStatus.setLeft( new IoTDBThriftSyncConnectorClient( @@ -124,7 +132,7 @@ private void reconstructClient(TEndPoint endPoint) throws IOException { useSSL, trustStorePath, trustStorePwd)); - } catch (TTransportException e) { + } catch (Exception e) { throw new PipeConnectionException( String.format( PipeConnectionException.CONNECTION_ERROR_FORMATTER, @@ -132,14 +140,41 @@ private void reconstructClient(TEndPoint endPoint) throws IOException { endPoint.getPort()), e); } + } + public void sendHandshakeReq( + Pair clientAndStatus, TEndPoint endPoint) { try { - final TPipeTransferResp resp = + final HashMap params = new HashMap<>(); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, + CommonDescriptor.getInstance().getConfig().getTimestampPrecision()); + params.put( + PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, + PipeAgent.runtime().getClusterIdIfPossible()); + + // Try to handshake by PipeTransferHandshakeV2Req. + TPipeTransferResp resp = clientAndStatus .getLeft() - .pipeTransfer( - PipeTransferHandshakeReq.toTPipeTransferReq( - CommonDescriptor.getInstance().getConfig().getTimestampPrecision())); + .pipeTransfer(PipeTransferHandshakeV2Req.toTPipeTransferReq(params)); + // Receiver may be an old version, so we need to retry to handshake by + // PipeTransferHandshakeV1Req. + if (resp.getStatus().getCode() == TSStatusCode.PIPE_TYPE_ERROR.getStatusCode()) { + LOGGER.info( + "Handshake error with target server ip: {}, port: {}, because: {}. " + + "Retry to handshake by PipeTransferHandshakeV1Req.", + endPoint.getIp(), + endPoint.getPort(), + resp.getStatus()); + resp = + clientAndStatus + .getLeft() + .pipeTransfer( + PipeTransferHandshakeV1Req.toTPipeTransferReq( + CommonDescriptor.getInstance().getConfig().getTimestampPrecision())); + } + if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { LOGGER.warn( "Handshake error with target server ip: {}, port: {}, because: {}.", @@ -156,7 +191,7 @@ private void reconstructClient(TEndPoint endPoint) throws IOException { endPoint.getIp(), endPoint.getPort()); } - } catch (TException e) { + } catch (Exception e) { LOGGER.warn( "Handshake error with target server ip: {}, port: {}, because: {}.", endPoint.getIp(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java index 67d2cbad1e85..da6f888bc571 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/thrift/IoTDBThriftReceiverV1.java @@ -31,11 +31,14 @@ import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.pipe.agent.PipeAgent; import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant; import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq; @@ -113,8 +116,10 @@ public synchronized TPipeTransferResp receive( final short rawRequestType = req.getType(); if (PipeRequestType.isValidatedRequestType(rawRequestType)) { switch (PipeRequestType.valueOf(rawRequestType)) { - case HANDSHAKE: - return handleTransferHandshake(PipeTransferHandshakeReq.fromTPipeTransferReq(req)); + case HANDSHAKE_V1: + return handleTransferHandshakeV1(PipeTransferHandshakeV1Req.fromTPipeTransferReq(req)); + case HANDSHAKE_V2: + return handleTransferHandshakeV2(PipeTransferHandshakeV2Req.fromTPipeTransferReq(req)); case TRANSFER_TABLET_INSERT_NODE: return handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReq.fromTPipeTransferReq(req), @@ -170,7 +175,7 @@ public synchronized TPipeTransferResp receive( } } - private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq req) { + private TPipeTransferResp handleTransferHandshakeV1(PipeTransferHandshakeV1Req req) { if (!CommonDescriptor.getInstance() .getConfig() .getTimestampPrecision() @@ -246,6 +251,59 @@ private TPipeTransferResp handleTransferHandshake(PipeTransferHandshakeReq req) return new TPipeTransferResp(RpcUtils.SUCCESS_STATUS); } + private TPipeTransferResp handleTransferHandshakeV2(PipeTransferHandshakeV2Req req) + throws IOException { + // Reject to handshake if the receiver can not take clusterId from config node. + final String clusterIdFromConfigNode = PipeAgent.runtime().getClusterIdIfPossible(); + if (clusterIdFromConfigNode == null) { + final TSStatus status = + RpcUtils.getStatus( + TSStatusCode.PIPE_HANDSHAKE_ERROR, + "Receiver can not get clusterId from config node."); + LOGGER.warn("Handshake failed, response status = {}.", status); + return new TPipeTransferResp(status); + } + + // Reject to handshake if the request does not contain sender's clusterId. + final String clusterIdFromHandshakeRequest = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID); + if (clusterIdFromHandshakeRequest == null) { + final TSStatus status = + RpcUtils.getStatus( + TSStatusCode.PIPE_HANDSHAKE_ERROR, "Handshake request does not contain clusterId."); + LOGGER.warn("Handshake failed, response status = {}.", status); + return new TPipeTransferResp(status); + } + + // Reject to handshake if the receiver and sender are from the same cluster. + if (Objects.equals(clusterIdFromConfigNode, clusterIdFromHandshakeRequest)) { + final TSStatus status = + RpcUtils.getStatus( + TSStatusCode.PIPE_HANDSHAKE_ERROR, + String.format( + "Receiver and sender are from the same cluster %s.", + clusterIdFromHandshakeRequest)); + LOGGER.warn("Handshake failed, response status = {}.", status); + return new TPipeTransferResp(status); + } + + // Reject to handshake if the request does not contain timestampPrecision. + final String timestampPrecision = + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION); + if (timestampPrecision == null) { + final TSStatus status = + RpcUtils.getStatus( + TSStatusCode.PIPE_HANDSHAKE_ERROR, + "Handshake request does not contain timestampPrecision."); + LOGGER.warn("Handshake failed, response status = {}.", status); + return new TPipeTransferResp(status); + } + + // Handle the handshake request as a v1 request. + return handleTransferHandshakeV1( + PipeTransferHandshakeV1Req.toTPipeTransferReq(timestampPrecision)); + } + private TPipeTransferResp handleTransferTabletInsertNode( PipeTransferTabletInsertNodeReq req, IPartitionFetcher partitionFetcher, diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java index 7fd72c51e206..0aba9930704e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeDataNodeThriftRequestTest.java @@ -20,13 +20,18 @@ package org.apache.iotdb.db.pipe.connector; import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion; +import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType; import org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotPieceReq; import org.apache.iotdb.commons.pipe.connector.payload.request.PipeTransferSnapshotSealReq; import org.apache.iotdb.commons.pipe.connector.payload.response.PipeTransferSnapshotPieceResp; +import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapPseudoTPipeTransferRequest; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant; import org.apache.iotdb.db.pipe.connector.payload.evolvable.reponse.PipeTransferFilePieceResp; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferSchemaPlanReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBatchReq; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq; @@ -52,16 +57,19 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; public class PipeDataNodeThriftRequestTest { private static final String TIME_PRECISION = "ms"; + private static final String CLUSTER_ID = "abcde"; @Test - public void testPipeValidateHandshakeReq() throws IOException { - PipeTransferHandshakeReq req = PipeTransferHandshakeReq.toTPipeTransferReq(TIME_PRECISION); - PipeTransferHandshakeReq deserializeReq = PipeTransferHandshakeReq.fromTPipeTransferReq(req); + public void testPipeValidateHandshakeV1Req() throws IOException { + PipeTransferHandshakeV1Req req = PipeTransferHandshakeV1Req.toTPipeTransferReq(TIME_PRECISION); + PipeTransferHandshakeV1Req deserializeReq = + PipeTransferHandshakeV1Req.fromTPipeTransferReq(req); Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); Assert.assertEquals(req.getType(), deserializeReq.getType()); @@ -70,6 +78,58 @@ public void testPipeValidateHandshakeReq() throws IOException { Assert.assertEquals(req.getTimestampPrecision(), deserializeReq.getTimestampPrecision()); } + @Test + public void testPipeValidateHandshakeV2Req() throws Exception { + HashMap params = new HashMap<>(); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, CLUSTER_ID); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, TIME_PRECISION); + params.put("Nullable", null); + + PipeTransferHandshakeV2Req req = PipeTransferHandshakeV2Req.toTPipeTransferReq(params); + PipeTransferHandshakeV2Req deserializeReq = + PipeTransferHandshakeV2Req.fromTPipeTransferReq(req); + + Assert.assertEquals(req.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(req.getType(), deserializeReq.getType()); + Assert.assertArrayEquals(req.getBody(), deserializeReq.getBody()); + + Assert.assertEquals( + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID), + deserializeReq.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID)); + Assert.assertEquals( + req.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION), + deserializeReq.getParams().get(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION)); + Assert.assertEquals( + req.getParams().get("Nullable"), deserializeReq.getParams().get("Nullable")); + } + + @Test + public void testPipeValidateHandshakeV2Req4AirGap() throws IOException { + // Construct byteBuffer. + HashMap params = new HashMap<>(); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID, CLUSTER_ID); + params.put(PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION, TIME_PRECISION); + params.put("Nullable", null); + ByteBuffer byteBuffer = + ByteBuffer.wrap(PipeTransferHandshakeV2Req.toTransferHandshakeBytes(params)); + + // Construct request. + byte version = ReadWriteIOUtils.readByte(byteBuffer); + short type = ReadWriteIOUtils.readShort(byteBuffer); + ByteBuffer body = byteBuffer.slice(); + final AirGapPseudoTPipeTransferRequest req = + (AirGapPseudoTPipeTransferRequest) + new AirGapPseudoTPipeTransferRequest().setVersion(version).setType(type).setBody(body); + final PipeTransferHandshakeV2Req deserializeReq = + PipeTransferHandshakeV2Req.fromTPipeTransferReq(req); + + // Assert. + Assert.assertEquals( + IoTDBConnectorRequestVersion.VERSION_1.getVersion(), deserializeReq.getVersion()); + Assert.assertEquals(PipeRequestType.HANDSHAKE_V2.getType(), deserializeReq.getType()); + Assert.assertEquals(params, deserializeReq.getParams()); + } + @Test public void testPipeTransferInsertNodeReq() { PipeTransferTabletInsertNodeReq req = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java index bc8a0c8e29dd..d851219137ba 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeReceiverTest.java @@ -20,7 +20,7 @@ package org.apache.iotdb.db.pipe.connector; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; import org.apache.iotdb.db.pipe.receiver.thrift.IoTDBThriftReceiverV1; import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher; @@ -43,7 +43,7 @@ public void testIoTDBThriftReceiverV1() { IoTDBThriftReceiverV1 receiver = new IoTDBThriftReceiverV1(); try { receiver.receive( - PipeTransferHandshakeReq.toTPipeTransferReq( + PipeTransferHandshakeV1Req.toTPipeTransferReq( CommonDescriptor.getInstance().getConfig().getTimestampPrecision()), mock(IPartitionFetcher.class), mock(ISchemaFetcher.class)); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java index 1d9528a4318e..be35ae36a61d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/connector/payload/request/PipeRequestType.java @@ -24,7 +24,7 @@ import java.util.Map; public enum PipeRequestType { - HANDSHAKE((short) 1), + HANDSHAKE_V1((short) 1), TRANSFER_TABLET_INSERT_NODE((short) 2), TRANSFER_TABLET_RAW((short) 3), @@ -40,6 +40,8 @@ public enum PipeRequestType { TRANSFER_SNAPSHOT_PIECE((short) 10), TRANSFER_SNAPSHOT_SEAL((short) 11), + + HANDSHAKE_V2((short) 12), ; private final short type;