|
| 1 | +import time |
1 | 2 | import unittest
|
2 | 3 | from crate.client import connect
|
3 | 4 | from crate.client.exceptions import ProgrammingError
|
@@ -88,6 +89,9 @@ def _test_rolling_upgrade(self, path, nodes):
|
88 | 89 | }
|
89 | 90 | cluster = self._new_cluster(path.from_version, nodes, settings=settings)
|
90 | 91 | cluster.start()
|
| 92 | + replica_cluster = self._new_cluster(path.from_version, 1, settings=settings) |
| 93 | + replica_cluster.start() |
| 94 | + cluster.node() |
91 | 95 | with connect(cluster.node().http_url, error_trace=True) as conn:
|
92 | 96 | c = conn.cursor()
|
93 | 97 | c.execute("create user arthur with (password = 'secret')")
|
@@ -133,6 +137,15 @@ def _test_rolling_upgrade(self, path, nodes):
|
133 | 137 | # Add the shards of the new partition primaries
|
134 | 138 | expected_active_shards += shards
|
135 | 139 |
|
| 140 | + c.execute("create table doc.x (a int)") |
| 141 | + c.execute("create publication p for table doc.x") |
| 142 | + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: |
| 143 | + rc = replica_conn.cursor() |
| 144 | + rc.execute(f"create subscription rs connection 'crate://{cluster.node().http_url}?user=crate&sslmode=sniff' publication p") |
| 145 | + rc.execute("create table doc.rx (a int)") |
| 146 | + rc.execute("create publication rp for table doc.rx") |
| 147 | + c.execute(f"create subscription s connection 'crate://{replica_cluster.node().http_url}?user=crate&sslmode=sniff' publication rp") |
| 148 | + |
136 | 149 | for idx, node in enumerate(cluster):
|
137 | 150 | # Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
|
138 | 151 | # Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works.
|
@@ -236,6 +249,25 @@ def _test_rolling_upgrade(self, path, nodes):
|
236 | 249 | # Add the shards of the new partition primaries
|
237 | 250 | expected_active_shards += shards
|
238 | 251 |
|
| 252 | + with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn: |
| 253 | + rc = replica_conn.cursor() |
| 254 | + # Ensure publishing to remote cluster works |
| 255 | + rc.execute("select count(*) from doc.x") |
| 256 | + count = rc.fetchall()[0][0] |
| 257 | + c.execute("insert into doc.x values (1)") |
| 258 | + rc.execute("refresh table doc.x") |
| 259 | + time.sleep(1) # replication delay... |
| 260 | + rc.execute("select count(*) from doc.x") |
| 261 | + self.assertEqual(rc.fetchall()[0][0], count + 1) |
| 262 | + |
| 263 | + # Ensure subscription from remote cluster works |
| 264 | + c.execute("select count(*) from doc.rx") |
| 265 | + count = c.fetchall()[0][0] |
| 266 | + rc.execute("insert into doc.rx values (1)") |
| 267 | + time.sleep(1) # replication delay... |
| 268 | + c.execute("select count(*) from doc.rx") |
| 269 | + self.assertEqual(c.fetchall()[0][0], count + 1) |
| 270 | + |
239 | 271 | # Finally validate that all shards (primaries and replicas) of all partitions are started
|
240 | 272 | # and writes into the partitioned table while upgrading were successful
|
241 | 273 | with connect(cluster.node().http_url, error_trace=True) as conn:
|
|
0 commit comments