Skip to content

Improve clusterstate metadata rolling upgrade test #347

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 124 additions & 5 deletions tests/bwc/test_rolling_upgrade.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import time
import unittest
from crate.client import connect
from crate.client.exceptions import ProgrammingError

from crate.qa.tests import NodeProvider, insert_data, wait_for_active_shards, UpgradePath

ROLLING_UPGRADES_V4 = (
@@ -86,6 +89,10 @@ def _test_rolling_upgrade(self, path, nodes):
}
cluster = self._new_cluster(path.from_version, nodes, settings=settings)
cluster.start()
replica_cluster = None
if path.from_version.startswith("5"):
replica_cluster = self._new_cluster(path.from_version, 1, settings=settings, explicit_discovery=False)
replica_cluster.start()
with connect(cluster.node().http_url, error_trace=True) as conn:
c = conn.cursor()
c.execute("create user arthur with (password = 'secret')")
@@ -103,14 +110,27 @@ def _test_rolling_upgrade(self, path, nodes):
) CLUSTERED INTO {shards} SHARDS
WITH (number_of_replicas={replicas})
''')
c.execute("deny dql on table doc.t1 to arthur")
c.execute("CREATE VIEW doc.v1 AS SELECT type, title, value FROM doc.t1")
insert_data(conn, 'doc', 't1', 1000)

c.execute("INSERT INTO doc.t1 (type, value, title, author) VALUES (1, 1, 'matchMe title', {name='no match name'})")
c.execute("INSERT INTO doc.t1 (type, value, title, author) VALUES (2, 2, 'no match title', {name='matchMe name'})")

c.execute("INSERT INTO doc.t1 (title, author, o) VALUES ('prefix_check', {\"dyn_empty_array\" = []}, {\"dyn_ignored_subcol\" = 'hello'})")

# For versions < 5.3 fails:
# cr> insert into doc.t2(a,b) values (1,1);
# ClassCastException[class java.lang.String cannot be cast to class java.lang.Number (java.lang.String and java.lang.Number are in module java.base of loader 'bootstrap')]
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 3:
c.execute('''
create table doc.t2 (
a int check (a >= 0),
b int not null,
c int default abs(random() * 100),
d generated always as (a + b + c),
constraint d CHECK (d >= a + b)
) partitioned by (b,c,d) clustered by(a) into 1 shards with (number_of_replicas = 0)
''')

c.execute('''
CREATE FUNCTION foo(INT)
RETURNS INT
@@ -130,6 +150,30 @@ def _test_rolling_upgrade(self, path, nodes):
# Add the shards of the new partition primaries
expected_active_shards += shards

if int(path.from_version.split('.')[0]) >= 5:
c.execute("create table doc.x (a int) clustered into 1 shards with (number_of_replicas=0)")
expected_active_shards += 1
c.execute("create publication p for table doc.x")
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
rc = replica_conn.cursor()
rc.execute("create table doc.rx (a int) clustered into 1 shards with (number_of_replicas=0)")
rc.execute("create publication rp for table doc.rx")
rc.execute(f"create subscription rs connection 'crate://localhost:{cluster.node().addresses.transport.port}?user=crate&sslmode=sniff' publication p")
c.execute(f"create subscription s connection 'crate://localhost:{replica_cluster.node().addresses.transport.port}?user=crate&sslmode=sniff' publication rp")
expected_active_shards += 1

# FDW: two CrateDB clusters setting up foreign data wrappers bidirectionally
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 7:
c.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)")
expected_active_shards += 1
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
rc = replica_conn.cursor()
rc.execute("create table doc.y (a int) clustered into 1 shards with (number_of_replicas=0)")
rc.execute(f"CREATE SERVER source FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{cluster.node().addresses.psql.port}/')")
rc.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER source OPTIONS (schema_name 'doc', table_name 'y')")
c.execute(f"CREATE SERVER remote FOREIGN DATA WRAPPER jdbc OPTIONS (url 'jdbc:postgresql://localhost:{replica_cluster.node().addresses.psql.port}/')")
c.execute("CREATE FOREIGN TABLE doc.remote_y (a int) SERVER remote OPTIONS (schema_name 'doc', table_name 'y')")

for idx, node in enumerate(cluster):
# Enforce an old version node be a handler to make sure that an upgraded node can serve 'select *' from an old version node.
# Otherwise upgraded node simply requests N-1 columns from old version with N columns and it always works.
@@ -148,8 +192,13 @@ def _test_rolling_upgrade(self, path, nodes):
# Run a query as a user created on an older version (ensure user is read correctly from cluster state, auth works, etc)
with connect(cluster.node().http_url, username='arthur', password='secret', error_trace=True) as custom_user_conn:
c = custom_user_conn.cursor()
wait_for_active_shards(c, expected_active_shards)
wait_for_active_shards(c)
c.execute("SELECT 1")
# has no privilege
with self.assertRaises(ProgrammingError):
c.execute("EXPLAIN SELECT * FROM doc.t1")
# has privilege
c.execute("EXPLAIN SELECT * FROM doc.v1")

