Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: implement RenameDatabaseProcessor & WriteBackConnector for table model events #14131

Merged
merged 12 commits into from
Nov 26, 2024
18 changes: 18 additions & 0 deletions .github/workflows/pipe-it-2cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
- name: Sleep for a random duration between 0 and 10000 milliseconds
run: |
sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in client.yml
Expand Down Expand Up @@ -108,6 +111,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
- name: Sleep for a random duration between 0 and 10000 milliseconds
run: |
sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in client.yml
Expand Down Expand Up @@ -146,6 +152,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
- name: Sleep for a random duration between 0 and 10000 milliseconds
run: |
sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in client.yml
Expand Down Expand Up @@ -184,6 +193,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
- name: Sleep for a random duration between 0 and 10000 milliseconds
run: |
sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in client.yml
Expand Down Expand Up @@ -222,6 +234,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
- name: Sleep for a random duration between 0 and 10000 milliseconds
run: |
sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in client.yml
Expand Down Expand Up @@ -259,6 +274,9 @@ jobs:
with:
distribution: liberica
java-version: ${{ matrix.java }}
- name: Sleep for a random duration between 0 and 10000 milliseconds
run: |
sleep $(( $(( RANDOM % 10000 + 1 )) / 1000))
- name: IT Test
shell: bash
# we do not compile client-cpp for saving time, it is tested in client.yml
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,4 +216,63 @@ private void testSinkFormat(final String format) throws Exception {
TableModelUtils.assertCountData("test", "test", 150, receiverEnv);
}
}

@Test
public void testWriteBackSink() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {

TableModelUtils.createDataBaseAndTable(senderEnv, "test", "test");
TableModelUtils.insertData("test", "test", 0, 50, senderEnv, true);

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList("insert into root.vehicle.d0(time, s1) values (0, 1)", "flush"))) {
return;
}

final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("capture.table", "true");
extractorAttributes.put("capture.tree", "true");
extractorAttributes.put("forwarding-pipe-requests", "false");

processorAttributes.put("processor", "rename-database-processor");
processorAttributes.put("processor.new-db-name", "test1");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.batch.enable", "true");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));
connectorAttributes.put("connector.realtime-first", "false");

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

TableModelUtils.insertData("test", "test", 50, 100, senderEnv, true);

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList("insert into root.vehicle.d0(time, s1) values (1, 1)", "flush"))) {
return;
}

TableModelUtils.assertCountData("test1", "test", 100, receiverEnv);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.iotdb.db.pipe.processor.downsampling.sdt.SwingingDoorTrendingSamplingProcessor;
import org.apache.iotdb.db.pipe.processor.downsampling.tumbling.TumblingTimeSamplingProcessor;
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.pipe.processor.schemachange.RenameDatabaseProcessor;
import org.apache.iotdb.db.pipe.processor.twostage.plugin.TwoStageCountProcessor;

