Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipe: Implement table meta sync on configNode #14156

Open
wants to merge 64 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
866b05f
partial
Caideyipi Nov 8, 2024
341deb9
partial
Caideyipi Nov 8, 2024
6292602
Update CNPhysicalPlanGenerator.java
Caideyipi Nov 8, 2024
9a9cd57
Merge branch 'master' of https://github.com/apache/iotdb into sync-meta
Caideyipi Nov 18, 2024
706d6f4
init
Caideyipi Nov 18, 2024
bfadb3e
Update IoTDBConfigNodeReceiver.java
Caideyipi Nov 18, 2024
def1fac
Update IoTDBConfigNodeReceiver.java
Caideyipi Nov 18, 2024
84f0e02
partial
Caideyipi Nov 18, 2024
f3db694
partial
Caideyipi Nov 19, 2024
c522e28
Merge branch 'master' of https://github.com/apache/iotdb into sync-meta
Caideyipi Nov 19, 2024
551c3e3
Continue
Caideyipi Nov 19, 2024
6939cc3
partial
Caideyipi Nov 19, 2024
1281acd
test
Caideyipi Nov 19, 2024
b00fbce
Update ConfigPhysicalPlanVisitor.java
Caideyipi Nov 19, 2024
c9a102f
partial
Caideyipi Nov 19, 2024
4ae5f57
Merge branch 'master' of https://github.com/apache/iotdb into sync-meta
Caideyipi Nov 19, 2024
9e5ed6a
Update IoTDBConfigNodeReceiver.java
Caideyipi Nov 19, 2024
0c7d460
Update ConfigRegionListeningQueue.java
Caideyipi Nov 19, 2024
4439cfa
init handler
Caideyipi Nov 19, 2024
820abc3
partial
Caideyipi Nov 19, 2024
2b91cca
Update IoTDBConfigRegionExtractor.java
Caideyipi Nov 19, 2024
edc76dc
Fix
Caideyipi Nov 19, 2024
8d5b58e
partial
Caideyipi Nov 19, 2024
c9ed057
Create PipeConfigPhysicalPlanTablePatternParseVisitor.java
Caideyipi Nov 19, 2024
32082d1
partial
Caideyipi Nov 19, 2024
5c1278e
Added tests
Caideyipi Nov 19, 2024
6c59e07
Merge branch 'master' of https://github.com/apache/iotdb into sync-meta
Caideyipi Nov 19, 2024
99247e5
Fix
Caideyipi Nov 19, 2024
d00b851
Update PipeConfigPhysicalPlanTablePatternParseVisitor.java
Caideyipi Nov 20, 2024
24442d5
Update PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
Caideyipi Nov 20, 2024
3b51426
Fix
Caideyipi Nov 20, 2024
10c44ec
Update PipeConfigPhysicalPlanTSStatusVisitor.java
Caideyipi Nov 20, 2024
f2d88bb
Update IoTDBConfigRegionExtractor.java
Caideyipi Nov 20, 2024
37148f6
Merge branch 'master' of https://github.com/apache/iotdb into sync-meta
Caideyipi Nov 20, 2024
b188dee
refactor
Caideyipi Nov 20, 2024
0c09baf
Update IoTDBConfigRegionExtractor.java
Caideyipi Nov 20, 2024
db6db86
parse
Caideyipi Nov 20, 2024
d3320f7
Update PipeTransferConfigSnapshotSealReq.java
Caideyipi Nov 20, 2024
331c2ad
continue
Caideyipi Nov 20, 2024
b1c0227
next
Caideyipi Nov 20, 2024
6a40e81
continue deletedatabase
Caideyipi Nov 20, 2024
2861f37
Update DeleteDatabaseProcedure.java
Caideyipi Nov 20, 2024
0f20f2c
continue
Caideyipi Nov 20, 2024
544d003
delete devices
Caideyipi Nov 20, 2024
c93e780
Update PipeDeleteDevicesPlan.java
Caideyipi Nov 20, 2024
7c1a99f
continue
Caideyipi Nov 20, 2024
1d2a6bc
Merge branch 'master' of https://github.com/apache/iotdb into sync-meta
Caideyipi Nov 20, 2024
a2893e9
Update PipeDeleteDevicesPlan.java
Caideyipi Nov 20, 2024
61ad094
Update PipeConfigPhysicalPlanTSStatusVisitor.java
Caideyipi Nov 20, 2024
241cae3
Update PipeConfigPhysicalPlanTSStatusVisitor.java
Caideyipi Nov 20, 2024
94804b7
continue
Caideyipi Nov 20, 2024
e5fc6b0
continue
Caideyipi Nov 20, 2024
1c39a6e
Update IoTDBPipeTableManualIT.java
Caideyipi Nov 20, 2024
0d7be44
Update IoTDBPipeTableManualIT.java
Caideyipi Nov 21, 2024
f442478
Add parsing
Caideyipi Nov 21, 2024
aa8db1f
Update PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
Caideyipi Nov 21, 2024
61e7d51
Merge branch 'master' of https://github.com/apache/iotdb into sync-meta
Caideyipi Nov 21, 2024
ea117ef
Update PipeConfigPhysicalPlanTSStatusVisitor.java
Caideyipi Nov 21, 2024
1bcac04
Merge remote-tracking branch 'upstream/master' into sync-meta
Caideyipi Nov 21, 2024
5545ea9
Update PipeConfigPhysicalPlanTablePatternParseVisitorTest.java
Caideyipi Nov 21, 2024
3febaae
Update TestUtils.java
Caideyipi Nov 21, 2024
8313358
Update IoTDBPipeTableManualIT.java
Caideyipi Nov 21, 2024
bc555ef
partial sr
Caideyipi Nov 21, 2024
3bec37a
Update IoTDBPipeTableManualIT.java
Caideyipi Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -1037,8 +1038,8 @@ public static void assertDataEventuallyOnEnv(
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
}
} catch (Exception e) {
Assert.fail();
} catch (final Exception e) {
Assert.fail(e.getMessage());
}
});
} catch (Exception e) {
Expand Down Expand Up @@ -1077,17 +1078,32 @@ public static void assertDataEventuallyOnEnv(
}

public static void assertDataAlwaysOnEnv(
BaseEnv env, String sql, String expectedHeader, Set<String> expectedResSet) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10);
final BaseEnv env,
final String sql,
final String expectedHeader,
final Set<String> expectedResSet) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10, null);
}

