From 5efd7fd0c4fb298f5c8e33719793578b1ca3e1b2 Mon Sep 17 00:00:00 2001 From: Dragomir Penev Date: Tue, 14 Mar 2023 11:46:07 +0200 Subject: [PATCH 01/24] Enable sync mode --- poetry.lock | 42 ++++++++++++------------- pyproject.toml | 2 +- requirements.txt | 18 +++++------ src/charm.py | 9 ------ src/patroni.py | 17 ++-------- templates/patroni.yml.j2 | 4 +++ templates/postgresql.conf.j2 | 7 ----- tests/unit/test_charm.py | 22 +++---------- tests/unit/test_patroni.py | 61 ++---------------------------------- 9 files changed, 43 insertions(+), 139 deletions(-) delete mode 100644 templates/postgresql.conf.j2 diff --git a/poetry.lock b/poetry.lock index 6226742a92..dd65fcae2d 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry and should not be changed by hand. +# This file is automatically @generated by Poetry 1.4.0 and should not be changed by hand. [[package]] name = "anyio" @@ -155,18 +155,18 @@ uvloop = ["uvloop (>=0.15.2)"] [[package]] name = "boto3" -version = "1.26.87" +version = "1.26.90" description = "The AWS SDK for Python" category = "main" optional = false python-versions = ">= 3.7" files = [ - {file = "boto3-1.26.87-py3-none-any.whl", hash = "sha256:7cf46d6aa67487ae5dc43a7bd10e1ebe9b8c442c7f32f7fea259cd3df6eeb52d"}, - {file = "boto3-1.26.87.tar.gz", hash = "sha256:b0f7e801d6d5cb96ed89e1d39326bb072b5d9175bb6c986850c77b640474d297"}, + {file = "boto3-1.26.90-py3-none-any.whl", hash = "sha256:f1123076445f93fa85bf7ee956ae33041f2e077a6824e31929cc56ad31aeffc1"}, + {file = "boto3-1.26.90.tar.gz", hash = "sha256:1d33abca60643d14f90a9e77d94085ebfd8f8bf8f157f582466f6b3a141bab8c"}, ] [package.dependencies] -botocore = ">=1.29.87,<1.30.0" +botocore = ">=1.29.90,<1.30.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.6.0,<0.7.0" @@ -175,14 +175,14 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.29.87" +version = "1.29.90" description = "Low-level, data-driven core of boto 3." 
category = "main" optional = false python-versions = ">= 3.7" files = [ - {file = "botocore-1.29.87-py3-none-any.whl", hash = "sha256:f4066e1ce1fd8790e872c70deede5d7abd94c87dcdb12892b4359245fed927f9"}, - {file = "botocore-1.29.87.tar.gz", hash = "sha256:2b981c3ebb347bdf2a8c3c0ce7e7712c76bfe43ae87d30776a3ab469e305f469"}, + {file = "botocore-1.29.90-py3-none-any.whl", hash = "sha256:1b8c1b8c366875e65d39237a296842b9c0ea33af2ba4a2771db2ba6aefa663ef"}, + {file = "botocore-1.29.90.tar.gz", hash = "sha256:2dbbc2c7d93ddefcf9896268597212d446e5d416fbceb1b12c793660fa9f83f3"}, ] [package.dependencies] @@ -551,14 +551,14 @@ files = [ [[package]] name = "exceptiongroup" -version = "1.1.0" +version = "1.1.1" description = "Backport of PEP 654 (exception groups)" category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "exceptiongroup-1.1.0-py3-none-any.whl", hash = "sha256:327cbda3da756e2de031a3107b81ab7b3770a602c4d16ca618298c526f4bec1e"}, - {file = "exceptiongroup-1.1.0.tar.gz", hash = "sha256:bcb67d800a4497e1b404c2dd44fca47d3b7a5e5433dbab67f96c1a685cdfdf23"}, + {file = "exceptiongroup-1.1.1-py3-none-any.whl", hash = "sha256:232c37c63e4f682982c8b6459f33a8981039e5fb8756b2074364e5055c498c9e"}, + {file = "exceptiongroup-1.1.1.tar.gz", hash = "sha256:d484c3090ba2889ae2928419117447a14daf3c1231d5e30d0aae34f354f01785"}, ] [package.extras] @@ -773,14 +773,14 @@ files = [ [[package]] name = "ipdb" -version = "0.13.11" +version = "0.13.13" description = "IPython-enabled pdb" category = "dev" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" files = [ - {file = "ipdb-0.13.11-py3-none-any.whl", hash = "sha256:f74c2f741c18b909eaf89f19fde973f745ac721744aa1465888ce45813b63a9c"}, - {file = "ipdb-0.13.11.tar.gz", hash = "sha256:c23b6736f01fd4586cc2ecbebdf79a5eb454796853e1cd8f2ed3b7b91d4a3e93"}, + {file = "ipdb-0.13.13-py3-none-any.whl", hash = "sha256:45529994741c4ab6d2388bfa5d7b725c2cf7fe9deffabdb8a6113aa5ed449ed4"}, + {file = "ipdb-0.13.13.tar.gz", hash = "sha256:e3ac6018ef05126d442af680aad863006ec19d02290561ac88b8b1c0b0cfc726"}, ] [package.dependencies] @@ -1297,14 +1297,14 @@ files = [ [[package]] name = "platformdirs" -version = "3.1.0" +version = "3.1.1" description = "A small Python package for determining appropriate platform-specific dirs, e.g. a \"user data dir\"." category = "dev" optional = false python-versions = ">=3.7" files = [ - {file = "platformdirs-3.1.0-py3-none-any.whl", hash = "sha256:13b08a53ed71021350c9e300d4ea8668438fb0046ab3937ac9a29913a1a1350a"}, - {file = "platformdirs-3.1.0.tar.gz", hash = "sha256:accc3665857288317f32c7bebb5a8e482ba717b474f3fc1d18ca7f9214be0cef"}, + {file = "platformdirs-3.1.1-py3-none-any.whl", hash = "sha256:e5986afb596e4bb5bde29a79ac9061aa955b94fca2399b7aaac4090860920dd8"}, + {file = "platformdirs-3.1.1.tar.gz", hash = "sha256:024996549ee88ec1a9aa99ff7f8fc819bb59e2c3477b410d90a16d32d6e707aa"}, ] [package.extras] @@ -2037,14 +2037,14 @@ typing-extensions = ">=3.7.4" [[package]] name = "urllib3" -version = "1.26.14" +version = "1.26.15" description = "HTTP library with thread-safe connection pooling, file post, and more." 
category = "main" optional = false python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*" files = [ - {file = "urllib3-1.26.14-py2.py3-none-any.whl", hash = "sha256:75edcdc2f7d85b137124a6c3c9fc3933cdeaa12ecb9a6a959f22797a0feca7e1"}, - {file = "urllib3-1.26.14.tar.gz", hash = "sha256:076907bf8fd355cde77728471316625a4d2f7e713c125f51953bb5b3eecf4f72"}, + {file = "urllib3-1.26.15-py2.py3-none-any.whl", hash = "sha256:aa751d169e23c7479ce47a0cb0da579e3ede798f994f5816a74e4f4500dcea42"}, + {file = "urllib3-1.26.15.tar.gz", hash = "sha256:8a388717b9476f934a21484e8c8e61875ab60644d29b9b39e11e4b9dc1c6b305"}, ] [package.extras] @@ -2242,4 +2242,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = "^3.8.10" -content-hash = "e9129698bada348a5e3b9ba08bb299b4418d3d9c27dc8becf5f97da10754069c" +content-hash = "abc01933a2424ca1e80e31722c559e9b85f02e31f138b082878ffd04e9eefb6d" diff --git a/pyproject.toml b/pyproject.toml index c599d0718c..7f0897d689 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ jinja2 = "^3.1.2" lightkube = "^0.12.0" lightkube-models = "^1.26.0.4" requests = "^2.28.2" -boto3 = "^1.26.87" +boto3 = "^1.26.90" # psycopg2 = "^2.9.5" # Injected in charmcraft.yaml [tool.poetry.group.format] diff --git a/requirements.txt b/requirements.txt index 0a03f124b5..294c367fdb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,12 +4,12 @@ anyio==3.6.2 ; python_full_version >= "3.8.10" and python_full_version < "4.0.0" attrs==22.2.0 ; python_full_version >= "3.8.10" and python_full_version < "4.0.0" \ --hash=sha256:29e95c7f6778868dbd49170f98f8818f78f3dc5e0e37c0b1f474e3561b240836 \ --hash=sha256:c9227bfc2f01993c03f68db37d1d15c9690188323c067c641f1a35ca58185f99 -boto3==1.26.87 ; python_full_version >= "3.8.10" and python_full_version < "4.0.0" \ - --hash=sha256:7cf46d6aa67487ae5dc43a7bd10e1ebe9b8c442c7f32f7fea259cd3df6eeb52d \ - --hash=sha256:b0f7e801d6d5cb96ed89e1d39326bb072b5d9175bb6c986850c77b640474d297 -botocore==1.29.87 ; python_full_version >= "3.8.10" and python_full_version < "4.0.0" \ - --hash=sha256:2b981c3ebb347bdf2a8c3c0ce7e7712c76bfe43ae87d30776a3ab469e305f469 \ - --hash=sha256:f4066e1ce1fd8790e872c70deede5d7abd94c87dcdb12892b4359245fed927f9 +boto3==1.26.90 ; python_full_version >= "3.8.10" and python_full_version < "4.0.0" \ + --hash=sha256:1d33abca60643d14f90a9e77d94085ebfd8f8bf8f157f582466f6b3a141bab8c \ + --hash=sha256:f1123076445f93fa85bf7ee956ae33041f2e077a6824e31929cc56ad31aeffc1 +botocore==1.29.90 ; python_full_version >= "3.8.10" and python_full_version < "4.0.0" \ + --hash=sha256:1b8c1b8c366875e65d39237a296842b9c0ea33af2ba4a2771db2ba6aefa663ef \ + --hash=sha256:2dbbc2c7d93ddefcf9896268597212d446e5d416fbceb1b12c793660fa9f83f3 certifi==2022.12.7 ; python_full_version >= "3.8.10" and python_version < "4" \ --hash=sha256:35824b4c3a97115964b408844d64aa14db1cc518f6562e8d7261699d1350a9e3 \ --hash=sha256:4ad3232f5e926d6718ec31cfc1fcadfde020920e278684144551c91769c7bc18 @@ -361,9 +361,9 @@ sniffio==1.3.0 ; python_full_version >= "3.8.10" and python_full_version < "4.0. 
tenacity==8.2.2 ; python_full_version >= "3.8.10" and python_full_version < "4.0.0" \ --hash=sha256:2f277afb21b851637e8f52e6a613ff08734c347dc19ade928e519d7d2d8569b0 \ --hash=sha256:43af037822bd0029025877f3b2d97cc4d7bb0c2991000a3d59d71517c5c969e0 -urllib3==1.26.14 ; python_full_version >= "3.8.10" and python_version < "4" \ - --hash=sha256:076907bf8fd355cde77728471316625a4d2f7e713c125f51953bb5b3eecf4f72 \ - --hash=sha256:75edcdc2f7d85b137124a6c3c9fc3933cdeaa12ecb9a6a959f22797a0feca7e1 +urllib3==1.26.15 ; python_full_version >= "3.8.10" and python_version < "4" \ + --hash=sha256:8a388717b9476f934a21484e8c8e61875ab60644d29b9b39e11e4b9dc1c6b305 \ + --hash=sha256:aa751d169e23c7479ce47a0cb0da579e3ede798f994f5816a74e4f4500dcea42 websocket-client==1.5.1 ; python_full_version >= "3.8.10" and python_full_version < "4.0.0" \ --hash=sha256:3f09e6d8230892547132177f575a4e3e73cfdf06526e20cc02aa1c3b47184d40 \ --hash=sha256:cdf5877568b7e83aa7cf2244ab56a3213de587bbe0ce9d8b9600fc77b455d89e diff --git a/src/charm.py b/src/charm.py index be65745026..1d63dea388 100755 --- a/src/charm.py +++ b/src/charm.py @@ -75,7 +75,6 @@ def __init__(self, *args): self._context = {"namespace": self._namespace, "app_name": self._name} self.cluster_name = f"patroni-{self._name}" - self.framework.observe(self.on.install, self._on_install) self.framework.observe(self.on.config_changed, self._on_config_changed) self.framework.observe(self.on.leader_elected, self._on_leader_elected) self.framework.observe(self.on[PEER].relation_changed, self._on_peer_relation_changed) @@ -211,7 +210,6 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self._remove_from_endpoints(endpoints_to_remove) # Update the replication configuration. - self._patroni.render_postgresql_conf_file() self._patroni.reload_patroni_configuration() def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None: @@ -249,11 +247,6 @@ def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None: self.unit.status = ActiveStatus() - def _on_install(self, _) -> None: - """Event handler for InstallEvent.""" - # Creates custom postgresql.conf file. - self._patroni.render_postgresql_conf_file() - def _on_config_changed(self, _) -> None: """Handle the config-changed event.""" # TODO: placeholder method to implement logic specific to configuration change. @@ -367,7 +360,6 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: self._add_members(event) # Update the replication configuration. - self._patroni.render_postgresql_conf_file() try: self._patroni.reload_patroni_configuration() except RetryError: @@ -839,7 +831,6 @@ def update_config(self) -> None: backup_id=self.app_peer_data.get("restoring-backup"), stanza=self.unit_peer_data.get("stanza"), ) - self._patroni.render_postgresql_conf_file() if not self._patroni.member_started: return diff --git a/src/patroni.py b/src/patroni.py index 3afddd9fd6..822352fc97 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -209,7 +209,7 @@ def render_patroni_yml_file( stanza: name of the stanza created by pgBackRest. backup_id: id of the backup that is being restored. """ - # Open the template postgresql.conf file. + # Open the template patroni.yml file. with open("templates/patroni.yml.j2", "r") as file: template = Template(file.read()) # Render the template file with the correct values. 
@@ -228,23 +228,10 @@ def render_patroni_yml_file( restoring_backup=backup_id is not None, backup_id=backup_id, stanza=stanza, + minority_count=self._members_count // 2, ) self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644) - def render_postgresql_conf_file(self) -> None: - """Render the PostgreSQL configuration file.""" - # Open the template postgresql.conf file. - with open("templates/postgresql.conf.j2", "r") as file: - template = Template(file.read()) - # Render the template file with the correct values. - # TODO: add extra configurations here later. - rendered = template.render( - logging_collector="on", - synchronous_commit="on" if self._members_count > 1 else "off", - synchronous_standby_names="*", - ) - self._render_file(f"{self._storage_path}/postgresql-k8s-operator.conf", rendered, 0o644) - @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def reload_patroni_configuration(self) -> None: """Reloads the configuration after it was updated in the file.""" diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index a13fc2f141..f093ec0471 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -1,10 +1,14 @@ bootstrap: dcs: + synchronous_mode: true + synchronous_node_count: {{ minority_count }} postgresql: use_pg_rewind: true remove_data_directory_on_rewind_failure: true remove_data_directory_on_diverged_timelines: true parameters: + synchronous_commit: on + synchronous_standby_names: "*" {%- if enable_pgbackrest %} archive_command: 'pgbackrest --stanza={{ stanza }} archive-push %p' {% else %} diff --git a/templates/postgresql.conf.j2 b/templates/postgresql.conf.j2 deleted file mode 100644 index 213fdfe069..0000000000 --- a/templates/postgresql.conf.j2 +++ /dev/null @@ -1,7 +0,0 @@ -######################################################################################### -# [ WARNING ] -# postgresql configuration file maintained by the postgresql-k8s-operator -# local changes may be overwritten. -######################################################################################### -synchronous_commit = '{{ synchronous_commit }}' -synchronous_standby_names = '{{ synchronous_standby_names }}' \ No newline at end of file diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 90776870c7..6fc5504474 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -55,20 +55,10 @@ def setUp(self): self.rel_id = self.harness.add_relation(self._peer_relation, self.charm.app.name) - @patch_network_get(private_address="1.1.1.1") - @patch("charm.Patroni.render_postgresql_conf_file") - def test_on_install( - self, - _render_postgresql_conf_file, - ): - self.charm.on.install.emit() - _render_postgresql_conf_file.assert_called_once() - @patch("charm.Patroni.reload_patroni_configuration") - @patch("charm.Patroni.render_postgresql_conf_file") @patch("charm.PostgresqlOperatorCharm._patch_pod_labels") @patch("charm.PostgresqlOperatorCharm._create_resources") - def test_on_leader_elected(self, _, __, _render_postgresql_conf_file, ___): + def test_on_leader_elected(self, _, __, ___): # Assert that there is no password in the peer relation. 
self.assertIsNone(self.charm._peers.data[self.charm.app].get("postgres-password", None)) self.assertIsNone(self.charm._peers.data[self.charm.app].get("replication-password", None)) @@ -82,7 +72,6 @@ def test_on_leader_elected(self, _, __, _render_postgresql_conf_file, ___): "replication-password", None ) self.assertIsNotNone(replication_password) - _render_postgresql_conf_file.assert_called_once() # Trigger a new leader election and check that the password is still the same. self.harness.set_leader(False) @@ -402,10 +391,9 @@ def test_patch_pod_labels(self, _client): ) @patch("charm.Patroni.reload_patroni_configuration") - @patch("charm.Patroni.render_postgresql_conf_file") @patch("charm.PostgresqlOperatorCharm._patch_pod_labels") @patch("charm.PostgresqlOperatorCharm._create_resources") - def test_postgresql_layer(self, _, __, ___, ____): + def test_postgresql_layer(self, _, __, ___): # Test with the already generated password. self.harness.set_leader() plan = self.charm._postgresql_layer().to_dict() @@ -435,9 +423,8 @@ def test_postgresql_layer(self, _, __, ___, ____): self.assertDictEqual(plan, expected) @patch("charm.Patroni.reload_patroni_configuration") - @patch("charm.Patroni.render_postgresql_conf_file") @patch("charm.PostgresqlOperatorCharm._create_resources") - def test_get_secret(self, _, __, ___): + def test_get_secret(self, _, __): self.harness.set_leader() # Test application scope. @@ -455,9 +442,8 @@ def test_get_secret(self, _, __, ___): assert self.charm.get_secret("unit", "password") == "test-password" @patch("charm.Patroni.reload_patroni_configuration") - @patch("charm.Patroni.render_postgresql_conf_file") @patch("charm.PostgresqlOperatorCharm._create_resources") - def test_set_secret(self, _, __, ___): + def test_set_secret(self, _, __): self.harness.set_leader() # Test application scope. diff --git a/tests/unit/test_patroni.py b/tests/unit/test_patroni.py index fe4ede2102..47516e292f 100644 --- a/tests/unit/test_patroni.py +++ b/tests/unit/test_patroni.py @@ -92,6 +92,7 @@ def test_render_patroni_yml_file(self, _render_file): replication_password=self.patroni._replication_password, rewind_user=REWIND_USER, rewind_password=self.patroni._rewind_password, + minority_count=self.patroni._members_count // 2, ) # Setup a mock for the `open` method, set returned data to postgresql.conf template. @@ -125,6 +126,7 @@ def test_render_patroni_yml_file(self, _render_file): replication_password=self.patroni._replication_password, rewind_user=REWIND_USER, rewind_password=self.patroni._rewind_password, + minority_count=self.patroni._members_count // 2, ) self.assertNotEqual(expected_content_with_tls, expected_content) @@ -149,65 +151,6 @@ def test_render_patroni_yml_file(self, _render_file): ) self.assertIn("ssl_key_file: /var/lib/postgresql/data/key.pem", expected_content_with_tls) - @patch("charm.Patroni._render_file") - def test_render_postgresql_conf_file(self, _render_file): - # Get the expected content from a file. - with open("templates/postgresql.conf.j2") as file: - template = Template(file.read()) - expected_content = template.render( - logging_collector="on", - synchronous_commit="off", - synchronous_standby_names="*", - ) - - # Setup a mock for the `open` method, set returned data to postgresql.conf template. - with open("templates/postgresql.conf.j2", "r") as f: - mock = mock_open(read_data=f.read()) - - # Patch the `open` method with our mock. 
- with patch("builtins.open", mock, create=True): - # Call the method - self.patroni.render_postgresql_conf_file() - - # Check the template is opened read-only in the call to open. - self.assertEqual(mock.call_args_list[0][0], ("templates/postgresql.conf.j2", "r")) - # Ensure the correct rendered template is sent to _render_file method. - _render_file.assert_called_once_with( - f"{STORAGE_PATH}/postgresql-k8s-operator.conf", - expected_content, - 0o644, - ) - - # Also test with multiple planned units (synchronous_commit is turned on). - self.patroni = Patroni( - "postgresql-k8s-0", - ["postgresql-k8s-0", "postgresql-k8s-1"], - "postgresql-k8s-primary.dev.svc.cluster.local", - "test-model", - STORAGE_PATH, - "superuser-password", - "replication-password", - "rewind-password", - False, - ) - expected_content = template.render( - logging_collector="on", - synchronous_commit="on", - synchronous_standby_names="*", - ) - - # Patch the `open` method with our mock. - with patch("builtins.open", mock, create=True): - # Call the method - self.patroni.render_postgresql_conf_file() - - # Ensure the correct rendered template is sent to _render_file method. - _render_file.assert_called_with( - f"{STORAGE_PATH}/postgresql-k8s-operator.conf", - expected_content, - 0o644, - ) - @patch("requests.get") def test_primary_endpoint_ready(self, _get): # Test with an issue when trying to connect to the Patroni API. From 426a00c5a9de1df453f00778ea3d128b6e4d1e67 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 31 Mar 2023 16:14:12 -0300 Subject: [PATCH 02/24] Remove custom conf --- templates/patroni.yml.j2 | 1 - 1 file changed, 1 deletion(-) diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 906735e566..66fd4ef2c2 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -54,7 +54,6 @@ ctl: pod_ip: '{{ endpoint }}' postgresql: connect_address: '{{ endpoint }}:5432' - custom_conf: {{ storage_path }}/postgresql-k8s-operator.conf data_dir: {{ storage_path }}/pgdata listen: 0.0.0.0:5432 parameters: From aeec50db836adf26f5ac95c2182c3aaad92d0a5c Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Fri, 31 Mar 2023 16:49:38 -0300 Subject: [PATCH 03/24] Add new expected k8s endpoint to helper function --- tests/integration/helpers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 004d917de8..755f3648bb 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -446,6 +446,7 @@ def get_expected_k8s_resources(namespace: str, application: str) -> set: resources.update( [ f"Endpoints/patroni-{application}-config", + f"Endpoints/patroni-{application}-sync", f"Endpoints/patroni-{application}", f"Endpoints/{application}", f"Endpoints/{application}-primary", From ce1d6d9fa47a0320d9a04b6c3bc5128890a3b096 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 5 Apr 2023 09:53:19 -0300 Subject: [PATCH 04/24] Fix TLS test --- tests/integration/test_tls.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index f40debebc2..5cbb4b9cf8 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -110,10 +110,16 @@ async def test_mattermost_db(ops_test: OpsTest) -> None: # Restart the initial primary and check the logs to ensure TLS is being used by pg_rewind. 
await run_command_on_unit(ops_test, primary, "/charm/bin/pebble start postgresql") - logs = await run_command_on_unit(ops_test, replica, "/charm/bin/pebble logs") - assert ( - "connection authorized: user=rewind database=postgres SSL enabled" in logs - ), "TLS is not being used on pg_rewind connections" + for attempt in Retrying( + stop=stop_after_delay(60 * 3), wait=wait_exponential(multiplier=1, min=2, max=30) + ): + with attempt: + logs = await run_command_on_unit( + ops_test, replica, "cat /var/log/postgresql/postgresql.log" + ) + assert ( + "connection authorized: user=rewind database=postgres SSL enabled" in logs + ), "TLS is not being used on pg_rewind connections" # Deploy and check Mattermost user and database existence. relation_id = await deploy_and_relate_application_with_postgresql( From 46374692df8ccd1791c7b0428c25b0c624e5e9fc Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 5 Apr 2023 16:57:27 -0300 Subject: [PATCH 05/24] Add pebble health check --- src/charm.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/charm.py b/src/charm.py index b65f31b940..16e82b7b1b 100755 --- a/src/charm.py +++ b/src/charm.py @@ -829,6 +829,15 @@ def _postgresql_layer(self) -> Layer: "group": WORKLOAD_OS_GROUP, }, }, + "checks": { + self._postgresql_service: { + "override": "replace", + "level": "ready", + "http": { + "url": f"{self._patroni._patroni_url}/health", + }, + } + }, } return Layer(layer_config) From 63cb1c2f11b51be06304be4e581601db68ec7f51 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 5 Apr 2023 17:23:55 -0300 Subject: [PATCH 06/24] Fix freeze db test --- .../ha_tests/application-charm/metadata.yaml | 4 + .../ha_tests/application-charm/src/charm.py | 153 +++++++++++++----- .../src/continuous_writes.py | 46 ++++-- tests/integration/ha_tests/conftest.py | 49 ++---- tests/integration/ha_tests/helpers.py | 96 +++++++++-- .../integration/ha_tests/test_self_healing.py | 92 +++++++---- 6 files changed, 300 insertions(+), 140 deletions(-) diff --git a/tests/integration/ha_tests/application-charm/metadata.yaml b/tests/integration/ha_tests/application-charm/metadata.yaml index d9ba6ef6f3..ee4cb703e7 100644 --- a/tests/integration/ha_tests/application-charm/metadata.yaml +++ b/tests/integration/ha_tests/application-charm/metadata.yaml @@ -11,3 +11,7 @@ requires: database: interface: postgresql_client limit: 1 + +peers: + application-peers: + interface: application-peers diff --git a/tests/integration/ha_tests/application-charm/src/charm.py b/tests/integration/ha_tests/application-charm/src/charm.py index c4b136e249..d40e4d91e9 100755 --- a/tests/integration/ha_tests/application-charm/src/charm.py +++ b/tests/integration/ha_tests/application-charm/src/charm.py @@ -9,24 +9,41 @@ """ import logging +import os +import signal import subprocess -from typing import Optional +from typing import Dict, Optional import psycopg2 from charms.data_platform_libs.v0.data_interfaces import DatabaseRequires from ops.charm import ActionEvent, CharmBase -from ops.framework import StoredState from ops.main import main -from ops.model import ActiveStatus +from ops.model import ActiveStatus, Relation from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed logger = logging.getLogger(__name__) +PEER = "application-peers" +LAST_WRITTEN_FILE = "/tmp/last_written_value" +CONFIG_FILE = "/tmp/continuous_writes_config" +PROC_PID_KEY = "proc-pid" + class ApplicationCharm(CharmBase): """Application charm that connects to PostgreSQL charm.""" - _stored = 
StoredState() + @property + def _peers(self) -> Optional[Relation]: + """Retrieve the peer relation (`ops.model.Relation`).""" + return self.model.get_relation(PEER) + + @property + def app_peer_data(self) -> Dict: + """Application peer relation data object.""" + if self._peers is None: + return {} + + return self._peers.data[self.app] def __init__(self, *args): super().__init__(*args) @@ -37,7 +54,6 @@ def __init__(self, *args): # Events related to the database that is requested. self.database_name = "application" self.database = DatabaseRequires(self, "database", self.database_name) - self.framework.observe(self.database.on.database_created, self._on_database_created) self.framework.observe(self.database.on.endpoints_changed, self._on_endpoints_changed) self.framework.observe( self.on.clear_continuous_writes_action, self._on_clear_continuous_writes_action @@ -49,9 +65,6 @@ def __init__(self, *args): self.on.stop_continuous_writes_action, self._on_stop_continuous_writes_action ) - # PID of the continuous writes OS process. - self._stored.set_default(continuous_writes_pid=None) - @property def _connection_string(self) -> Optional[str]: """Returns the PostgreSQL connection string.""" @@ -63,6 +76,10 @@ def _connection_string(self) -> Optional[str]: return None host = endpoints.split(":")[0] + + if not host or host == "None": + return None + return ( f"dbname='{self.database_name}' user='{username}'" f" host='{host}' password='{password}' connect_timeout=5" @@ -72,12 +89,23 @@ def _on_start(self, _) -> None: """Only sets an Active status.""" self.unit.status = ActiveStatus() - def _on_database_created(self, _) -> None: - """Event triggered when a database was created for this application.""" - self._start_continuous_writes(1) - def _on_endpoints_changed(self, _) -> None: """Event triggered when the read/write endpoints of the database change.""" + if self._connection_string is None: + return + + if not self.app_peer_data.get(PROC_PID_KEY): + return None + + with open(CONFIG_FILE, "w") as fd: + fd.write(self._connection_string) + os.fsync(fd) + + try: + os.kill(int(self.app_peer_data[PROC_PID_KEY]), signal.SIGKILL) + except ProcessLookupError: + del self.app_peer_data[PROC_PID_KEY] + return count = self._count_writes() self._start_continuous_writes(count + 1) @@ -91,18 +119,64 @@ def _count_writes(self) -> int: connection.close() return count - def _on_clear_continuous_writes_action(self, _) -> None: + def _on_clear_continuous_writes_action(self, event: ActionEvent) -> None: """Clears database writes.""" - self._stop_continuous_writes() - with psycopg2.connect( - self._connection_string - ) as connection, connection.cursor() as cursor: - cursor.execute("DROP TABLE continuous_writes;") - connection.close() + if self._connection_string is None: + event.set_results({"result": "False"}) + return + + try: + self._stop_continuous_writes() + except Exception as e: + event.set_results({"result": "False"}) + logger.exception("Unable to stop writes to drop table", exc_info=e) + return - def _on_start_continuous_writes_action(self, _) -> None: + try: + with psycopg2.connect( + self._connection_string + ) as connection, connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS continuous_writes;") + event.set_results({"result": "True"}) + except Exception as e: + event.set_results({"result": "False"}) + logger.exception("Unable to drop table", exc_info=e) + finally: + connection.close() + + def _on_start_continuous_writes_action(self, event: ActionEvent) -> None: """Start the continuous 
writes process.""" + if self._connection_string is None: + event.set_results({"result": "False"}) + return + + try: + self._stop_continuous_writes() + except Exception as e: + event.set_results({"result": "False"}) + logger.exception("Unable to stop writes to create table", exc_info=e) + return + + try: + # Create the table to write records on and also a unique index to prevent duplicate + # writes. + with psycopg2.connect( + self._connection_string + ) as connection, connection.cursor() as cursor: + connection.autocommit = True + cursor.execute("CREATE TABLE IF NOT EXISTS continuous_writes(number INTEGER);") + cursor.execute( + "CREATE UNIQUE INDEX IF NOT EXISTS number ON continuous_writes(number);" + ) + except Exception as e: + event.set_results({"result": "False"}) + logger.exception("Unable to create table", exc_info=e) + return + finally: + connection.close() + self._start_continuous_writes(1) + event.set_results({"result": "True"}) def _on_stop_continuous_writes_action(self, event: ActionEvent) -> None: """Stops the continuous writes process.""" @@ -117,47 +191,48 @@ def _start_continuous_writes(self, starting_number: int) -> None: # Stop any writes that might be going. self._stop_continuous_writes() + with open(CONFIG_FILE, "w") as fd: + fd.write(self._connection_string) + os.fsync(fd) + # Run continuous writes in the background. popen = subprocess.Popen( [ "/usr/bin/python3", "src/continuous_writes.py", - self._connection_string, str(starting_number), ] ) # Store the continuous writes process ID to stop the process later. - self._stored.continuous_writes_pid = popen.pid + self.app_peer_data[PROC_PID_KEY] = str(popen.pid) - def _stop_continuous_writes(self) -> int: + def _stop_continuous_writes(self) -> Optional[int]: """Stops continuous writes to PostgreSQL and returns the last written value.""" - # If there is no process running, returns -1. - if self._stored.continuous_writes_pid is None: - return -1 + if not self.app_peer_data.get(PROC_PID_KEY): + return None # Stop the process. - proc = subprocess.Popen(["pkill", "--signal", "SIGKILL", "-f", "src/continuous_writes.py"]) - - # Wait for process to be killed. - proc.communicate() + try: + os.kill(int(self.app_peer_data[PROC_PID_KEY]), signal.SIGTERM) + except ProcessLookupError: + del self.app_peer_data[PROC_PID_KEY] + return None - self._stored.continuous_writes_pid = None + del self.app_peer_data[PROC_PID_KEY] # Return the max written value (or -1 if it was not possible to get that value). 
try: - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(5)): with attempt: - with psycopg2.connect( - self._connection_string - ) as connection, connection.cursor() as cursor: - cursor.execute("SELECT MAX(number) FROM continuous_writes;") - last_written_value = int(cursor.fetchone()[0]) - connection.close() + with open(LAST_WRITTEN_FILE, "r") as fd: + last_written_value = int(fd.read()) except RetryError as e: - logger.exception(e) + logger.exception("Unable to read result", exc_info=e) return -1 + os.remove(LAST_WRITTEN_FILE) + os.remove(CONFIG_FILE) return last_written_value diff --git a/tests/integration/ha_tests/application-charm/src/continuous_writes.py b/tests/integration/ha_tests/application-charm/src/continuous_writes.py index 83ab585749..ab261e6b0a 100644 --- a/tests/integration/ha_tests/application-charm/src/continuous_writes.py +++ b/tests/integration/ha_tests/application-charm/src/continuous_writes.py @@ -2,12 +2,32 @@ # See LICENSE file for licensing details. """This file is meant to run in the background continuously writing entries to PostgreSQL.""" +import os +import signal import sys import psycopg2 as psycopg2 +run = True +connection_string = None -def continuous_writes(connection_string: str, starting_number: int): + +def sigterm_handler(_signo, _stack_frame): + global run + run = False + + +def sighup_handler(_signo, _stack_frame): + read_config_file() + + +def read_config_file(): + with open("/tmp/continuous_writes_config") as fd: + global connection_string + connection_string = fd.read().strip() + + +def continuous_writes(starting_number: int): """Continuously writes data do PostgreSQL database. Args: @@ -17,19 +37,10 @@ def continuous_writes(connection_string: str, starting_number: int): """ write_value = starting_number - try: - # Create the table to write records on and also a unique index to prevent duplicate writes. - with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - connection.autocommit = True - cursor.execute("CREATE TABLE IF NOT EXISTS continuous_writes(number INTEGER);") - cursor.execute( - "CREATE UNIQUE INDEX IF NOT EXISTS number ON continuous_writes(number);" - ) - finally: - connection.close() + read_config_file() # Continuously write the record to the database (incrementing it at each iteration). - while True: + while run: try: with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: connection.autocommit = True @@ -53,12 +64,17 @@ def continuous_writes(connection_string: str, starting_number: int): write_value += 1 + with open("/tmp/last_written_value", "w") as fd: + fd.write(str(write_value - 1)) + os.fsync(fd) + def main(): - connection_string = sys.argv[1] - starting_number = int(sys.argv[2]) - continuous_writes(connection_string, starting_number) + starting_number = int(sys.argv[1]) + continuous_writes(starting_number) if __name__ == "__main__": + signal.signal(signal.SIGTERM, sigterm_handler) + signal.signal(signal.SIGHUP, sighup_handler) main() diff --git a/tests/integration/ha_tests/conftest.py b/tests/integration/ha_tests/conftest.py index d331e070fb..5439288eb2 100644 --- a/tests/integration/ha_tests/conftest.py +++ b/tests/integration/ha_tests/conftest.py @@ -3,14 +3,12 @@ # See LICENSE file for licensing details. 
import pytest as pytest from pytest_operator.plugin import OpsTest +from tenacity import Retrying, stop_after_delay, wait_fixed from tests.integration.ha_tests.helpers import ( - app_name, change_master_start_timeout, get_master_start_timeout, - stop_continuous_writes, ) -from tests.integration.helpers import CHARM_SERIES APPLICATION_NAME = "application" @@ -18,43 +16,20 @@ @pytest.fixture() async def continuous_writes(ops_test: OpsTest) -> None: """Deploy the charm that makes continuous writes to PostgreSQL.""" - # Deploy the continuous writes application charm if it wasn't already deployed. - async with ops_test.fast_forward(): - if await app_name(ops_test, APPLICATION_NAME) is None: - charm = await ops_test.build_charm("tests/integration/ha_tests/application-charm") - await ops_test.model.deploy( - charm, application_name=APPLICATION_NAME, series=CHARM_SERIES - ) - await ops_test.model.wait_for_idle(status="active", timeout=1000) - - # Start the continuous writes process by relating the application to the database or - # by calling the action if the relation already exists. - database_app = await app_name(ops_test) - relations = [ - relation - for relation in ops_test.model.applications[database_app].relations - if not relation.is_peer - and f"{relation.requires.application_name}:{relation.requires.name}" - == "application:database" - ] - if not relations: - await ops_test.model.relate(database_app, APPLICATION_NAME) - await ops_test.model.wait_for_idle(status="active", timeout=1000) - else: - action = await ops_test.model.units.get(f"{APPLICATION_NAME}/0").run_action( - "start-continuous-writes" - ) - await action.wait() yield - # Stop the continuous writes process and clear the written data at the end. - await stop_continuous_writes(ops_test) - action = await ops_test.model.units.get(f"{APPLICATION_NAME}/0").run_action( - "clear-continuous-writes" - ) - await action.wait() + # Clear the written data at the end. + for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await ops_test.model.applications[APPLICATION_NAME] + .units[0] + .run_action("clear-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to clear up continuous_writes table" -@pytest.fixture() +@pytest.fixture(scope="module") async def master_start_timeout(ops_test: OpsTest) -> None: """Temporary change the master start timeout configuration.""" # Change the parameter that makes the primary reelection faster. diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 3c649ad0ea..3cb4b61030 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -1,7 +1,7 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. from pathlib import Path -from typing import Optional +from typing import Dict, Optional import psycopg2 import requests @@ -37,6 +37,11 @@ class ProcessError(Exception): """Raised when a process fails.""" +def get_patroni_cluster(unit_ip: str) -> Dict[str, str]: + resp = requests.get(f"http://{unit_ip}:8008/cluster") + return resp.json() + + async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) -> None: """Change master start timeout configuration. 
@@ -55,6 +60,14 @@ async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) ) +async def check_writes(ops_test) -> int: + """Gets the total writes from the test charm and compares to the writes from db.""" + total_expected_writes = await stop_continuous_writes(ops_test) + actual_writes = await count_writes(ops_test) + assert total_expected_writes == actual_writes, "writes to the db were missed." + return total_expected_writes + + async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int: """Count the number of writes in the database.""" app = await app_name(ops_test) @@ -62,23 +75,31 @@ async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int: status = await ops_test.model.get_status() for unit_name, unit in status["applications"][app]["units"].items(): if unit_name != down_unit: - host = unit["address"] + cluster = get_patroni_cluster(unit["address"]) break + for member in cluster["members"]: + if member["role"] != "replica" and member["host"].split(".")[0] != ( + down_unit or "" + ).replace("/", "-"): + host = member["host"] + + # Translate the service hostname to an IP address. + print(host) + model = ops_test.model.info + client = Client(namespace=model.name) + service = client.get(Pod, name=host.split(".")[0]) + ip = service.status.podIP + connection_string = ( f"dbname='application' user='operator'" - f" host='{host}' password='{password}' connect_timeout=10" + f" host='{ip}' password='{password}' connect_timeout=10" ) - try: - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): - with attempt: - with psycopg2.connect( - connection_string - ) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number) FROM continuous_writes;") - count = cursor.fetchone()[0] - connection.close() - except RetryError: - return -1 + + with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(number) FROM continuous_writes;") + count = cursor.fetchone()[0] + connection.close() + print(count) return count @@ -153,7 +174,7 @@ async def is_replica(ops_test: OpsTest, unit_name: str) -> bool: # A member that restarted has the DB process stopped may # take some time to know that a new primary was elected. - if role == "replica": + if role != "leader": return True else: raise MemberNotUpdatedOnClusterError() @@ -227,6 +248,19 @@ async def send_signal_to_process( await ops_test.model.applications[app].add_unit(count=1) await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + # if signal == "SIGCONT": + # command = f"ssh {unit_name} pkill --signal {signal} -f {process}" + # return_code, _, _ = await ops_test.juju(*command.split()) + # + # if return_code != 0: + # raise ProcessError( + # "Expected command %s to succeed instead it failed: %s", + # command, + # return_code, + # ) + # + # return + # Load Kubernetes configuration to connect to the cluster. config.load_kube_config() @@ -256,6 +290,38 @@ async def send_signal_to_process( ) +async def start_continuous_writes(ops_test: OpsTest, app: str) -> None: + """Start continuous writes to PostgreSQL.""" + # Start the process by relating the application to the database or + # by calling the action if the relation already exists. 
+ relations = [ + relation + for relation in ops_test.model.applications[app].relations + if not relation.is_peer + and f"{relation.requires.application_name}:{relation.requires.name}" + == "application:database" + ] + if not relations: + await ops_test.model.relate(app, "application") + await ops_test.model.wait_for_idle(status="active", timeout=1000) + else: + action = ( + await ops_test.model.applications["application"] + .units[0] + .run_action("start-continuous-writes") + ) + await action.wait() + for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True): + with attempt: + action = ( + await ops_test.model.applications["application"] + .units[0] + .run_action("start-continuous-writes") + ) + await action.wait() + assert action.results["result"] == "True", "Unable to create continuous_writes table" + + async def stop_continuous_writes(ops_test: OpsTest) -> int: """Stops continuous writes to PostgreSQL and returns the last written value.""" action = await ops_test.model.units.get("application/0").run_action("stop-continuous-writes") diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index a4e3012277..7132814c23 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -6,7 +6,10 @@ from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_fixed +from tests.integration.ha_tests.conftest import APPLICATION_NAME from tests.integration.ha_tests.helpers import ( + METADATA, + check_writes, count_writes, fetch_cluster_members, get_primary, @@ -14,10 +17,16 @@ postgresql_ready, secondary_up_to_date, send_signal_to_process, - stop_continuous_writes, + start_continuous_writes, +) +from tests.integration.helpers import ( + CHARM_SERIES, + app_name, + build_and_deploy, + get_unit_address, ) -from tests.integration.helpers import app_name, build_and_deploy, get_unit_address +APP_NAME = METADATA["name"] PATRONI_PROCESS = "/usr/local/bin/patroni" POSTGRESQL_PROCESS = "postgres" DB_PROCESSES = [POSTGRESQL_PROCESS, PATRONI_PROCESS] @@ -26,36 +35,58 @@ @pytest.mark.abort_on_fail async def test_build_and_deploy(ops_test: OpsTest) -> None: """Build and deploy three unit of PostgreSQL.""" - await build_and_deploy(ops_test, 3) + wait_for_apps = False + # It is possible for users to provide their own cluster for HA testing. Hence, check if there + # is a pre-existing cluster. + if not await app_name(ops_test): + wait_for_apps = True + await build_and_deploy(ops_test, 3, wait_for_idle=False) + # Deploy the continuous writes application charm if it wasn't already deployed. + if not await app_name(ops_test, APPLICATION_NAME): + wait_for_apps = True + async with ops_test.fast_forward(): + charm = await ops_test.build_charm("tests/integration/ha_tests/application-charm") + await ops_test.model.deploy( + charm, application_name=APPLICATION_NAME, series=CHARM_SERIES + ) + + if wait_for_apps: + async with ops_test.fast_forward(): + await ops_test.model.wait_for_idle(status="active", timeout=1000) -@pytest.mark.unstable @pytest.mark.parametrize("process", [POSTGRESQL_PROCESS]) -async def test_freeze_db_process( - ops_test: OpsTest, process: str, continuous_writes, master_start_timeout -) -> None: +async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None: # Locate primary unit. 
app = await app_name(ops_test) primary_name = await get_primary(ops_test, app) + # Start an application that continuously writes data to the database. + await start_continuous_writes(ops_test, app) + # Freeze the database process. await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP") async with ops_test.fast_forward(): # Verify new writes are continuing by counting the number of writes before and after a - # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). - writes = await count_writes(ops_test, primary_name) - for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): - with attempt: - more_writes = await count_writes(ops_test, primary_name) - assert more_writes > writes, "writes not continuing to DB" - - # Verify that a new primary gets elected (ie old primary is secondary). - new_primary_name = await get_primary(ops_test, app, down_unit=primary_name) - assert new_primary_name != primary_name - - # Un-freeze the old primary. - await send_signal_to_process(ops_test, primary_name, process, "SIGCONT") + # 3 minutes wait (a db process freeze takes more time to trigger a fail-over). + try: + writes = await count_writes(ops_test, primary_name) + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + with attempt: + more_writes = await count_writes(ops_test, primary_name) + assert more_writes > writes, "writes not continuing to DB" + + # Verify that a new primary gets elected (ie old primary is secondary). + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + with attempt: + new_primary_name = await get_primary(ops_test, app, down_unit=primary_name) + assert new_primary_name != primary_name + finally: + # Un-freeze the old primary. + if process != PATRONI_PROCESS: + await send_signal_to_process(ops_test, primary_name, PATRONI_PROCESS, "SIGCONT") + await send_signal_to_process(ops_test, primary_name, process, "SIGCONT") # Verify that the database service got restarted and is ready in the old primary. assert await postgresql_ready(ops_test, primary_name) @@ -74,11 +105,7 @@ async def test_freeze_db_process( assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster." # Verify that no writes to the database were missed after stopping the writes. - total_expected_writes = await stop_continuous_writes(ops_test) - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): - with attempt: - actual_writes = await count_writes(ops_test) - assert total_expected_writes == actual_writes, "writes to the db were missed." + total_expected_writes = await check_writes(ops_test) # Verify that old primary is up-to-date. assert await secondary_up_to_date( @@ -86,15 +113,16 @@ async def test_freeze_db_process( ), "secondary not up to date with the cluster after restarting." -@pytest.mark.unstable @pytest.mark.parametrize("process", DB_PROCESSES) -async def test_restart_db_process( - ops_test: OpsTest, process: str, continuous_writes, master_start_timeout -) -> None: +async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None: + return # Locate primary unit. app = await app_name(ops_test) primary_name = await get_primary(ops_test, app) + # Start an application that continuously writes data to the database. + await start_continuous_writes(ops_test, app) + # Restart the database process. 
await send_signal_to_process(ops_test, primary_name, process, "SIGTERM") @@ -128,11 +156,7 @@ async def test_restart_db_process( assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster." # Verify that no writes to the database were missed after stopping the writes. - total_expected_writes = await stop_continuous_writes(ops_test) - for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): - with attempt: - actual_writes = await count_writes(ops_test) - assert total_expected_writes == actual_writes, "writes to the db were missed." + total_expected_writes = await check_writes(ops_test) # Verify that old primary is up-to-date. assert await secondary_up_to_date( From ac75151f133f93fd900567aab331fe11d5cadbb7 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 5 Apr 2023 17:41:05 -0300 Subject: [PATCH 07/24] Enable restart db test --- tests/integration/ha_tests/helpers.py | 2 -- tests/integration/ha_tests/test_self_healing.py | 1 - 2 files changed, 3 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 3cb4b61030..dded8f95e4 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -84,7 +84,6 @@ async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int: host = member["host"] # Translate the service hostname to an IP address. - print(host) model = ops_test.model.info client = Client(namespace=model.name) service = client.get(Pod, name=host.split(".")[0]) @@ -99,7 +98,6 @@ async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int: cursor.execute("SELECT COUNT(number) FROM continuous_writes;") count = cursor.fetchone()[0] connection.close() - print(count) return count diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 7132814c23..29b8d45055 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -115,7 +115,6 @@ async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_wri @pytest.mark.parametrize("process", DB_PROCESSES) async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None: - return # Locate primary unit. app = await app_name(ops_test) primary_name = await get_primary(ops_test, app) From 4fb09d9cf73e51619c76aef651ede79e1f6a3a9b Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 5 Apr 2023 17:42:02 -0300 Subject: [PATCH 08/24] Remove commented code --- tests/integration/ha_tests/helpers.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index dded8f95e4..bfa5b32620 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -246,19 +246,6 @@ async def send_signal_to_process( await ops_test.model.applications[app].add_unit(count=1) await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) - # if signal == "SIGCONT": - # command = f"ssh {unit_name} pkill --signal {signal} -f {process}" - # return_code, _, _ = await ops_test.juju(*command.split()) - # - # if return_code != 0: - # raise ProcessError( - # "Expected command %s to succeed instead it failed: %s", - # command, - # return_code, - # ) - # - # return - # Load Kubernetes configuration to connect to the cluster. 
config.load_kube_config() From 1ace69c4e937248178b8267f79a8457f169d2c1d Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Wed, 5 Apr 2023 17:45:17 -0300 Subject: [PATCH 09/24] Fix unit tests --- tests/unit/test_charm.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 2a513233df..976b3ca955 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -139,6 +139,7 @@ def test_on_postgresql_pebble_ready( expected = self.charm._postgresql_layer().to_dict() expected.pop("summary", "") expected.pop("description", "") + expected.pop("checks", "") # Check the plan is as expected. self.assertEqual(plan.to_dict(), expected) self.assertEqual(self.harness.model.unit.status, ActiveStatus()) @@ -434,6 +435,15 @@ def test_postgresql_layer(self, _, __, ___): "group": "postgres", }, }, + "checks": { + self._postgresql_service: { + "override": "replace", + "level": "ready", + "http": { + "url": "http://postgresql-k8s-0.postgresql-k8s-endpoints:8008/health", + }, + } + }, } self.assertDictEqual(plan, expected) From 0ef7f51ced9d14ecf501a3166e5f8d57ecbc1323 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Sat, 8 Apr 2023 16:03:01 -0300 Subject: [PATCH 10/24] Improve TLS management --- src/charm.py | 9 +++++++-- templates/patroni.yml.j2 | 3 +++ tests/integration/test_tls.py | 6 ++++-- 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/charm.py b/src/charm.py index e36229ad7d..dac939d1c7 100755 --- a/src/charm.py +++ b/src/charm.py @@ -13,7 +13,7 @@ PostgreSQLUpdateUserPasswordError, ) from charms.postgresql_k8s.v0.postgresql_tls import PostgreSQLTLS -from charms.rolling_ops.v0.rollingops import RollingOpsManager +from charms.rolling_ops.v0.rollingops import RollingOpsManager, RunWithLock from lightkube import ApiError, Client, codecs from lightkube.models.core_v1 import ServicePort from lightkube.resources.core_v1 import Endpoints, Pod, Service @@ -891,8 +891,13 @@ def push_tls_files_to_workload(self, container: Container = None) -> None: self.update_config() - def _restart(self, _) -> None: + def _restart(self, event: RunWithLock) -> None: """Restart PostgreSQL.""" + if not self._patroni.are_all_members_ready(): + logger.debug("Early exit _restart: not all members ready yet") + event.defer() + return + try: self._patroni.restart_postgresql() except RetryError: diff --git a/templates/patroni.yml.j2 b/templates/patroni.yml.j2 index 3b8bf9ffe0..af64a428ed 100644 --- a/templates/patroni.yml.j2 +++ b/templates/patroni.yml.j2 @@ -16,6 +16,9 @@ bootstrap: archive_command: /bin/true {%- endif %} archive_mode: {{ archive_mode }} + log_filename: 'postgresql.log' + log_directory: '/var/log/postgresql' + logging_collector: 'on' password_encryption: md5 wal_level: logical {%- if restoring_backup %} diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index 66d1aa1897..f22526d228 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -111,11 +111,13 @@ async def test_mattermost_db(ops_test: OpsTest) -> None: # Restart the initial primary and check the logs to ensure TLS is being used by pg_rewind. 
await run_command_on_unit(ops_test, primary, "/charm/bin/pebble start postgresql") for attempt in Retrying( - stop=stop_after_delay(60 * 3), wait=wait_exponential(multiplier=1, min=2, max=30) + stop=stop_after_delay(60), wait=wait_exponential(multiplier=1, min=2, max=30) ): with attempt: logs = await run_command_on_unit( - ops_test, replica, "cat /var/log/postgresql/postgresql.log" + ops_test, + replica, + 'bash -c "cat /var/log/postgresql/postgresql.log | grep rewind"', ) assert ( "connection authorized: user=rewind database=postgres SSL enabled" in logs From 17455b6a1b91fa8ffb1e129f65ba29349e7536bf Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Apr 2023 07:07:31 -0300 Subject: [PATCH 11/24] Add health check update --- src/charm.py | 42 ++++++++++++++++++++++------------- tests/integration/test_tls.py | 7 +++++- 2 files changed, 33 insertions(+), 16 deletions(-) diff --git a/src/charm.py b/src/charm.py index dac939d1c7..43d6f23b95 100755 --- a/src/charm.py +++ b/src/charm.py @@ -402,9 +402,6 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: # where the volume is mounted with more restrictive permissions. self._create_pgdata(container) - # Create a new config layer. - new_layer = self._postgresql_layer() - self.unit.set_workload_version(self._patroni.rock_postgresql_version) # Defer the initialization of the workload in the replicas @@ -428,18 +425,8 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: event.defer() return - # Get the current layer. - current_layer = container.get_plan() - # Check if there are any changes to layer services. - if current_layer.services != new_layer.services: - # Changes were made, add the new layer. - container.add_layer(self._postgresql_service, new_layer, combine=True) - logging.info("Added updated layer 'postgresql' to Pebble plan") - # TODO: move this file generation to on config changed hook - # when adding configs to this charm. - # Restart it and report a new status to Juju. - container.restart(self._postgresql_service) - logging.info("Restarted postgresql service") + # Start the database service. + self._update_pebble_layers() # Ensure the member is up and running before marking the cluster as initialised. if not self._patroni.member_started: @@ -906,6 +893,9 @@ def _restart(self, event: RunWithLock) -> None: self.unit.status = BlockedStatus(error_message) return + # Update health check URL. + self._update_pebble_layers() + # Start or stop the pgBackRest TLS server service when TLS certificate change. self.backup.start_stop_pgbackrest_service() @@ -939,6 +929,28 @@ def update_config(self) -> None: if restart_postgresql: self.on[self.restart_manager.name].acquire_lock.emit() + def _update_pebble_layers(self) -> None: + """Update the pebble layers to keep the health check URL up-to-date.""" + container = self.unit.get_container("postgresql") + + # Get the current layer. + current_layer = container.get_plan() + + # Create a new config layer. + new_layer = self._postgresql_layer() + + # Check if there are any changes to layer services. + if current_layer.services != new_layer.services: + # Changes were made, add the new layer. + container.add_layer(self._postgresql_service, new_layer, combine=True) + logging.info("Added updated layer 'postgresql' to Pebble plan") + container.restart(self._postgresql_service) + logging.info("Restarted postgresql service") + if current_layer.checks != new_layer.checks: + # Changes were made, add the new layer. 
+ container.add_layer(self._postgresql_service, new_layer, combine=True) + logging.info("Updated health checks") + def _unit_name_to_pod_name(self, unit_name: str) -> str: """Converts unit name to pod name. diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index f22526d228..818c590e5e 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -1,6 +1,8 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. +import logging + import pytest as pytest from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_exponential @@ -22,6 +24,8 @@ run_command_on_unit, ) +logger = logging.getLogger(__name__) + MATTERMOST_APP_NAME = "mattermost" TLS_CERTIFICATES_APP_NAME = "tls-certificates-operator" APPLICATION_UNITS = 2 @@ -111,9 +115,10 @@ async def test_mattermost_db(ops_test: OpsTest) -> None: # Restart the initial primary and check the logs to ensure TLS is being used by pg_rewind. await run_command_on_unit(ops_test, primary, "/charm/bin/pebble start postgresql") for attempt in Retrying( - stop=stop_after_delay(60), wait=wait_exponential(multiplier=1, min=2, max=30) + stop=stop_after_delay(60 * 3), wait=wait_exponential(multiplier=1, min=2, max=30) ): with attempt: + logger.info(f"checking if pg_rewind used TLS on {replica}") logs = await run_command_on_unit( ops_test, replica, From 7e60fcb0da63d024824b54e259b0a482b42c7d39 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Apr 2023 07:20:37 -0300 Subject: [PATCH 12/24] Remove health check --- src/charm.py | 51 ++++++++++++---------------------------- tests/unit/test_charm.py | 9 ------- 2 files changed, 15 insertions(+), 45 deletions(-) diff --git a/src/charm.py b/src/charm.py index 43d6f23b95..94853023ea 100755 --- a/src/charm.py +++ b/src/charm.py @@ -402,6 +402,9 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: # where the volume is mounted with more restrictive permissions. self._create_pgdata(container) + # Create a new config layer. + new_layer = self._postgresql_layer() + self.unit.set_workload_version(self._patroni.rock_postgresql_version) # Defer the initialization of the workload in the replicas @@ -425,8 +428,18 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: event.defer() return - # Start the database service. - self._update_pebble_layers() + # Get the current layer. + current_layer = container.get_plan() + # Check if there are any changes to layer services. + if current_layer.services != new_layer.services: + # Changes were made, add the new layer. + container.add_layer(self._postgresql_service, new_layer, combine=True) + logging.info("Added updated layer 'postgresql' to Pebble plan") + # TODO: move this file generation to on config changed hook + # when adding configs to this charm. + # Restart it and report a new status to Juju. + container.restart(self._postgresql_service) + logging.info("Restarted postgresql service") # Ensure the member is up and running before marking the cluster as initialised. 
if not self._patroni.member_started: @@ -819,15 +832,6 @@ def _postgresql_layer(self) -> Layer: "group": WORKLOAD_OS_GROUP, }, }, - "checks": { - self._postgresql_service: { - "override": "replace", - "level": "ready", - "http": { - "url": f"{self._patroni._patroni_url}/health", - }, - } - }, } return Layer(layer_config) @@ -893,9 +897,6 @@ def _restart(self, event: RunWithLock) -> None: self.unit.status = BlockedStatus(error_message) return - # Update health check URL. - self._update_pebble_layers() - # Start or stop the pgBackRest TLS server service when TLS certificate change. self.backup.start_stop_pgbackrest_service() @@ -929,28 +930,6 @@ def update_config(self) -> None: if restart_postgresql: self.on[self.restart_manager.name].acquire_lock.emit() - def _update_pebble_layers(self) -> None: - """Update the pebble layers to keep the health check URL up-to-date.""" - container = self.unit.get_container("postgresql") - - # Get the current layer. - current_layer = container.get_plan() - - # Create a new config layer. - new_layer = self._postgresql_layer() - - # Check if there are any changes to layer services. - if current_layer.services != new_layer.services: - # Changes were made, add the new layer. - container.add_layer(self._postgresql_service, new_layer, combine=True) - logging.info("Added updated layer 'postgresql' to Pebble plan") - container.restart(self._postgresql_service) - logging.info("Restarted postgresql service") - if current_layer.checks != new_layer.checks: - # Changes were made, add the new layer. - container.add_layer(self._postgresql_service, new_layer, combine=True) - logging.info("Updated health checks") - def _unit_name_to_pod_name(self, unit_name: str) -> str: """Converts unit name to pod name. diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 4c7a72073a..15f0f06c94 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -442,15 +442,6 @@ def test_postgresql_layer(self, _, __, ___): "group": "postgres", }, }, - "checks": { - self._postgresql_service: { - "override": "replace", - "level": "ready", - "http": { - "url": "http://postgresql-k8s-0.postgresql-k8s-endpoints:8008/health", - }, - } - }, } self.assertDictEqual(plan, expected) From f12651ea4c907967576574e3fa7db2d5499aae21 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Apr 2023 10:17:57 -0300 Subject: [PATCH 13/24] Add health check --- src/charm.py | 20 +++ tests/integration/ha_tests/helpers.py | 19 ++- .../integration/ha_tests/test_self_healing.py | 115 ++++++++++-------- tests/integration/helpers.py | 2 +- tests/integration/test_tls.py | 17 ++- tests/unit/test_charm.py | 20 +++ 6 files changed, 133 insertions(+), 60 deletions(-) diff --git a/src/charm.py b/src/charm.py index 94853023ea..abcc5d1eed 100755 --- a/src/charm.py +++ b/src/charm.py @@ -832,6 +832,26 @@ def _postgresql_layer(self) -> Layer: "group": WORKLOAD_OS_GROUP, }, }, + "checks": { + "patroni": { + "override": "replace", + "level": "ready", + "exec": { + "command": f"patronictl -c {self._storage_path}/patroni.yml list {self.cluster_name}", + "user": WORKLOAD_OS_USER, + "group": WORKLOAD_OS_GROUP, + "environment": { + "PATRONI_KUBERNETES_LABELS": f"{{application: patroni, cluster-name: {self.cluster_name}}}", + "PATRONI_KUBERNETES_NAMESPACE": self._namespace, + "PATRONI_KUBERNETES_USE_ENDPOINTS": "true", + "PATRONI_NAME": pod_name, + "PATRONI_SCOPE": self.cluster_name, + "PATRONI_REPLICATION_USERNAME": REPLICATION_USER, + "PATRONI_SUPERUSER_USERNAME": USER, + }, + }, + } + }, } return 
Layer(layer_config) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index bfa5b32620..6805dcb40c 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -226,7 +226,7 @@ async def secondary_up_to_date(ops_test: OpsTest, unit_name: str, expected_write async def send_signal_to_process( - ops_test: OpsTest, unit_name: str, process: str, signal: str + ops_test: OpsTest, unit_name: str, process: str, signal: str, use_ssh: bool = False ) -> None: """Send a signal to an OS process on a specific unit. @@ -246,12 +246,25 @@ async def send_signal_to_process( await ops_test.model.applications[app].add_unit(count=1) await ops_test.model.wait_for_idle(apps=[app], status="active", timeout=1000) + pod_name = unit_name.replace("/", "-") + command = f"pkill --signal {signal} -f {process}" + + if use_ssh: + kill_cmd = f"ssh {unit_name} {command}" + return_code, _, stderr = await ops_test.juju(*kill_cmd.split()) + if return_code != 0: + print(stderr) + raise ProcessError( + "Expected command %s to succeed instead it failed: %s", + command, + return_code, + ) + return + # Load Kubernetes configuration to connect to the cluster. config.load_kube_config() # Send the signal. - pod_name = unit_name.replace("/", "-") - command = f"pkill --signal {signal} -f {process}" response = stream( core_v1_api.CoreV1Api().connect_get_namespaced_pod_exec, pod_name, diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 29b8d45055..0a84b125f7 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. +import logging import pytest from pytest_operator.plugin import OpsTest @@ -26,6 +27,8 @@ get_unit_address, ) +logger = logging.getLogger(__name__) + APP_NAME = METADATA["name"] PATRONI_PROCESS = "/usr/local/bin/patroni" POSTGRESQL_PROCESS = "postgres" @@ -85,8 +88,20 @@ async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_wri finally: # Un-freeze the old primary. if process != PATRONI_PROCESS: - await send_signal_to_process(ops_test, primary_name, PATRONI_PROCESS, "SIGCONT") - await send_signal_to_process(ops_test, primary_name, process, "SIGCONT") + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + with attempt: + use_ssh = (attempt.retry_state.attempt_number % 2) != 0 + logger.info(f"unfreezing {PATRONI_PROCESS}") + await send_signal_to_process( + ops_test, primary_name, PATRONI_PROCESS, "SIGCONT", use_ssh + ) + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + with attempt: + use_ssh = (attempt.retry_state.attempt_number % 2) != 0 + logger.info(f"unfreezing {process}") + await send_signal_to_process( + ops_test, primary_name, process, "SIGCONT", use_ssh + ) # Verify that the database service got restarted and is ready in the old primary. assert await postgresql_ready(ops_test, primary_name) @@ -113,51 +128,51 @@ async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_wri ), "secondary not up to date with the cluster after restarting." -@pytest.mark.parametrize("process", DB_PROCESSES) -async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None: - # Locate primary unit. 
- app = await app_name(ops_test) - primary_name = await get_primary(ops_test, app) - - # Start an application that continuously writes data to the database. - await start_continuous_writes(ops_test, app) - - # Restart the database process. - await send_signal_to_process(ops_test, primary_name, process, "SIGTERM") - - async with ops_test.fast_forward(): - # Verify new writes are continuing by counting the number of writes before and after a - # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). - writes = await count_writes(ops_test, primary_name) - for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): - with attempt: - more_writes = await count_writes(ops_test, primary_name) - assert more_writes > writes, "writes not continuing to DB" - - # Verify that the database service got restarted and is ready in the old primary. - assert await postgresql_ready(ops_test, primary_name) - - # Verify that a new primary gets elected (ie old primary is secondary). - new_primary_name = await get_primary(ops_test, app, down_unit=primary_name) - assert new_primary_name != primary_name - - # Verify that the old primary is now a replica. - assert await is_replica( - ops_test, primary_name - ), "there are more than one primary in the cluster." - - # Verify that all units are part of the same cluster. - member_ips = await fetch_cluster_members(ops_test) - ip_addresses = [ - await get_unit_address(ops_test, unit.name) - for unit in ops_test.model.applications[app].units - ] - assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster." - - # Verify that no writes to the database were missed after stopping the writes. - total_expected_writes = await check_writes(ops_test) - - # Verify that old primary is up-to-date. - assert await secondary_up_to_date( - ops_test, primary_name, total_expected_writes - ), "secondary not up to date with the cluster after restarting." +# @pytest.mark.parametrize("process", DB_PROCESSES) +# async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None: +# # Locate primary unit. +# app = await app_name(ops_test) +# primary_name = await get_primary(ops_test, app) +# +# # Start an application that continuously writes data to the database. +# await start_continuous_writes(ops_test, app) +# +# # Restart the database process. +# await send_signal_to_process(ops_test, primary_name, process, "SIGTERM") +# +# async with ops_test.fast_forward(): +# # Verify new writes are continuing by counting the number of writes before and after a +# # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). +# writes = await count_writes(ops_test, primary_name) +# for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): +# with attempt: +# more_writes = await count_writes(ops_test, primary_name) +# assert more_writes > writes, "writes not continuing to DB" +# +# # Verify that the database service got restarted and is ready in the old primary. +# assert await postgresql_ready(ops_test, primary_name) +# +# # Verify that a new primary gets elected (ie old primary is secondary). +# new_primary_name = await get_primary(ops_test, app, down_unit=primary_name) +# assert new_primary_name != primary_name +# +# # Verify that the old primary is now a replica. +# assert await is_replica( +# ops_test, primary_name +# ), "there are more than one primary in the cluster." +# +# # Verify that all units are part of the same cluster. 
+# member_ips = await fetch_cluster_members(ops_test) +# ip_addresses = [ +# await get_unit_address(ops_test, unit.name) +# for unit in ops_test.model.applications[app].units +# ] +# assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster." +# +# # Verify that no writes to the database were missed after stopping the writes. +# total_expected_writes = await check_writes(ops_test) +# +# # Verify that old primary is up-to-date. +# assert await secondary_up_to_date( +# ops_test, primary_name, total_expected_writes +# ), "secondary not up to date with the cluster after restarting." diff --git a/tests/integration/helpers.py b/tests/integration/helpers.py index 755f3648bb..804ce2a416 100644 --- a/tests/integration/helpers.py +++ b/tests/integration/helpers.py @@ -588,7 +588,7 @@ async def primary_changed(ops_test: OpsTest, old_primary: str) -> bool: """ application = old_primary.split("/")[0] primary = await get_primary(ops_test, application, down_unit=old_primary) - return primary != old_primary + return primary != old_primary and primary != "None" async def restart_patroni(ops_test: OpsTest, unit_name: str) -> None: diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index 818c590e5e..02c9591d83 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -4,6 +4,7 @@ import logging import pytest as pytest +import requests from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_exponential @@ -61,13 +62,15 @@ async def test_mattermost_db(ops_test: OpsTest) -> None: # Test TLS being used by pg_rewind. To accomplish that, get the primary unit # and a replica that will be promoted to primary (this should trigger a rewind - # operation when the old primary is started again). + # operation when the old primary is started again). 'verify=False' is used here + # because the unit IP that is used in the test doesn't match the certificate + # hostname (that is a k8s hostname). primary = await get_primary(ops_test) - replica = [ - unit.name - for unit in ops_test.model.applications[DATABASE_APP_NAME].units - if unit.name != primary - ][0] + primary_address = await get_unit_address(ops_test, primary) + cluster_info = requests.get(f"https://{primary_address}:8008/cluster", verify=False) + for member in cluster_info.json()["members"]: + if member["role"] == "replica": + replica = "/".join(member["name"].rsplit("-", 1)) # Enable additional logs on the PostgreSQL instance to check TLS # being used in a later step. @@ -107,12 +110,14 @@ async def test_mattermost_db(ops_test: OpsTest) -> None: connection.close() # Stop the initial primary. + logger.info(f"stopping database on {primary}") await run_command_on_unit(ops_test, primary, "/charm/bin/pebble stop postgresql") # Check that the primary changed. assert await primary_changed(ops_test, primary), "primary not changed" # Restart the initial primary and check the logs to ensure TLS is being used by pg_rewind. 
+ logger.info(f"starting database on {primary}") await run_command_on_unit(ops_test, primary, "/charm/bin/pebble start postgresql") for attempt in Retrying( stop=stop_after_delay(60 * 3), wait=wait_exponential(multiplier=1, min=2, max=30) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 15f0f06c94..733e2636f6 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -442,6 +442,26 @@ def test_postgresql_layer(self, _, __, ___): "group": "postgres", }, }, + "checks": { + "patroni": { + "override": "replace", + "level": "ready", + "exec": { + "command": f"patronictl -c /var/lib/postgresql/data/patroni.yml list patroni-{self.charm._name}", + "user": "postgres", + "group": "postgres", + "environment": { + "PATRONI_KUBERNETES_LABELS": f"{{application: patroni, cluster-name: patroni-{self.charm._name}}}", + "PATRONI_KUBERNETES_NAMESPACE": self.charm._namespace, + "PATRONI_KUBERNETES_USE_ENDPOINTS": "true", + "PATRONI_NAME": "postgresql-k8s-0", + "PATRONI_SCOPE": f"patroni-{self.charm._name}", + "PATRONI_REPLICATION_USERNAME": "replication", + "PATRONI_SUPERUSER_USERNAME": "operator", + }, + }, + } + }, } self.assertDictEqual(plan, expected) From 8dedcce65192622d9d9294e5b409c31ae0d5b9ca Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Apr 2023 14:11:22 -0300 Subject: [PATCH 14/24] Prevent multiple primaries --- src/charm.py | 76 ++++++------ tests/integration/ha_tests/helpers.py | 4 +- .../integration/ha_tests/test_self_healing.py | 114 +++++++++--------- tests/unit/test_charm.py | 17 +-- 4 files changed, 103 insertions(+), 108 deletions(-) diff --git a/src/charm.py b/src/charm.py index abcc5d1eed..72163ee2a0 100755 --- a/src/charm.py +++ b/src/charm.py @@ -402,9 +402,6 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: # where the volume is mounted with more restrictive permissions. self._create_pgdata(container) - # Create a new config layer. - new_layer = self._postgresql_layer() - self.unit.set_workload_version(self._patroni.rock_postgresql_version) # Defer the initialization of the workload in the replicas @@ -428,18 +425,8 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None: event.defer() return - # Get the current layer. - current_layer = container.get_plan() - # Check if there are any changes to layer services. - if current_layer.services != new_layer.services: - # Changes were made, add the new layer. - container.add_layer(self._postgresql_service, new_layer, combine=True) - logging.info("Added updated layer 'postgresql' to Pebble plan") - # TODO: move this file generation to on config changed hook - # when adding configs to this charm. - # Restart it and report a new status to Juju. - container.restart(self._postgresql_service) - logging.info("Restarted postgresql service") + # Start the database service. + self._update_pebble_layers() # Ensure the member is up and running before marking the cluster as initialised. 
if not self._patroni.member_started: @@ -832,27 +819,21 @@ def _postgresql_layer(self) -> Layer: "group": WORKLOAD_OS_GROUP, }, }, - "checks": { - "patroni": { - "override": "replace", - "level": "ready", - "exec": { - "command": f"patronictl -c {self._storage_path}/patroni.yml list {self.cluster_name}", - "user": WORKLOAD_OS_USER, - "group": WORKLOAD_OS_GROUP, - "environment": { - "PATRONI_KUBERNETES_LABELS": f"{{application: patroni, cluster-name: {self.cluster_name}}}", - "PATRONI_KUBERNETES_NAMESPACE": self._namespace, - "PATRONI_KUBERNETES_USE_ENDPOINTS": "true", - "PATRONI_NAME": pod_name, - "PATRONI_SCOPE": self.cluster_name, - "PATRONI_REPLICATION_USERNAME": REPLICATION_USER, - "PATRONI_SUPERUSER_USERNAME": USER, - }, - }, - } - }, } + if "tls" not in self.unit_peer_data: + layer_config.update( + { + "checks": { + self._postgresql_service: { + "override": "replace", + "level": "ready", + "http": { + "url": f"{self._patroni._patroni_url}/health", + }, + } + } + } + ) return Layer(layer_config) @property @@ -917,6 +898,9 @@ def _restart(self, event: RunWithLock) -> None: self.unit.status = BlockedStatus(error_message) return + # Update health check URL. + self._update_pebble_layers() + # Start or stop the pgBackRest TLS server service when TLS certificate change. self.backup.start_stop_pgbackrest_service() @@ -950,6 +934,28 @@ def update_config(self) -> None: if restart_postgresql: self.on[self.restart_manager.name].acquire_lock.emit() + def _update_pebble_layers(self) -> None: + """Update the pebble layers to keep the health check URL up-to-date.""" + container = self.unit.get_container("postgresql") + + # Get the current layer. + current_layer = container.get_plan() + + # Create a new config layer. + new_layer = self._postgresql_layer() + + # Check if there are any changes to layer services. + if current_layer.services != new_layer.services: + # Changes were made, add the new layer. + container.add_layer(self._postgresql_service, new_layer, combine=True) + logging.info("Added updated layer 'postgresql' to Pebble plan") + container.restart(self._postgresql_service) + logging.info("Restarted postgresql service") + if current_layer.checks != new_layer.checks: + # Changes were made, add the new layer. + container.add_layer(self._postgresql_service, new_layer, combine=True) + logging.info("Updated health checks") + def _unit_name_to_pod_name(self, unit_name: str) -> str: """Converts unit name to pod name. diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 6805dcb40c..25fc9f357a 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -1,5 +1,6 @@ # Copyright 2022 Canonical Ltd. # See LICENSE file for licensing details. 
+import asyncio from pathlib import Path from typing import Dict, Optional @@ -251,9 +252,8 @@ async def send_signal_to_process( if use_ssh: kill_cmd = f"ssh {unit_name} {command}" - return_code, _, stderr = await ops_test.juju(*kill_cmd.split()) + return_code, _, _ = await asyncio.wait_for(ops_test.juju(*kill_cmd.split()), 10) if return_code != 0: - print(stderr) raise ProcessError( "Expected command %s to succeed instead it failed: %s", command, diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 0a84b125f7..82ef79e923 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -30,7 +30,7 @@ logger = logging.getLogger(__name__) APP_NAME = METADATA["name"] -PATRONI_PROCESS = "/usr/local/bin/patroni" +PATRONI_PROCESS = "/usr/bin/patroni" POSTGRESQL_PROCESS = "postgres" DB_PROCESSES = [POSTGRESQL_PROCESS, PATRONI_PROCESS] @@ -87,21 +87,21 @@ async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_wri assert new_primary_name != primary_name finally: # Un-freeze the old primary. + for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): + with attempt: + use_ssh = (attempt.retry_state.attempt_number % 2) == 0 + logger.info(f"unfreezing {process}") + await send_signal_to_process( + ops_test, primary_name, process, "SIGCONT", use_ssh + ) if process != PATRONI_PROCESS: for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: - use_ssh = (attempt.retry_state.attempt_number % 2) != 0 + use_ssh = (attempt.retry_state.attempt_number % 2) == 0 logger.info(f"unfreezing {PATRONI_PROCESS}") await send_signal_to_process( ops_test, primary_name, PATRONI_PROCESS, "SIGCONT", use_ssh ) - for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): - with attempt: - use_ssh = (attempt.retry_state.attempt_number % 2) != 0 - logger.info(f"unfreezing {process}") - await send_signal_to_process( - ops_test, primary_name, process, "SIGCONT", use_ssh - ) # Verify that the database service got restarted and is ready in the old primary. assert await postgresql_ready(ops_test, primary_name) @@ -128,51 +128,51 @@ async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_wri ), "secondary not up to date with the cluster after restarting." -# @pytest.mark.parametrize("process", DB_PROCESSES) -# async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None: -# # Locate primary unit. -# app = await app_name(ops_test) -# primary_name = await get_primary(ops_test, app) -# -# # Start an application that continuously writes data to the database. -# await start_continuous_writes(ops_test, app) -# -# # Restart the database process. -# await send_signal_to_process(ops_test, primary_name, process, "SIGTERM") -# -# async with ops_test.fast_forward(): -# # Verify new writes are continuing by counting the number of writes before and after a -# # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). -# writes = await count_writes(ops_test, primary_name) -# for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): -# with attempt: -# more_writes = await count_writes(ops_test, primary_name) -# assert more_writes > writes, "writes not continuing to DB" -# -# # Verify that the database service got restarted and is ready in the old primary. 
-# assert await postgresql_ready(ops_test, primary_name) -# -# # Verify that a new primary gets elected (ie old primary is secondary). -# new_primary_name = await get_primary(ops_test, app, down_unit=primary_name) -# assert new_primary_name != primary_name -# -# # Verify that the old primary is now a replica. -# assert await is_replica( -# ops_test, primary_name -# ), "there are more than one primary in the cluster." -# -# # Verify that all units are part of the same cluster. -# member_ips = await fetch_cluster_members(ops_test) -# ip_addresses = [ -# await get_unit_address(ops_test, unit.name) -# for unit in ops_test.model.applications[app].units -# ] -# assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster." -# -# # Verify that no writes to the database were missed after stopping the writes. -# total_expected_writes = await check_writes(ops_test) -# -# # Verify that old primary is up-to-date. -# assert await secondary_up_to_date( -# ops_test, primary_name, total_expected_writes -# ), "secondary not up to date with the cluster after restarting." +@pytest.mark.parametrize("process", DB_PROCESSES) +async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None: + # Locate primary unit. + app = await app_name(ops_test) + primary_name = await get_primary(ops_test, app) + + # Start an application that continuously writes data to the database. + await start_continuous_writes(ops_test, app) + + # Restart the database process. + await send_signal_to_process(ops_test, primary_name, process, "SIGTERM") + + async with ops_test.fast_forward(): + # Verify new writes are continuing by counting the number of writes before and after a + # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). + writes = await count_writes(ops_test, primary_name) + for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): + with attempt: + more_writes = await count_writes(ops_test, primary_name) + assert more_writes > writes, "writes not continuing to DB" + + # Verify that the database service got restarted and is ready in the old primary. + assert await postgresql_ready(ops_test, primary_name) + + # Verify that a new primary gets elected (ie old primary is secondary). + new_primary_name = await get_primary(ops_test, app, down_unit=primary_name) + assert new_primary_name != primary_name + + # Verify that the old primary is now a replica. + assert await is_replica( + ops_test, primary_name + ), "there are more than one primary in the cluster." + + # Verify that all units are part of the same cluster. + member_ips = await fetch_cluster_members(ops_test) + ip_addresses = [ + await get_unit_address(ops_test, unit.name) + for unit in ops_test.model.applications[app].units + ] + assert set(member_ips) == set(ip_addresses), "not all units are part of the same cluster." + + # Verify that no writes to the database were missed after stopping the writes. + total_expected_writes = await check_writes(ops_test) + + # Verify that old primary is up-to-date. + assert await secondary_up_to_date( + ops_test, primary_name, total_expected_writes + ), "secondary not up to date with the cluster after restarting." 
diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 733e2636f6..4c7a72073a 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -443,22 +443,11 @@ def test_postgresql_layer(self, _, __, ___): }, }, "checks": { - "patroni": { + self._postgresql_service: { "override": "replace", "level": "ready", - "exec": { - "command": f"patronictl -c /var/lib/postgresql/data/patroni.yml list patroni-{self.charm._name}", - "user": "postgres", - "group": "postgres", - "environment": { - "PATRONI_KUBERNETES_LABELS": f"{{application: patroni, cluster-name: patroni-{self.charm._name}}}", - "PATRONI_KUBERNETES_NAMESPACE": self.charm._namespace, - "PATRONI_KUBERNETES_USE_ENDPOINTS": "true", - "PATRONI_NAME": "postgresql-k8s-0", - "PATRONI_SCOPE": f"patroni-{self.charm._name}", - "PATRONI_REPLICATION_USERNAME": "replication", - "PATRONI_SUPERUSER_USERNAME": "operator", - }, + "http": { + "url": "http://postgresql-k8s-0.postgresql-k8s-endpoints:8008/health", }, } }, From 4efaf74770a3dc947b3eaa94007d10fa3856cc97 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Mon, 10 Apr 2023 14:46:49 -0300 Subject: [PATCH 15/24] Remove unused code --- src/charm.py | 9 -------- .../ha_tests/application-charm/src/charm.py | 21 ------------------- .../src/continuous_writes.py | 5 ----- 3 files changed, 35 deletions(-) diff --git a/src/charm.py b/src/charm.py index 72163ee2a0..1a9c6b0b09 100755 --- a/src/charm.py +++ b/src/charm.py @@ -210,9 +210,6 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None: self.postgresql_client_relation.update_read_only_endpoint() self._remove_from_endpoints(endpoints_to_remove) - # Update the replication configuration. - self._patroni.reload_patroni_configuration() - def _on_peer_relation_changed(self, event: RelationChangedEvent) -> None: """Reconfigure cluster members.""" # The cluster must be initialized first in the leader unit @@ -377,12 +374,6 @@ def _on_leader_elected(self, event: LeaderElectedEvent) -> None: self._add_members(event) - # Update the replication configuration. - try: - self._patroni.reload_patroni_configuration() - except RetryError: - pass # This error can happen in the first leader election, as Patroni is not running yet. - def _create_pgdata(self, container: Container): """Create the PostgreSQL data directory.""" path = f"{self._storage_path}/pgdata" diff --git a/tests/integration/ha_tests/application-charm/src/charm.py b/tests/integration/ha_tests/application-charm/src/charm.py index d40e4d91e9..7d96a1b59b 100755 --- a/tests/integration/ha_tests/application-charm/src/charm.py +++ b/tests/integration/ha_tests/application-charm/src/charm.py @@ -54,7 +54,6 @@ def __init__(self, *args): # Events related to the database that is requested. 
self.database_name = "application" self.database = DatabaseRequires(self, "database", self.database_name) - self.framework.observe(self.database.on.endpoints_changed, self._on_endpoints_changed) self.framework.observe( self.on.clear_continuous_writes_action, self._on_clear_continuous_writes_action ) @@ -89,26 +88,6 @@ def _on_start(self, _) -> None: """Only sets an Active status.""" self.unit.status = ActiveStatus() - def _on_endpoints_changed(self, _) -> None: - """Event triggered when the read/write endpoints of the database change.""" - if self._connection_string is None: - return - - if not self.app_peer_data.get(PROC_PID_KEY): - return None - - with open(CONFIG_FILE, "w") as fd: - fd.write(self._connection_string) - os.fsync(fd) - - try: - os.kill(int(self.app_peer_data[PROC_PID_KEY]), signal.SIGKILL) - except ProcessLookupError: - del self.app_peer_data[PROC_PID_KEY] - return - count = self._count_writes() - self._start_continuous_writes(count + 1) - def _count_writes(self) -> int: """Count the number of records in the continuous_writes table.""" with psycopg2.connect( diff --git a/tests/integration/ha_tests/application-charm/src/continuous_writes.py b/tests/integration/ha_tests/application-charm/src/continuous_writes.py index ab261e6b0a..c392e9221f 100644 --- a/tests/integration/ha_tests/application-charm/src/continuous_writes.py +++ b/tests/integration/ha_tests/application-charm/src/continuous_writes.py @@ -17,10 +17,6 @@ def sigterm_handler(_signo, _stack_frame): run = False -def sighup_handler(_signo, _stack_frame): - read_config_file() - - def read_config_file(): with open("/tmp/continuous_writes_config") as fd: global connection_string @@ -76,5 +72,4 @@ def main(): if __name__ == "__main__": signal.signal(signal.SIGTERM, sigterm_handler) - signal.signal(signal.SIGHUP, sighup_handler) main() From d01c7e8f87eee56ad2caef3e57b4a448cfe2377a Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Apr 2023 07:50:21 -0300 Subject: [PATCH 16/24] Check max number written --- tests/integration/ha_tests/helpers.py | 17 +++++++++++------ tests/integration/ha_tests/test_self_healing.py | 8 ++++---- 2 files changed, 15 insertions(+), 10 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 25fc9f357a..936d67618d 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -2,7 +2,7 @@ # See LICENSE file for licensing details. import asyncio from pathlib import Path -from typing import Dict, Optional +from typing import Dict, Optional, Tuple import psycopg2 import requests @@ -64,12 +64,15 @@ async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) async def check_writes(ops_test) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) - actual_writes = await count_writes(ops_test) + actual_writes, max_number_written = await count_writes(ops_test) + assert ( + actual_writes == max_number_written + ), "writes to the db were missed: count of actual writes different from the max number written." assert total_expected_writes == actual_writes, "writes to the db were missed." 
return total_expected_writes -async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int: +async def count_writes(ops_test: OpsTest, down_unit: str = None) -> Tuple[int, int]: """Count the number of writes in the database.""" app = await app_name(ops_test) password = await get_password(ops_test, database_app_name=app, down_unit=down_unit) @@ -96,10 +99,12 @@ async def count_writes(ops_test: OpsTest, down_unit: str = None) -> int: ) with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number) FROM continuous_writes;") - count = cursor.fetchone()[0] + cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") + results = cursor.fetchone() + count = results[0] + max = results[1] connection.close() - return count + return count, max async def fetch_cluster_members(ops_test: OpsTest): diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 82ef79e923..fa9707aaca 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -74,10 +74,10 @@ async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_wri # Verify new writes are continuing by counting the number of writes before and after a # 3 minutes wait (a db process freeze takes more time to trigger a fail-over). try: - writes = await count_writes(ops_test, primary_name) + writes, _ = await count_writes(ops_test, primary_name) for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): with attempt: - more_writes = await count_writes(ops_test, primary_name) + more_writes, _ = await count_writes(ops_test, primary_name) assert more_writes > writes, "writes not continuing to DB" # Verify that a new primary gets elected (ie old primary is secondary). @@ -143,10 +143,10 @@ async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_wr async with ops_test.fast_forward(): # Verify new writes are continuing by counting the number of writes before and after a # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). - writes = await count_writes(ops_test, primary_name) + writes, _ = await count_writes(ops_test, primary_name) for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): with attempt: - more_writes = await count_writes(ops_test, primary_name) + more_writes, _ = await count_writes(ops_test, primary_name) assert more_writes > writes, "writes not continuing to DB" # Verify that the database service got restarted and is ready in the old primary. 
From 3cc2f06b0984e8e95a513a869ac90c4ed2d28bad Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Apr 2023 08:26:41 -0300 Subject: [PATCH 17/24] Check writes on all instances --- tests/integration/ha_tests/helpers.py | 55 ++++++++++++------- .../integration/ha_tests/test_self_healing.py | 18 +----- 2 files changed, 38 insertions(+), 35 deletions(-) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index 936d67618d..f5bce5c459 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -65,14 +65,27 @@ async def check_writes(ops_test) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) actual_writes, max_number_written = await count_writes(ops_test) - assert ( - actual_writes == max_number_written - ), "writes to the db were missed: count of actual writes different from the max number written." - assert total_expected_writes == actual_writes, "writes to the db were missed." + for member, count in actual_writes.items(): + assert ( + count == max_number_written[member] + ), f"{member}: writes to the db were missed: count of actual writes different from the max number written." + assert total_expected_writes == count, f"{member}: writes to the db were missed." return total_expected_writes -async def count_writes(ops_test: OpsTest, down_unit: str = None) -> Tuple[int, int]: +async def check_writes_are_increasing(ops_test, down_unit: str) -> None: + """Verify new writes are continuing by counting the number of writes.""" + writes, _ = await count_writes(ops_test, down_unit=down_unit) + for member, count in writes.items(): + for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)): + with attempt: + more_writes, _ = await count_writes(ops_test, down_unit=down_unit) + assert more_writes[member] > count, f"{member}: writes not continuing to DB" + + +async def count_writes( + ops_test: OpsTest, down_unit: str = None +) -> Tuple[Dict[str, int], Dict[str, int]]: """Count the number of writes in the database.""" app = await app_name(ops_test) password = await get_password(ops_test, database_app_name=app, down_unit=down_unit) @@ -81,29 +94,31 @@ async def count_writes(ops_test: OpsTest, down_unit: str = None) -> Tuple[int, i if unit_name != down_unit: cluster = get_patroni_cluster(unit["address"]) break + count = {} + max = {} for member in cluster["members"]: if member["role"] != "replica" and member["host"].split(".")[0] != ( down_unit or "" ).replace("/", "-"): host = member["host"] - # Translate the service hostname to an IP address. - model = ops_test.model.info - client = Client(namespace=model.name) - service = client.get(Pod, name=host.split(".")[0]) - ip = service.status.podIP + # Translate the service hostname to an IP address. 
+ model = ops_test.model.info + client = Client(namespace=model.name) + service = client.get(Pod, name=host.split(".")[0]) + ip = service.status.podIP - connection_string = ( - f"dbname='application' user='operator'" - f" host='{ip}' password='{password}' connect_timeout=10" - ) + connection_string = ( + f"dbname='application' user='operator'" + f" host='{ip}' password='{password}' connect_timeout=10" + ) - with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: - cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") - results = cursor.fetchone() - count = results[0] - max = results[1] - connection.close() + with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor: + cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;") + results = cursor.fetchone() + count[member["name"]] = results[0] + max[member["name"]] = results[1] + connection.close() return count, max diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index fa9707aaca..9125101155 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -11,7 +11,7 @@ from tests.integration.ha_tests.helpers import ( METADATA, check_writes, - count_writes, + check_writes_are_increasing, fetch_cluster_members, get_primary, is_replica, @@ -71,14 +71,8 @@ async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_wri await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP") async with ops_test.fast_forward(): - # Verify new writes are continuing by counting the number of writes before and after a - # 3 minutes wait (a db process freeze takes more time to trigger a fail-over). try: - writes, _ = await count_writes(ops_test, primary_name) - for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): - with attempt: - more_writes, _ = await count_writes(ops_test, primary_name) - assert more_writes > writes, "writes not continuing to DB" + await check_writes_are_increasing(ops_test, primary_name) # Verify that a new primary gets elected (ie old primary is secondary). for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)): @@ -141,13 +135,7 @@ async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_wr await send_signal_to_process(ops_test, primary_name, process, "SIGTERM") async with ops_test.fast_forward(): - # Verify new writes are continuing by counting the number of writes before and after a - # 2 minutes wait (a db process freeze takes more time to trigger a fail-over). - writes, _ = await count_writes(ops_test, primary_name) - for attempt in Retrying(stop=stop_after_delay(60 * 2), wait=wait_fixed(3)): - with attempt: - more_writes, _ = await count_writes(ops_test, primary_name) - assert more_writes > writes, "writes not continuing to DB" + await check_writes_are_increasing(ops_test, primary_name) # Verify that the database service got restarted and is ready in the old primary. 
assert await postgresql_ready(ops_test, primary_name) From 84d810d13eaedb5dc22e0cd2bc8faf6489dd0484 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Apr 2023 08:54:06 -0300 Subject: [PATCH 18/24] Remove master_start_timeout setting --- tests/integration/ha_tests/conftest.py | 16 ------- tests/integration/ha_tests/helpers.py | 46 +------------------ .../integration/ha_tests/test_self_healing.py | 2 +- 3 files changed, 3 insertions(+), 61 deletions(-) diff --git a/tests/integration/ha_tests/conftest.py b/tests/integration/ha_tests/conftest.py index 5439288eb2..9956d3e196 100644 --- a/tests/integration/ha_tests/conftest.py +++ b/tests/integration/ha_tests/conftest.py @@ -5,11 +5,6 @@ from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_fixed -from tests.integration.ha_tests.helpers import ( - change_master_start_timeout, - get_master_start_timeout, -) - APPLICATION_NAME = "application" @@ -27,14 +22,3 @@ async def continuous_writes(ops_test: OpsTest) -> None: ) await action.wait() assert action.results["result"] == "True", "Unable to clear up continuous_writes table" - - -@pytest.fixture(scope="module") -async def master_start_timeout(ops_test: OpsTest) -> None: - """Temporary change the master start timeout configuration.""" - # Change the parameter that makes the primary reelection faster. - initial_master_start_timeout = await get_master_start_timeout(ops_test) - await change_master_start_timeout(ops_test, 0) - yield - # Rollback to the initial configuration. - await change_master_start_timeout(ops_test, initial_master_start_timeout) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index f5bce5c459..d0a939867c 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -2,7 +2,7 @@ # See LICENSE file for licensing details. import asyncio from pathlib import Path -from typing import Dict, Optional, Tuple +from typing import Dict, Tuple import psycopg2 import requests @@ -15,12 +15,7 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed -from tests.integration.helpers import ( - app_name, - get_password, - get_primary, - get_unit_address, -) +from tests.integration.helpers import app_name, get_password, get_unit_address METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) PORT = 5432 @@ -43,24 +38,6 @@ def get_patroni_cluster(unit_ip: str) -> Dict[str, str]: return resp.json() -async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) -> None: - """Change master start timeout configuration. - - Args: - ops_test: ops_test instance. - seconds: number of seconds to set in master_start_timeout configuration. - """ - for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): - with attempt: - app = await app_name(ops_test) - primary_name = await get_primary(ops_test, app) - unit_ip = await get_unit_address(ops_test, primary_name) - requests.patch( - f"http://{unit_ip}:8008/config", - json={"master_start_timeout": seconds}, - ) - - async def check_writes(ops_test) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) @@ -152,25 +129,6 @@ def get_host_ip(host: str) -> str: return member_ips -async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]: - """Get the master start timeout configuration. 
- - Args: - ops_test: ops_test instance. - - Returns: - master start timeout in seconds or None if it's using the default value. - """ - for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): - with attempt: - app = await app_name(ops_test) - primary_name = await get_primary(ops_test, app) - unit_ip = await get_unit_address(ops_test, primary_name) - configuration_info = requests.get(f"http://{unit_ip}:8008/config") - master_start_timeout = configuration_info.json().get("master_start_timeout") - return int(master_start_timeout) if master_start_timeout is not None else None - - async def is_replica(ops_test: OpsTest, unit_name: str) -> bool: """Returns whether the unit a replica in the cluster.""" unit_ip = await get_unit_address(ops_test, unit_name) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 9125101155..2b7d2983ca 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -13,7 +13,6 @@ check_writes, check_writes_are_increasing, fetch_cluster_members, - get_primary, is_replica, postgresql_ready, secondary_up_to_date, @@ -24,6 +23,7 @@ CHARM_SERIES, app_name, build_and_deploy, + get_primary, get_unit_address, ) From 2278f803665b6ad034673525cb9db86e15b75cbc Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Apr 2023 10:24:27 -0300 Subject: [PATCH 19/24] Improve logs retrieval --- tests/integration/test_tls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_tls.py b/tests/integration/test_tls.py index 02c9591d83..2bb020f355 100644 --- a/tests/integration/test_tls.py +++ b/tests/integration/test_tls.py @@ -127,7 +127,7 @@ async def test_mattermost_db(ops_test: OpsTest) -> None: logs = await run_command_on_unit( ops_test, replica, - 'bash -c "cat /var/log/postgresql/postgresql.log | grep rewind"', + "grep rewind /var/log/postgresql/postgresql.log", ) assert ( "connection authorized: user=rewind database=postgres SSL enabled" in logs From 364adae7f174bbfe307007e3facd5458e518d5c7 Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Apr 2023 12:51:09 -0300 Subject: [PATCH 20/24] Add CA chain to trusted certificates --- src/charm.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/charm.py b/src/charm.py index 1a9c6b0b09..200d2eddae 100755 --- a/src/charm.py +++ b/src/charm.py @@ -810,21 +810,16 @@ def _postgresql_layer(self) -> Layer: "group": WORKLOAD_OS_GROUP, }, }, - } - if "tls" not in self.unit_peer_data: - layer_config.update( - { - "checks": { - self._postgresql_service: { - "override": "replace", - "level": "ready", - "http": { - "url": f"{self._patroni._patroni_url}/health", - }, - } - } + "checks": { + self._postgresql_service: { + "override": "replace", + "level": "ready", + "http": { + "url": f"{self._patroni._patroni_url}/health", + }, } - ) + }, + } return Layer(layer_config) @property @@ -862,6 +857,15 @@ def push_tls_files_to_workload(self, container: Container = None) -> None: user=WORKLOAD_OS_USER, group=WORKLOAD_OS_GROUP, ) + container.push( + "/usr/local/share/ca-certificates/ca.crt", + ca, + make_dirs=True, + permissions=0o400, + user=WORKLOAD_OS_USER, + group=WORKLOAD_OS_GROUP, + ) + container.exec(["update-ca-certificates"]).wait() if cert is not None: container.push( f"{self._storage_path}/{TLS_CERT_FILE}", From b79aa19b4a9f197c575d25345c780b47bb9f2d0d Mon Sep 17 00:00:00 2001 From: Marcelo 
Henrique Neppel Date: Tue, 11 Apr 2023 15:47:58 -0300 Subject: [PATCH 21/24] Revert "Remove master_start_timeout setting" This reverts commit 84d810d13eaedb5dc22e0cd2bc8faf6489dd0484. --- tests/integration/ha_tests/conftest.py | 16 +++++++ tests/integration/ha_tests/helpers.py | 46 ++++++++++++++++++- .../integration/ha_tests/test_self_healing.py | 2 +- 3 files changed, 61 insertions(+), 3 deletions(-) diff --git a/tests/integration/ha_tests/conftest.py b/tests/integration/ha_tests/conftest.py index 9956d3e196..5439288eb2 100644 --- a/tests/integration/ha_tests/conftest.py +++ b/tests/integration/ha_tests/conftest.py @@ -5,6 +5,11 @@ from pytest_operator.plugin import OpsTest from tenacity import Retrying, stop_after_delay, wait_fixed +from tests.integration.ha_tests.helpers import ( + change_master_start_timeout, + get_master_start_timeout, +) + APPLICATION_NAME = "application" @@ -22,3 +27,14 @@ async def continuous_writes(ops_test: OpsTest) -> None: ) await action.wait() assert action.results["result"] == "True", "Unable to clear up continuous_writes table" + + +@pytest.fixture(scope="module") +async def master_start_timeout(ops_test: OpsTest) -> None: + """Temporary change the master start timeout configuration.""" + # Change the parameter that makes the primary reelection faster. + initial_master_start_timeout = await get_master_start_timeout(ops_test) + await change_master_start_timeout(ops_test, 0) + yield + # Rollback to the initial configuration. + await change_master_start_timeout(ops_test, initial_master_start_timeout) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index d0a939867c..f5bce5c459 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -2,7 +2,7 @@ # See LICENSE file for licensing details. import asyncio from pathlib import Path -from typing import Dict, Tuple +from typing import Dict, Optional, Tuple import psycopg2 import requests @@ -15,7 +15,12 @@ from pytest_operator.plugin import OpsTest from tenacity import RetryError, Retrying, stop_after_delay, wait_fixed -from tests.integration.helpers import app_name, get_password, get_unit_address +from tests.integration.helpers import ( + app_name, + get_password, + get_primary, + get_unit_address, +) METADATA = yaml.safe_load(Path("./metadata.yaml").read_text()) PORT = 5432 @@ -38,6 +43,24 @@ def get_patroni_cluster(unit_ip: str) -> Dict[str, str]: return resp.json() +async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) -> None: + """Change master start timeout configuration. + + Args: + ops_test: ops_test instance. + seconds: number of seconds to set in master_start_timeout configuration. + """ + for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): + with attempt: + app = await app_name(ops_test) + primary_name = await get_primary(ops_test, app) + unit_ip = await get_unit_address(ops_test, primary_name) + requests.patch( + f"http://{unit_ip}:8008/config", + json={"master_start_timeout": seconds}, + ) + + async def check_writes(ops_test) -> int: """Gets the total writes from the test charm and compares to the writes from db.""" total_expected_writes = await stop_continuous_writes(ops_test) @@ -129,6 +152,25 @@ def get_host_ip(host: str) -> str: return member_ips +async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]: + """Get the master start timeout configuration. + + Args: + ops_test: ops_test instance. 
+ + Returns: + master start timeout in seconds or None if it's using the default value. + """ + for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)): + with attempt: + app = await app_name(ops_test) + primary_name = await get_primary(ops_test, app) + unit_ip = await get_unit_address(ops_test, primary_name) + configuration_info = requests.get(f"http://{unit_ip}:8008/config") + master_start_timeout = configuration_info.json().get("master_start_timeout") + return int(master_start_timeout) if master_start_timeout is not None else None + + async def is_replica(ops_test: OpsTest, unit_name: str) -> bool: """Returns whether the unit a replica in the cluster.""" unit_ip = await get_unit_address(ops_test, unit_name) diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py index 2b7d2983ca..9125101155 100644 --- a/tests/integration/ha_tests/test_self_healing.py +++ b/tests/integration/ha_tests/test_self_healing.py @@ -13,6 +13,7 @@ check_writes, check_writes_are_increasing, fetch_cluster_members, + get_primary, is_replica, postgresql_ready, secondary_up_to_date, @@ -23,7 +24,6 @@ CHARM_SERIES, app_name, build_and_deploy, - get_primary, get_unit_address, ) From ed77452370fede9360cad43451740a62eeaf360a Mon Sep 17 00:00:00 2001 From: Marcelo Henrique Neppel Date: Tue, 11 Apr 2023 17:22:05 -0300 Subject: [PATCH 22/24] Use the right PG process in the HA tests --- tests/integration/ha_tests/conftest.py | 14 +++++----- tests/integration/ha_tests/helpers.py | 18 ++++++------ .../integration/ha_tests/test_self_healing.py | 28 +++++++++++++++---- 3 files changed, 39 insertions(+), 21 deletions(-) diff --git a/tests/integration/ha_tests/conftest.py b/tests/integration/ha_tests/conftest.py index 5439288eb2..db7f902b0a 100644 --- a/tests/integration/ha_tests/conftest.py +++ b/tests/integration/ha_tests/conftest.py @@ -6,8 +6,8 @@ from tenacity import Retrying, stop_after_delay, wait_fixed from tests.integration.ha_tests.helpers import ( - change_master_start_timeout, - get_master_start_timeout, + change_primary_start_timeout, + get_primary_start_timeout, ) APPLICATION_NAME = "application" @@ -30,11 +30,11 @@ async def continuous_writes(ops_test: OpsTest) -> None: @pytest.fixture(scope="module") -async def master_start_timeout(ops_test: OpsTest) -> None: - """Temporary change the master start timeout configuration.""" +async def primary_start_timeout(ops_test: OpsTest) -> None: + """Temporary change the primary start timeout configuration.""" # Change the parameter that makes the primary reelection faster. - initial_master_start_timeout = await get_master_start_timeout(ops_test) - await change_master_start_timeout(ops_test, 0) + initial_primary_start_timeout = await get_primary_start_timeout(ops_test) + await change_primary_start_timeout(ops_test, 0) yield # Rollback to the initial configuration. - await change_master_start_timeout(ops_test, initial_master_start_timeout) + await change_primary_start_timeout(ops_test, initial_primary_start_timeout) diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py index f5bce5c459..3caa1c8448 100644 --- a/tests/integration/ha_tests/helpers.py +++ b/tests/integration/ha_tests/helpers.py @@ -43,12 +43,12 @@ def get_patroni_cluster(unit_ip: str) -> Dict[str, str]: return resp.json() -async def change_master_start_timeout(ops_test: OpsTest, seconds: Optional[int]) -> None: - """Change master start timeout configuration. 
+async def change_primary_start_timeout(ops_test: OpsTest, seconds: Optional[int]) -> None:
+    """Change primary start timeout configuration.
 
     Args:
         ops_test: ops_test instance.
-        seconds: number of seconds to set in master_start_timeout configuration.
+        seconds: number of seconds to set in primary_start_timeout configuration.
     """
     for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)):
         with attempt:
@@ -57,7 +57,7 @@ async def change_primary_start_timeout(ops_test: OpsTest, seconds: Optional[int])
             unit_ip = await get_unit_address(ops_test, primary_name)
             requests.patch(
                 f"http://{unit_ip}:8008/config",
-                json={"master_start_timeout": seconds},
+                json={"primary_start_timeout": seconds},
             )
 
 
@@ -152,14 +152,14 @@ def get_host_ip(host: str) -> str:
     return member_ips
 
 
-async def get_master_start_timeout(ops_test: OpsTest) -> Optional[int]:
-    """Get the master start timeout configuration.
+async def get_primary_start_timeout(ops_test: OpsTest) -> Optional[int]:
+    """Get the primary start timeout configuration.
 
     Args:
         ops_test: ops_test instance.
 
     Returns:
-        master start timeout in seconds or None if it's using the default value.
+        primary start timeout in seconds or None if it's using the default value.
     """
     for attempt in Retrying(stop=stop_after_delay(30 * 2), wait=wait_fixed(3)):
         with attempt:
@@ -167,8 +167,8 @@ async def get_primary_start_timeout(ops_test: OpsTest) -> Optional[int]:
             primary_name = await get_primary(ops_test, app)
             unit_ip = await get_unit_address(ops_test, primary_name)
             configuration_info = requests.get(f"http://{unit_ip}:8008/config")
-            master_start_timeout = configuration_info.json().get("master_start_timeout")
-            return int(master_start_timeout) if master_start_timeout is not None else None
+            primary_start_timeout = configuration_info.json().get("primary_start_timeout")
+            return int(primary_start_timeout) if primary_start_timeout is not None else None
 
 
 async def is_replica(ops_test: OpsTest, unit_name: str) -> bool:
diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py
index 9125101155..86a8bb89ff 100644
--- a/tests/integration/ha_tests/test_self_healing.py
+++ b/tests/integration/ha_tests/test_self_healing.py
@@ -2,6 +2,7 @@
 # Copyright 2022 Canonical Ltd.
 # See LICENSE file for licensing details.
 import logging
+from time import sleep
 
 import pytest
 from pytest_operator.plugin import OpsTest
@@ -31,8 +32,9 @@
 
 APP_NAME = METADATA["name"]
 PATRONI_PROCESS = "/usr/bin/patroni"
-POSTGRESQL_PROCESS = "postgres"
+POSTGRESQL_PROCESS = "/usr/lib/postgresql/14/bin/postgres"
 DB_PROCESSES = [POSTGRESQL_PROCESS, PATRONI_PROCESS]
+MEDIAN_ELECTION_TIME = 10
 
 
 @pytest.mark.abort_on_fail
@@ -58,8 +60,10 @@ async def test_build_and_deploy(ops_test: OpsTest) -> None:
     await ops_test.model.wait_for_idle(status="active", timeout=1000)
 
 
-@pytest.mark.parametrize("process", [POSTGRESQL_PROCESS])
-async def test_freeze_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None:
+@pytest.mark.parametrize("process", [PATRONI_PROCESS])
+async def test_freeze_db_process(
+    ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout
+) -> None:
     # Locate primary unit.
     app = await app_name(ops_test)
     primary_name = await get_primary(ops_test, app)
@@ -70,6 +74,9 @@
     # Freeze the database process.
     await send_signal_to_process(ops_test, primary_name, process, "SIGSTOP")
 
+    # Wait some time to elect a new primary.
+    sleep(MEDIAN_ELECTION_TIME * 2)
+
     async with ops_test.fast_forward():
         try:
             await check_writes_are_increasing(ops_test, primary_name)
@@ -123,16 +130,27 @@
 
 
 @pytest.mark.parametrize("process", DB_PROCESSES)
-async def test_restart_db_process(ops_test: OpsTest, process: str, continuous_writes) -> None:
+async def test_restart_db_process(
+    ops_test: OpsTest, process: str, continuous_writes, primary_start_timeout
+) -> None:
+    # Set signal based on the process
+    if process == PATRONI_PROCESS:
+        signal = "SIGTERM"
+    else:
+        signal = "SIGINT"
+
     # Locate primary unit.
     app = await app_name(ops_test)
     primary_name = await get_primary(ops_test, app)
 
+    # Wait some time to elect a new primary.
+    sleep(MEDIAN_ELECTION_TIME * 2)
+
     # Start an application that continuously writes data to the database.
     await start_continuous_writes(ops_test, app)
 
     # Restart the database process.
-    await send_signal_to_process(ops_test, primary_name, process, "SIGTERM")
+    await send_signal_to_process(ops_test, primary_name, process, signal)
 
     async with ops_test.fast_forward():
         await check_writes_are_increasing(ops_test, primary_name)

From f89342eecf8c50ea2a0b786d3afc89108db0c2d1 Mon Sep 17 00:00:00 2001
From: Marcelo Henrique Neppel
Date: Tue, 11 Apr 2023 17:50:07 -0300
Subject: [PATCH 23/24] Fix order in test

---
 tests/integration/ha_tests/test_self_healing.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py
index 86a8bb89ff..09ee256580 100644
--- a/tests/integration/ha_tests/test_self_healing.py
+++ b/tests/integration/ha_tests/test_self_healing.py
@@ -143,15 +143,15 @@ async def test_restart_db_process(
     app = await app_name(ops_test)
     primary_name = await get_primary(ops_test, app)
 
-    # Wait some time to elect a new primary.
-    sleep(MEDIAN_ELECTION_TIME * 2)
-
     # Start an application that continuously writes data to the database.
     await start_continuous_writes(ops_test, app)
 
     # Restart the database process.
     await send_signal_to_process(ops_test, primary_name, process, signal)
 
+    # Wait some time to elect a new primary.
+    sleep(MEDIAN_ELECTION_TIME * 2)
+
     async with ops_test.fast_forward():
         await check_writes_are_increasing(ops_test, primary_name)
 

From c29e5cd37540872c7e19973598d8c77576d5388a Mon Sep 17 00:00:00 2001
From: Marcelo Henrique Neppel
Date: Tue, 11 Apr 2023 18:19:10 -0300
Subject: [PATCH 24/24] Remove unused call to SIGCONT

---
 tests/integration/ha_tests/test_self_healing.py | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/tests/integration/ha_tests/test_self_healing.py b/tests/integration/ha_tests/test_self_healing.py
index 09ee256580..8cbd8536f8 100644
--- a/tests/integration/ha_tests/test_self_healing.py
+++ b/tests/integration/ha_tests/test_self_healing.py
@@ -95,14 +95,6 @@
                     await send_signal_to_process(
                         ops_test, primary_name, process, "SIGCONT", use_ssh
                     )
-            if process != PATRONI_PROCESS:
-                for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3)):
-                    with attempt:
-                        use_ssh = (attempt.retry_state.attempt_number % 2) == 0
-                        logger.info(f"unfreezing {PATRONI_PROCESS}")
-                        await send_signal_to_process(
-                            ops_test, primary_name, PATRONI_PROCESS, "SIGCONT", use_ssh
-                        )
 
     # Verify that the database service got restarted and is ready in the old primary.
     assert await postgresql_ready(ops_test, primary_name)
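Note for reviewers: the patches above lean on the Patroni REST API round trip performed by change_primary_start_timeout and get_primary_start_timeout. The sketch below illustrates that same GET/PATCH exchange against the /config endpoint outside the test harness; it is only a minimal illustration, the unit address is a hypothetical placeholder, and the port and key names are taken from the helpers above.

```python
# Minimal sketch of the Patroni /config round trip used by the HA test helpers.
# The unit IP is a placeholder; 8008 is the Patroni REST API port used in the tests.
import requests

config_url = "http://10.1.2.3:8008/config"  # hypothetical primary unit address

# Read the current value; None means Patroni's default is still in effect.
current = requests.get(config_url, timeout=10).json().get("primary_start_timeout")

# Lower it to 0 so a new primary is elected as quickly as possible.
requests.patch(config_url, json={"primary_start_timeout": 0}, timeout=10)

# ... exercise the failover scenario ...

# Restore the previous value (or the default, when it was unset) afterwards.
requests.patch(config_url, json={"primary_start_timeout": current}, timeout=10)
```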