cluster[idx] = new_node
with connect(new_node.http_url, error_trace=True) as conn:
@@ -159,8 +208,11 @@ def _test_rolling_upgrade(self, path, nodes):
c.execute("select name from sys.users order by 1")
self.assertEqual(c.fetchall(), [["arthur"], ["crate"]])

c.execute("select * from sys.privileges")
self.assertEqual(c.fetchall(), [["CLUSTER", "arthur", "crate", None, "GRANT", "DQL"]])
c.execute("select * from sys.privileges order by ident")
self.assertEqual(
c.fetchall(),
[['TABLE', 'arthur', 'crate', 'doc.t1', 'DENY', 'DQL'],
['CLUSTER', 'arthur', 'crate', None, 'GRANT', 'DQL']])

c.execute('''
SELECT type, AVG(value)
@@ -225,6 +277,62 @@ def _test_rolling_upgrade(self, path, nodes):
# Add the shards of the new partition primaries
expected_active_shards += shards

# For versions < 5.3 fails:
# cr> insert into doc.t2(a,b) values (1,1);
# ClassCastException[class java.lang.String cannot be cast to class java.lang.Number (java.lang.String and java.lang.Number are in module java.base of loader 'bootstrap')]
if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 3:
c.execute("select count(*) from doc.t2")
count = c.fetchall()[0][0]
c.execute(f"insert into doc.t2(a, b) values ({idx}, {idx})")
expected_active_shards += 1
c.execute("refresh table t2")
c.execute("select count(*) from doc.t2")
self.assertEqual(c.fetchall()[0][0], count + 1)

'''
disable entirely due to https://github.com/crate/crate/issues/17753
# skip 5.5 -> 5.6 and later versions, they fail due to https://github.com/crate/crate/issues/17734
if int(path.from_version.split('.')[0]) >= 5 and int(path.to_version.split('.')[1]) < 5:
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
rc = replica_conn.cursor()
wait_for_active_shards(c)
wait_for_active_shards(rc)
# Ensure publishing to remote cluster works
rc.execute("select count(*) from doc.x")
count = rc.fetchall()[0][0]
c.execute("insert into doc.x values (1)")
time.sleep(3) # replication delay...
rc.execute("select count(*) from doc.x")
# self.assertEqual(rc.fetchall()[0][0], count + 1)
# Ensure subscription from remote cluster works
c.execute("select count(*) from doc.rx")
count = c.fetchall()[0][0]
rc.execute("insert into doc.rx values (1)")
time.sleep(3) # replication delay...
c.execute("select count(*) from doc.rx")
# self.assertEqual(c.fetchall()[0][0], count + 1)
'''

if int(path.from_version.split('.')[0]) >= 5 and int(path.from_version.split('.')[1]) >= 7:
with connect(replica_cluster.node().http_url, error_trace=True) as replica_conn:
rc = replica_conn.cursor()
wait_for_active_shards(c)
wait_for_active_shards(rc)
# Ensure FDW in source cluster is functional
rc.execute("select count(a) from doc.remote_y")
count = rc.fetchall()[0][0]
c.execute("insert into doc.y values (1)")
time.sleep(3) # account for delay
rc.execute("select count(a) from doc.remote_y")
# self.assertEqual(rc.fetchall()[0][0], count + 1)

# Ensure FDW in remote cluster is functional
c.execute("select count(a) from doc.remote_y")
count = c.fetchall()[0][0]
rc.execute("insert into doc.y values (1)")
time.sleep(3) # account for delay
c.execute("select count(a) from doc.remote_y")
# self.assertEqual(c.fetchall()[0][0], count + 1)
# Finally validate that all shards (primaries and replicas) of all partitions are started
# and writes into the partitioned table while upgrading were successful
with connect(cluster.node().http_url, error_trace=True) as conn:
@@ -239,3 +347,14 @@ def _test_rolling_upgrade(self, path, nodes):
''')
res = c.fetchone()
self.assertEqual(res[0], nodes + 1)

# Ensure Arthur can be dropped and re-added
c.execute("drop user arthur")
c.execute("select * from sys.privileges")
self.assertEqual(c.fetchall(), [])

# Ensure view 'v' can be dropped and re-added
c.execute("DROP VIEW doc.v1")
c.execute("CREATE VIEW doc.v1 AS SELECT 11")
c.execute("SELECT * FROM doc.v1")
self.assertEqual(c.fetchall(), [[11]])