public static void assertDataAlwaysOnEnv(
final BaseEnv env,
final String sql,
final String expectedHeader,
final Set<String> expectedResSet,
final String database) {
assertDataAlwaysOnEnv(env, sql, expectedHeader, expectedResSet, 10, database);
}

public static void assertDataAlwaysOnEnv(
BaseEnv env,
String sql,
String expectedHeader,
Set<String> expectedResSet,
long consistentSeconds) {
try (Connection connection = env.getConnection();
long consistentSeconds,
final String database) {
try (Connection connection =
env.getConnection(
Objects.isNull(database) ? BaseEnv.TREE_SQL_DIALECT : BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
// Keep retrying if there are execution failures
await()
Expand All @@ -1098,6 +1114,9 @@ public static void assertDataAlwaysOnEnv(
.failFast(
() -> {
try {
if (Objects.nonNull(database)) {
statement.execute("use " + database);
}
TestUtils.assertResultSetEqual(
executeQueryWithRetry(statement, sql), expectedHeader, expectedResSet);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.pipe.it.manual;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
import org.apache.iotdb.itbase.env.BaseEnv;
import org.apache.iotdb.pipe.it.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;

import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2ManualCreateSchema.class})
public class IoTDBPipeTableManualIT extends AbstractPipeDualManualIT {
@Test
public void testTableSync() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "all");
extractorAttributes.put("extractor.inclusion.exclusion", "data.delete");
extractorAttributes.put("extractor.capture.tree", "false");
extractorAttributes.put("extractor.capture.table", "true");
extractorAttributes.put("extractor.database-name", "test");
extractorAttributes.put("extractor.table-name", "t.*[0-9]");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

final String dbName = "test";
TableModelUtils.createDataBase(senderEnv, dbName, 300);

if (!TestUtils.tryExecuteNonQueriesWithRetry(
dbName,
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList(
"create table table1(a id, b attribute, c int32) with (ttl=3000)",
"alter table table1 add column d int64",
"alter table table1 drop column b",
"alter table table1 set properties ttl=default",
"insert into table1 (a, c, d) values(1, 1, 1)",
"delete devices from table1",
"create table noTransferTable(a id, b attribute, c int32) with (ttl=3000)"))) {
return;
}

TableModelUtils.createDataBase(senderEnv, "noTransferDatabase", 300);

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"show tables from test",
"TableName,TTL(ms),",
Collections.singleton("table1,300,"),
dbName);

// Will not include no-transfer table
TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"show tables from test",
"TableName,TTL(ms),",
Collections.singleton("table1,300,"),
dbName);

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"desc table1",
"ColumnName,DataType,Category,",
new HashSet<>(
Arrays.asList(
"time,TIMESTAMP,TIME,",
"a,STRING,ID,",
"c,INT32,MEASUREMENT,",
"d,INT64,MEASUREMENT,")),
dbName);

if (!TestUtils.tryExecuteNonQueryWithRetry(
dbName, BaseEnv.TABLE_SQL_DIALECT, senderEnv, "drop table table1")) {
return;
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"show tables from test",
"TableName,TTL(ms),",
Collections.emptySet(),
dbName);

if (!TestUtils.tryExecuteNonQueryWithRetry(
dbName, BaseEnv.TABLE_SQL_DIALECT, senderEnv, "drop database test")) {
return;
}

// Will not include no-transfer database
TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"show databases",
"Database,TTL(ms),SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
Collections.emptySet(),
null);
}
}

