Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: using clusterId to judge whether the target cluster is source cluster #11994

Merged
merged 24 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
694cd6e
update a new handshake request
xuanronaldo Jan 28, 2024
7ac2296
format code
xuanronaldo Jan 28, 2024
248ae19
1. rename the PipeTransferHandshakeReq to PipeTransferHandshakeV1Req.
xuanronaldo Jan 29, 2024
ff01ee9
handle PipeTransferHandshakeV2Req in sync connector
xuanronaldo Jan 29, 2024
ab1419d
1. send PipeTransferHandshakeV2Req default in async connector
xuanronaldo Jan 29, 2024
b71d031
add PipeConstant class
xuanronaldo Jan 29, 2024
c4577c9
format code
xuanronaldo Jan 29, 2024
2a91d77
fix problems of comments
xuanronaldo Jan 30, 2024
dbc53d2
Merge branch 'master' of https://github.com/apache/iotdb into TIMECHO…
xuanronaldo Jan 30, 2024
0c1d93c
add apache rat in PipeTransferHandshakeV2Req
xuanronaldo Jan 30, 2024
288419d
fix problems of comments.
xuanronaldo Jan 31, 2024
d975937
rename PipeConstant to PipeConnectorConstant
xuanronaldo Jan 31, 2024
c3ef4aa
format code
xuanronaldo Jan 31, 2024
4c04878
Merge branch 'master' of https://github.com/apache/iotdb into pr/11994
SteveYurongSu Feb 1, 2024
1c3d981
fix
SteveYurongSu Feb 1, 2024
cb63937
refactor
SteveYurongSu Feb 2, 2024
47edecf
RENAME
SteveYurongSu Feb 2, 2024
68c380b
fix receiver
SteveYurongSu Feb 2, 2024
4cb7011
Update IoTDBThriftReceiverV1.java
SteveYurongSu Feb 2, 2024
f9a1d71
Update IoTDBThriftSyncClientManager.java
SteveYurongSu Feb 2, 2024
f5f79f8
Update IoTDBThriftAsyncClientManager.java
SteveYurongSu Feb 2, 2024
d6db5f1
Update IoTDBAirGapConnector.java
SteveYurongSu Feb 2, 2024
075f8bc
refactor
SteveYurongSu Feb 2, 2024
7efd38f
Update PipeTransferHandshakeV2Req.java
SteveYurongSu Feb 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.progress.assigner.SimpleConsensusProgressIndexAssigner;
import org.apache.iotdb.db.pipe.resource.PipeHardlinkFileDirStartupCleaner;
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.service.ResourcesInformationHolder;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
Expand All @@ -41,20 +44,22 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class PipeRuntimeAgent implements IService {

private static final Logger LOGGER = LoggerFactory.getLogger(PipeRuntimeAgent.class);
private static final int DATA_NODE_ID = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();

private final AtomicBoolean isShutdown = new AtomicBoolean(false);

private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();
private final AtomicReference<String> clusterId = new AtomicReference<>(null);

private final SimpleConsensusProgressIndexAssigner simpleConsensusProgressIndexAssigner =
new SimpleConsensusProgressIndexAssigner();

private final PipePeriodicalJobExecutor pipePeriodicalJobExecutor =
new PipePeriodicalJobExecutor();

//////////////////////////// System Service Interface ////////////////////////////

public synchronized void preparePipeResources(
Expand Down Expand Up @@ -103,6 +108,22 @@ public ServiceType getID() {
return ServiceType.PIPE_RUNTIME_AGENT;
}

public String getClusterIdIfPossible() {
if (clusterId.get() == null) {
synchronized (clusterId) {
if (clusterId.get() == null) {
try (final ConfigNodeClient configNodeClient =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
clusterId.set(configNodeClient.getClusterId().getClusterId());
} catch (Exception e) {
LOGGER.warn("Unable to get clusterId, because: {}", e.getMessage(), e);
}
}
}
}
return clusterId.get();
}

////////////////////// SimpleConsensus ProgressIndex Assigner //////////////////////

public void assignSimpleProgressIndexIfNeeded(InsertNode insertNode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.connector.payload.evolvable.common;

public class PipeTransferHandshakeConstant {

public static final String HANDSHAKE_KEY_TIME_PRECISION = "timestampPrecision";
public static final String HANDSHAKE_KEY_CLUSTER_ID = "clusterID";

private PipeTransferHandshakeConstant() {
// Utility class
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import java.nio.ByteBuffer;
import java.util.Objects;

public class PipeTransferHandshakeReq extends TPipeTransferReq {
public class PipeTransferHandshakeV1Req extends TPipeTransferReq {

private transient String timestampPrecision;

private PipeTransferHandshakeReq() {
private PipeTransferHandshakeV1Req() {
// Empty constructor
}

Expand All @@ -44,14 +44,14 @@ public String getTimestampPrecision() {

/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferHandshakeReq toTPipeTransferReq(String timestampPrecision)
public static PipeTransferHandshakeV1Req toTPipeTransferReq(String timestampPrecision)
throws IOException {
final PipeTransferHandshakeReq handshakeReq = new PipeTransferHandshakeReq();
final PipeTransferHandshakeV1Req handshakeReq = new PipeTransferHandshakeV1Req();

handshakeReq.timestampPrecision = timestampPrecision;

handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
handshakeReq.type = PipeRequestType.HANDSHAKE.getType();
handshakeReq.type = PipeRequestType.HANDSHAKE_V1.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(timestampPrecision, outputStream);
Expand All @@ -62,8 +62,8 @@ public static PipeTransferHandshakeReq toTPipeTransferReq(String timestampPrecis
return handshakeReq;
}

public static PipeTransferHandshakeReq fromTPipeTransferReq(TPipeTransferReq transferReq) {
final PipeTransferHandshakeReq handshakeReq = new PipeTransferHandshakeReq();
public static PipeTransferHandshakeV1Req fromTPipeTransferReq(TPipeTransferReq transferReq) {
final PipeTransferHandshakeV1Req handshakeReq = new PipeTransferHandshakeV1Req();

handshakeReq.timestampPrecision = ReadWriteIOUtils.readString(transferReq.body);

Expand All @@ -80,7 +80,7 @@ public static byte[] toTransferHandshakeBytes(String timestampPrecision) throws
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE.getType(), outputStream);
ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V1.getType(), outputStream);
ReadWriteIOUtils.write(timestampPrecision, outputStream);
return byteArrayOutputStream.toByteArray();
}
Expand All @@ -96,7 +96,7 @@ public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
PipeTransferHandshakeReq that = (PipeTransferHandshakeReq) obj;
PipeTransferHandshakeV1Req that = (PipeTransferHandshakeV1Req) obj;
return timestampPrecision.equals(that.timestampPrecision)
&& version == that.version
&& type == that.type
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.pipe.connector.payload.evolvable.request;

import org.apache.iotdb.commons.pipe.connector.payload.request.IoTDBConnectorRequestVersion;
import org.apache.iotdb.commons.pipe.connector.payload.request.PipeRequestType;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;

import org.apache.thrift.TException;

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class PipeTransferHandshakeV2Req extends TPipeTransferReq {

private transient Map<String, String> params;

private PipeTransferHandshakeV2Req() {
// Empty constructor
}

public Map<String, String> getParams() {
return params;
}

/////////////////////////////// Thrift ///////////////////////////////

public static PipeTransferHandshakeV2Req toTPipeTransferReq(Map<String, String> params)
throws TException, IOException {
final PipeTransferHandshakeV2Req handshakeReq = new PipeTransferHandshakeV2Req();

handshakeReq.version = IoTDBConnectorRequestVersion.VERSION_1.getVersion();
handshakeReq.type = PipeRequestType.HANDSHAKE_V2.getType();
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(params.size(), outputStream);
for (final Map.Entry<String, String> entry : params.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
handshakeReq.body =
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
}

handshakeReq.params = params;

return handshakeReq;
}

public static PipeTransferHandshakeV2Req fromTPipeTransferReq(TPipeTransferReq transferReq) {
final PipeTransferHandshakeV2Req handshakeReq = new PipeTransferHandshakeV2Req();

Map<String, String> params = new HashMap<>();
final int size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
final String key = ReadWriteIOUtils.readString(transferReq.body);
final String value = ReadWriteIOUtils.readString(transferReq.body);
params.put(key, value);
}
handshakeReq.params = params;

handshakeReq.version = transferReq.version;
handshakeReq.type = transferReq.type;
handshakeReq.body = transferReq.body;

return handshakeReq;
}

/////////////////////////////// Air Gap ///////////////////////////////

public static byte[] toTransferHandshakeBytes(HashMap<String, String> params) throws IOException {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
ReadWriteIOUtils.write(IoTDBConnectorRequestVersion.VERSION_1.getVersion(), outputStream);
ReadWriteIOUtils.write(PipeRequestType.HANDSHAKE_V2.getType(), outputStream);
ReadWriteIOUtils.write(params.size(), outputStream);
for (final Map.Entry<String, String> entry : params.entrySet()) {
ReadWriteIOUtils.write(entry.getKey(), outputStream);
ReadWriteIOUtils.write(entry.getValue(), outputStream);
}
return byteArrayOutputStream.toByteArray();
}
}

/////////////////////////////// Object ///////////////////////////////

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
PipeTransferHandshakeV2Req that = (PipeTransferHandshakeV2Req) obj;
return Objects.equals(params, that.params)
&& version == that.version
&& type == that.type
&& Objects.equals(body, that.body);
}

@Override
public int hashCode() {
return Objects.hash(params, version, type, body);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeAgent;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapELanguageConstant;
import org.apache.iotdb.db.pipe.connector.payload.airgap.AirGapOneByteResponse;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFilePieceReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferFileSealReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV1Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferHandshakeV2Req;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
Expand Down Expand Up @@ -63,6 +66,7 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -194,11 +198,23 @@ public void handshake() throws Exception {
continue;
}

if (!send(
socket,
PipeTransferHandshakeReq.toTransferHandshakeBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()))) {
throw new PipeException("Handshake error with target server ip: " + ip + ", port: " + port);
final HashMap<String, String> params = new HashMap<>();
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_CLUSTER_ID,
PipeAgent.runtime().getClusterIdIfPossible());
params.put(
PipeTransferHandshakeConstant.HANDSHAKE_KEY_TIME_PRECISION,
CommonDescriptor.getInstance().getConfig().getTimestampPrecision());

// Try to handshake by PipeTransferHandshakeV2Req. If failed, retry to handshake by
// PipeTransferHandshakeV1Req. If failed again, throw PipeConnectionException.
if (!send(socket, PipeTransferHandshakeV2Req.toTransferHandshakeBytes(params))
&& !send(
socket,
PipeTransferHandshakeV1Req.toTransferHandshakeBytes(
CommonDescriptor.getInstance().getConfig().getTimestampPrecision()))) {
throw new PipeConnectionException(
"Handshake error with target server ip: " + ip + ", port: " + port);
} else {
isSocketAlive.set(i, true);
socket.setSoTimeout((int) PIPE_CONFIG.getPipeConnectorTransferTimeoutMs());
Expand Down
Loading
Loading