|
5 | 5 | from crate.client.exceptions import ProgrammingError |
6 | 6 | from cr8.run_crate import CrateNode |
7 | 7 |
|
8 | | -from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath |
| 8 | +from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath, assert_busy |
9 | 9 |
|
10 | 10 | ROLLING_UPGRADES_V4 = ( |
11 | 11 | # 4.0.0 -> 4.0.1 -> 4.0.2 don't support rolling upgrades due to a bug |
|
33 | 33 | ) |
34 | 34 |
|
35 | 35 | ROLLING_UPGRADES_V5 = ( |
36 | | - UpgradePath('5.0.x', '5.1.x'), |
37 | | - UpgradePath('5.1.x', '5.2.x'), |
38 | | - UpgradePath('5.2.x', '5.3.x'), |
39 | | - UpgradePath('5.3.x', '5.4.x'), |
40 | | - UpgradePath('5.4.x', '5.5.x'), |
41 | | - UpgradePath('5.5.x', '5.6.x'), |
42 | | - UpgradePath('5.6.x', '5.7.x'), |
43 | | - UpgradePath('5.7.x', '5.8.x'), |
44 | | - UpgradePath('5.8.x', '5.9.x'), |
45 | | - UpgradePath('5.9.x', '5.10.x'), |
46 | | - UpgradePath('5.10.x', '6.0.x'), |
47 | | - UpgradePath('6.0.x', '6.0'), |
48 | 36 | UpgradePath('6.0', '6.1.x'), |
49 | 37 | UpgradePath('6.1.x', '6.1'), |
50 | 38 | UpgradePath('6.1', 'latest-nightly'), |
@@ -96,9 +84,17 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): |
96 | 84 | cluster = self._new_cluster(path.from_version, nodes, settings=settings) |
97 | 85 | cluster.start() |
98 | 86 | node = cluster.node() |
| 87 | + remote_node = None |
99 | 88 | with connect(node.http_url, error_trace=True) as conn: |
100 | 89 | new_shards = init_data(conn, node.version, shards, replicas) |
101 | 90 | expected_active_shards += new_shards |
| 91 | + if node.version >= (5, 10, 0): |
| 92 | + remote_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False) |
| 93 | + remote_cluster.start() |
| 94 | + remote_node = remote_cluster.node() |
| 95 | + with connect(remote_node.http_url, error_trace=True) as remote_conn: |
| 96 | + new_shards = init_logical_replication_data(self, conn, remote_conn, node.addresses.transport.port, remote_node.addresses.transport.port, expected_active_shards) |
| 97 | + expected_active_shards += new_shards |
102 | 98 |
|
103 | 99 | for idx, node in enumerate(cluster): |
104 | 100 | # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node. |
@@ -129,6 +125,9 @@ def _test_rolling_upgrade(self, path: UpgradePath, nodes: int): |
129 | 125 | c = conn.cursor() |
130 | 126 | new_shards = self._test_queries_on_new_node(idx, c, node, new_node, nodes, shards, expected_active_shards) |
131 | 127 | expected_active_shards += new_shards |
| 128 | + if node.version >= (5, 10, 0): |
| 129 | + with connect(remote_node.http_url, error_trace=True) as remote_conn: |
| 130 | + test_logical_replication_queries(self, conn, remote_conn) |
132 | 131 |
|
133 | 132 | # Finally validate that all shards (primaries and replicas) of all partitions are started |
134 | 133 | # and writes into the partitioned table while upgrading were successful |
@@ -328,3 +327,57 @@ def init_data(conn: Connection, version: tuple[int, int, int], shards: int, repl |
328 | 327 | c.execute("INSERT INTO doc.parted (id, value) VALUES (1, 1)") |
329 | 328 | new_shards += shards |
330 | 329 | return new_shards |
| 330 | + |
| 331 | + |
def init_logical_replication_data(self, local_conn: Connection, remote_conn: Connection, local_transport_port: int, remote_transport_port: int, local_active_shards: int) -> int:
    """Set up bidirectional logical replication between two clusters.

    Creates one single-shard table plus a publication on each cluster, then
    cross-subscribes each cluster to the other's publication and waits until
    the replicated shards are active and initially empty.

    :param local_conn: connection to the main (local) cluster
    :param remote_conn: connection to the remote single-node cluster
    :param local_transport_port: transport port of the local cluster
    :param remote_transport_port: transport port of the remote cluster
    :param local_active_shards: shards already active on the local cluster
    :return: number of shards this setup adds to the local cluster
    """
    # Sanity-check that both ports fall into CrateDB's default transport range.
    assert 4300 <= local_transport_port <= 4310 and 4300 <= remote_transport_port <= 4310

    local_cursor = local_conn.cursor()
    remote_cursor = remote_conn.cursor()

    # Published table on the local cluster ...
    local_cursor.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)")
    local_cursor.execute("create publication p for table doc.x")

    # ... and its counterpart on the remote cluster.
    remote_cursor.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)")
    remote_cursor.execute("create publication rp for table doc.rx")

    # Cross-subscribe so each cluster replicates the other's table.
    remote_cursor.execute(f"create subscription rs connection 'crate://localhost:{local_transport_port}?user=crate&sslmode=sniff' publication p")
    local_cursor.execute(f"create subscription s connection 'crate://localhost:{remote_transport_port}?user=crate&sslmode=sniff' publication rp")

    new_shards = 2  # 1 shard for doc.x and another 1 shard for doc.rx
    wait_for_active_shards(remote_cursor, new_shards)
    wait_for_active_shards(local_cursor, local_active_shards + new_shards)
    # Freshly replicated tables must start out empty on the subscriber side.
    assert_busy(lambda: self.assertEqual(num_docs_x(remote_cursor), 0))
    assert_busy(lambda: self.assertEqual(num_docs_rx(local_cursor), 0))

    return new_shards