@Test
public void testNoTree() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "all");
extractorAttributes.put("extractor.inclusion.exclusion", "data.delete");
extractorAttributes.put("extractor.capture.tree", "false");
extractorAttributes.put("extractor.capture.table", "true");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

if (!TestUtils.tryExecuteNonQueriesWithRetry(
senderEnv,
Arrays.asList(
"create database root.test",
"alter database root.test with schema_region_group_num=2, data_region_group_num=3",
"create timeSeries root.test.d1.s1 int32",
"insert into root.test.d1 (s1) values (1)"))) {
return;
}

TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"show databases",
"Database,SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
Collections.emptySet());
}
}

@Test
public void testNoTable() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();

try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
final Map<String, String> extractorAttributes = new HashMap<>();
final Map<String, String> processorAttributes = new HashMap<>();
final Map<String, String> connectorAttributes = new HashMap<>();

extractorAttributes.put("extractor.inclusion", "all");
extractorAttributes.put("extractor.inclusion.exclusion", "data.delete");
extractorAttributes.put("extractor.capture.tree", "true");
extractorAttributes.put("extractor.capture.table", "false");

connectorAttributes.put("connector", "iotdb-thrift-connector");
connectorAttributes.put("connector.ip", receiverIp);
connectorAttributes.put("connector.port", Integer.toString(receiverPort));

final TSStatus status =
client.createPipe(
new TCreatePipeReq("testPipe", connectorAttributes)
.setExtractorAttributes(extractorAttributes)
.setProcessorAttributes(processorAttributes));

Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());

Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());

final String dbName = "test";
TableModelUtils.createDataBase(senderEnv, dbName, 300);

if (!TestUtils.tryExecuteNonQueriesWithRetry(
dbName,
BaseEnv.TABLE_SQL_DIALECT,
senderEnv,
Arrays.asList(
"create table table1(a id, b attribute, c int32) with (ttl=3000)",
"alter table table1 add column d int64",
"alter table table1 drop column b",
"alter table table1 set properties ttl=default"))) {
return;
}

TestUtils.assertDataAlwaysOnEnv(
receiverEnv,
"show databases",
"Database,TTL(ms),SchemaReplicationFactor,DataReplicationFactor,TimePartitionInterval,",
Collections.emptySet(),
dbName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ public static void createDataBaseAndTable(BaseEnv baseEnv, String table, String
}
}

public static void createDataBase(BaseEnv baseEnv, String database) {
public static void createDataBase(BaseEnv baseEnv, String database, long ttl) {
try (Connection connection = baseEnv.getConnection(BaseEnv.TABLE_SQL_DIALECT);
Statement statement = connection.createStatement()) {
statement.execute("create database if not exists " + database);
statement.execute("create database if not exists " + database + " with (ttl=" + ttl + ")");
} catch (Exception e) {
fail(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
import org.apache.iotdb.confignode.consensus.request.write.partition.CreateSchemaPartitionPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.RemoveRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.partition.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeCreateTablePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeactivateTemplatePlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteDevicesPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteLogicalViewPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeDeleteTimeSeriesPlan;
import org.apache.iotdb.confignode.consensus.request.write.pipe.payload.PipeEnrichedPlan;
Expand Down Expand Up @@ -200,6 +202,9 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case AlterDatabase:
plan = new DatabaseSchemaPlan(ConfigPhysicalPlanType.AlterDatabase);
break;
case DeleteDatabaseV2:
plan = new DatabaseSchemaPlan(ConfigPhysicalPlanType.DeleteDatabaseV2);
break;
case SetTTL:
plan = new SetTTLPlan();
break;
Expand Down Expand Up @@ -334,7 +339,7 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
plan = new ExtendSchemaTemplatePlan();
break;
case PreCreateTable:
plan = new PreCreateTablePlan();
plan = new PreCreateTablePlan(configPhysicalPlanType);
break;
case RollbackCreateTable:
plan = new RollbackCreateTablePlan();
Expand Down Expand Up @@ -444,6 +449,12 @@ public static ConfigPhysicalPlan create(final ByteBuffer buffer) throws IOExcept
case PipeDeactivateTemplate:
plan = new PipeDeactivateTemplatePlan();
break;
case PipeCreateTable:
plan = new PipeCreateTablePlan();
break;
case PipeDeleteDevices:
plan = new PipeDeleteDevicesPlan();
break;
case UpdateTriggersOnTransferNodes:
plan = new UpdateTriggersOnTransferNodesPlan();
break;
Expand Down
Loading
Loading