class PipeDataRegionProcessorConstructor extends PipeProcessorConstructor {
Expand Down Expand Up @@ -68,5 +69,8 @@ protected void initConstructors() {
pluginConstructors.put(
BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(),
PipeConsensusProcessor::new);
pluginConstructors.put(
BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(),
RenameDatabaseProcessor::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,45 +20,46 @@
package org.apache.iotdb.db.pipe.connector.protocol.writeback;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletBinaryReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletInsertNodeReqV2;
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReqV2;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.protocol.session.IClientSession;
import org.apache.iotdb.db.protocol.session.InternalClientSession;
import org.apache.iotdb.db.protocol.session.SessionManager;
import org.apache.iotdb.db.queryengine.common.SessionInfo;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
import org.apache.iotdb.pipe.api.PipeConnector;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.configuration.PipeRuntimeEnvironment;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
import org.apache.iotdb.pipe.api.event.Event;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZoneId;
import java.io.IOException;
import java.util.Objects;

public class WriteBackConnector implements PipeConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(WriteBackConnector.class);

// Simulate the behavior of the client-to-server communication
// for correctly handling data insertion in IoTDBReceiverAgent#receive method
private static final Coordinator COORDINATOR = Coordinator.getInstance();
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
private IClientSession session;

private static final String TREE_MODEL_DATABASE_NAME_IDENTIFIER = null;

@Override
public void validate(final PipeParameterValidator validator) throws Exception {
// Do nothing
Expand All @@ -68,7 +69,16 @@ public void validate(final PipeParameterValidator validator) throws Exception {
public void customize(
final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
throws Exception {
// Do nothing
final PipeRuntimeEnvironment environment = configuration.getRuntimeEnvironment();
session =
new InternalClientSession(
String.format(
"%s_%s_%s_%s",
WriteBackConnector.class.getSimpleName(),
environment.getPipeName(),
environment.getCreationTime(),
environment.getRegionId()));
SESSION_MANAGER.registerSession(session);
}

@Override
Expand Down Expand Up @@ -101,16 +111,9 @@ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exc
}
}

@Override
public void transfer(final Event event) throws Exception {
if (!(event instanceof PipeHeartbeatEvent || event instanceof PipeTerminateEvent)) {
LOGGER.warn("WriteBackConnector does not support transferring generic event: {}.", event);
}
}

private void doTransferWrapper(
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException {
throws PipeException, WALPipeException, IOException {
// We increase the reference count for this event to determine if the event may be released.
if (!pipeInsertNodeTabletInsertionEvent.increaseReferenceCount(
WriteBackConnector.class.getName())) {
Expand All @@ -126,29 +129,28 @@ private void doTransferWrapper(

private void doTransfer(
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
throws PipeException, WALPipeException {
final TSStatus status;

throws PipeException, WALPipeException, IOException {
final InsertNode insertNode =
pipeInsertNodeTabletInsertionEvent.getInsertNodeViaCacheIfPossible();
if (Objects.isNull(insertNode)) {
status =
PipeDataNodeAgent.receiver()
.thrift()
.receive(
PipeTransferTabletBinaryReq.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getByteBuffer()))
.getStatus();
} else {
final InsertBaseStatement statement =
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(insertNode).constructStatement();
status = statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(statement);
}
final String dataBaseName =
pipeInsertNodeTabletInsertionEvent.isTableModelEvent()
? pipeInsertNodeTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER;

final TSStatus status =
PipeDataNodeAgent.receiver()
.thrift()
.receive(
Objects.isNull(insertNode)
? PipeTransferTabletBinaryReqV2.toTPipeTransferReq(
pipeInsertNodeTabletInsertionEvent.getByteBuffer(), dataBaseName)
: PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
insertNode, dataBaseName))
.getStatus();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Transfer PipeInsertNodeTabletInsertionEvent %s error, result status %s",
"Write back PipeInsertNodeTabletInsertionEvent %s error, result status %s",
pipeInsertNodeTabletInsertionEvent, status));
}
}
Expand All @@ -168,37 +170,35 @@ private void doTransferWrapper(final PipeRawTabletInsertionEvent pipeRawTabletIn

private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent)
throws PipeException {
final InsertBaseStatement statement =
PipeTransferTabletRawReq.toTPipeTransferRawReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned())
.constructStatement();
final TSStatus status =
statement.isEmpty() ? RpcUtils.SUCCESS_STATUS : executeStatement(statement);

PipeDataNodeAgent.receiver()
.thrift()
.receive(
PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
pipeRawTabletInsertionEvent.convertToTablet(),
pipeRawTabletInsertionEvent.isAligned(),
pipeRawTabletInsertionEvent.isTableModelEvent()
? pipeRawTabletInsertionEvent.getTableModelDatabaseName()
: TREE_MODEL_DATABASE_NAME_IDENTIFIER))
.getStatus();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new PipeException(
String.format(
"Transfer PipeRawTabletInsertionEvent %s error, result status %s",
"Write back PipeRawTabletInsertionEvent %s error, result status %s",
pipeRawTabletInsertionEvent, status));
}
}

private TSStatus executeStatement(final InsertBaseStatement statement) {
return Coordinator.getInstance()
.executeForTreeModel(
new PipeEnrichedStatement(statement),
SessionManager.getInstance().requestQueryId(),
new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()),
"",
ClusterPartitionFetcher.getInstance(),
ClusterSchemaFetcher.getInstance(),
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
.status;
@Override
public void transfer(final Event event) throws Exception {
// Ignore the event except TabletInsertionEvent
}

@Override
public void close() throws Exception {
// Do nothing
if (session != null) {
SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution);
}
SESSION_MANAGER.removeCurrSession();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ public String getTableModelDatabaseName() {
: treeModelDatabaseName
: tableModelDatabaseName;
}

public void renameTableModelDatabase(final String tableModelDatabaseName) {
this.tableModelDatabaseName = tableModelDatabaseName;
}
}
Loading
Loading