| 353 | + |
| 354 | + |
def test_logical_replication_queries(self, local_conn: Connection, remote_conn: Connection):
    """Verify logical replication keeps working across an upgrade step.

    Checks that replicated (subscribed) tables cannot be dropped on either
    side, and that an insert on each publisher eventually shows up on the
    corresponding subscriber.

    :param local_conn: connection to the main (local) cluster
    :param remote_conn: connection to the remote single-node cluster
    """
    c = local_conn.cursor()
    rc = remote_conn.cursor()

    # Cannot drop replicated tables.
    # Each statement needs its own assertRaises block: with both inside one
    # block the first raise would end the block and the second drop (the
    # local-side check) would never be executed at all.
    with self.assertRaises(ProgrammingError):
        rc.execute("drop table doc.x")
    with self.assertRaises(ProgrammingError):
        c.execute("drop table doc.rx")

    # Snapshot current subscriber-side counts before inserting.
    count = num_docs_x(rc)
    count2 = num_docs_rx(c)

    # Insert one row on each publisher and make it visible.
    c.execute("insert into doc.x values (1)")
    c.execute("refresh table doc.x")
    rc.execute("insert into doc.rx values (1)")
    rc.execute("refresh table doc.rx")

    # Replication is asynchronous, so poll until the rows arrive.
    assert_busy(lambda: self.assertEqual(num_docs_x(rc), count + 1))
    assert_busy(lambda: self.assertEqual(num_docs_rx(c), count2 + 1))
| 374 | + |
| 375 | + |
def num_docs_x(cursor):
    """Return the current row count of doc.x as seen through *cursor*."""
    cursor.execute("select count(*) from doc.x")
    rows = cursor.fetchall()
    return rows[0][0]
| 379 | + |
| 380 | + |
def num_docs_rx(cursor):
    """Return the current row count of doc.rx as seen through *cursor*."""
    cursor.execute("select count(*) from doc.rx")
    rows = cursor.fetchall()
    return rows[0][0]
0 commit comments