Skip to content

Commit 69c7223

Browse files
committed
logical replication
1 parent 3d3fdb7 commit 69c7223

File tree

1 file changed

+40
-3
lines changed

1 file changed

+40
-3
lines changed

tests/bwc/test_rolling_upgrade.py

+40-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
import unittest
23
from crate.client import connect
34
from crate.client.exceptions import ProgrammingError
@@ -88,6 +89,10 @@ def _test_rolling_upgrade(self, path, nodes):
8889
}
8990
cluster = self._new_cluster(path.from_version, nodes, settings=settings)
9091
cluster.start()
92+
replica_cluster = None
93+
if path.from_version.startswith("5"):
94+
replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False)
95+
replica_cluster.start()
9196
with connect(cluster.node().http_url, error_trace=True) as conn:
9297
c = conn.cursor()
9398
c.execute("create user arthur with (password = 'secret')")
@@ -133,6 +138,18 @@ def _test_rolling_upgrade(self, path, nodes):
133138
# Add the shards of the new partition primaries
134139
expected_active_shards += shards
135140

141+
if path.from_version.startswith("5"):
142+
c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=1)")
143+
expected_active_shards += 1
144+
c.execute("create publication p for table doc.x")
145+
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
146+
rc = replica_conn.cursor()
147+
rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=1)")
148+
rc.execute("create publication rp for table doc.rx")
149+
rc.execute(f"create subscription rs connection 'crate://localhost:{cluster.node().addresses.transport.port}?user=crate&sslmode=sniff' publication p")
150+
c.execute(f"create subscription s connection 'crate://localhost:{replica_cluster.node().addresses.transport.port}?user=crate&sslmode=sniff' publication rp")
151+
expected_active_shards += 1
152+
136153
for idx, node in enumerate(cluster):
137154
# Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
138155
# Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works.
@@ -151,11 +168,10 @@ def _test_rolling_upgrade(self, path, nodes):
151168
# Run a query as a user created on an older version (ensure user is read correctly from cluster state, auth works, etc)
152169
with connect(cluster.node().http_url, username='arthur', password='secret', error_trace=True) as custom_user_conn:
153170
c = custom_user_conn.cursor()
154-
# 'arthur' can only see 'parted' shards, 6 shards are for 't1'
155-
wait_for_active_shards(c, expected_active_shards - 6)
171+
wait_for_active_shards(c)
156172
c.execute("SELECT 1")
157173
# has no privilege
158-
with self.assertRaisesRegex(ProgrammingError, "RelationUnknown.*"):
174+
with self.assertRaises(ProgrammingError):
159175
c.execute("EXPLAIN SELECT * FROM doc.t1")
160176
# has privilege
161177
c.execute("EXPLAIN SELECT * FROM doc.v1")
@@ -237,6 +253,27 @@ def _test_rolling_upgrade(self, path, nodes):
237253
# Add the shards of the new partition primaries
238254
expected_active_shards += shards
239255

256+
# skip 5.5 -> 5.6 and later versions, they fail due to https://github.com/crate/crate/issues/17734
257+
if int(path.to_version.split('.')[1]) < 5:
258+
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
259+
rc = replica_conn.cursor()
260+
wait_for_active_shards(c)
261+
wait_for_active_shards(rc)
262+
# Ensure publishing to remote cluster works
263+
rc.execute("select count(*) from doc.x")
264+
count = rc.fetchall()[0][0]
265+
c.execute("insert into doc.x values (1)")
266+
time.sleep(3) # replication delay...
267+
rc.execute("select count(*) from doc.x")
268+
self.assertEqual(rc.fetchall()[0][0], count + 1)
269+
# Ensure subscription from remote cluster works
270+
c.execute("select count(*) from doc.rx")
271+
count = c.fetchall()[0][0]
272+
rc.execute("insert into doc.rx values (1)")
273+
time.sleep(3) # replication delay...
274+
c.execute("select count(*) from doc.rx")
275+
self.assertEqual(c.fetchall()[0][0], count + 1)
276+
240277
# Finally validate that all shards (primaries and replicas) of all partitions are started
241278
# and writes into the partitioned table while upgrading were successful
242279
with connect(cluster.node().http_url, error_trace=True) as conn:

0 commit comments

Comments
 (0)