From 7686dd0278adea33e74197b1e4a1105e14b19084 Mon Sep 17 00:00:00 2001
From: Dragomir Penev <6687393+dragomirp@users.noreply.github.com>
Date: Tue, 25 Mar 2025 04:48:39 +0200
Subject: [PATCH 1/3] [MISC] Use latest/stable lxd (#804)

* Use latest stable lxd

* Test tweaks

* Test tweaks
---
 concierge.yaml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/concierge.yaml b/concierge.yaml
index 15a78cc947..29d78b95b5 100644
--- a/concierge.yaml
+++ b/concierge.yaml
@@ -5,6 +5,7 @@ providers:
   lxd:
     enable: true
     bootstrap: true
+    channel: latest/stable
 host:
   snaps:
     jhack:

From 5b34439898bfe201294268504d7a2176cccffc39 Mon Sep 17 00:00:00 2001
From: "renovate[bot]" <29139614+renovate[bot]@users.noreply.github.com>
Date: Wed, 26 Mar 2025 23:23:32 +0100
Subject: [PATCH 2/3] Update canonical/data-platform-workflows action to v31.0.1 (#805)

Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>
---
 .github/workflows/approve_renovate_pr.yaml | 2 +-
 .github/workflows/check_pr.yaml            | 2 +-
 .github/workflows/ci.yaml                  | 4 ++--
 .github/workflows/promote.yaml             | 2 +-
 .github/workflows/release.yaml             | 2 +-
 .github/workflows/sync_docs.yaml           | 2 +-
 6 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/.github/workflows/approve_renovate_pr.yaml b/.github/workflows/approve_renovate_pr.yaml
index 422766f800..450111576b 100644
--- a/.github/workflows/approve_renovate_pr.yaml
+++ b/.github/workflows/approve_renovate_pr.yaml
@@ -10,6 +10,6 @@ on:
 jobs:
   approve-pr:
     name: Approve Renovate pull request
-    uses: canonical/data-platform-workflows/.github/workflows/approve_renovate_pr.yaml@v31.0.0
+    uses: canonical/data-platform-workflows/.github/workflows/approve_renovate_pr.yaml@v31.0.1
     permissions:
       pull-requests: write # Needed to approve PR

diff --git a/.github/workflows/check_pr.yaml b/.github/workflows/check_pr.yaml
index beaa1541a3..84c56d20c7 100644
--- a/.github/workflows/check_pr.yaml
+++ b/.github/workflows/check_pr.yaml
@@ -15,4 +15,4 @@ on:
 jobs:
   check-pr:
     name: Check pull request
-    uses: canonical/data-platform-workflows/.github/workflows/check_charm_pr.yaml@v31.0.0
+    uses: canonical/data-platform-workflows/.github/workflows/check_charm_pr.yaml@v31.0.1

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 9d203b8633..cdf5a0e2e6 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -27,7 +27,7 @@ on:
 jobs:
   lint:
     name: Lint
-    uses: canonical/data-platform-workflows/.github/workflows/lint.yaml@v31.0.0
+    uses: canonical/data-platform-workflows/.github/workflows/lint.yaml@v31.0.1
 
   unit-test:
     name: Unit test charm
@@ -49,7 +49,7 @@ jobs:
 
   build:
     name: Build charm
-    uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v31.0.0
+    uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v31.0.1
 
   integration-test:
     name: Integration test charm

diff --git a/.github/workflows/promote.yaml b/.github/workflows/promote.yaml
index 7b4c329c6f..03e7bc7a29 100644
--- a/.github/workflows/promote.yaml
+++ b/.github/workflows/promote.yaml
@@ -25,7 +25,7 @@ on:
 jobs:
   promote:
     name: Promote charm
-    uses: canonical/data-platform-workflows/.github/workflows/_promote_charm.yaml@v31.0.0
+    uses: canonical/data-platform-workflows/.github/workflows/_promote_charm.yaml@v31.0.1
     with:
       track: '14'
       from-risk: ${{ inputs.from-risk }}

diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 5e893a873a..28b9ddc5e1 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -27,7 +27,7 @@ jobs:
     name: Release charm
     needs:
       - ci-tests
-    uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v31.0.0
+    uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v31.0.1
     with:
       channel: 14/edge
       artifact-prefix: ${{ needs.ci-tests.outputs.artifact-prefix }}

diff --git a/.github/workflows/sync_docs.yaml b/.github/workflows/sync_docs.yaml
index 475fe4dae0..4b6e361979 100644
--- a/.github/workflows/sync_docs.yaml
+++ b/.github/workflows/sync_docs.yaml
@@ -10,7 +10,7 @@ on:
 jobs:
   sync-docs:
     name: Sync docs from Discourse
-    uses: canonical/data-platform-workflows/.github/workflows/sync_docs.yaml@v31.0.0
+    uses: canonical/data-platform-workflows/.github/workflows/sync_docs.yaml@v31.0.1
     with:
       reviewers: a-velasco,izmalk
     permissions:
From 667756029e3e92e97e0745b3023647fd845ccee1 Mon Sep 17 00:00:00 2001
From: Dragomir Penev <6687393+dragomirp@users.noreply.github.com>
Date: Fri, 28 Mar 2025 15:15:44 +0200
Subject: [PATCH 3/3] [DPE-6874] Poll all members in the cluster topology script (#810)

* Poll all members in the cluster topology script

* Dual branch config

* Unit tests and bugfixes

* Add peers when starting the observer

* Retry sync up checks
---
 .github/renovate.json5                       | 26 +------
 .github/workflows/check_pr.yaml              |  1 +
 scripts/cluster_topology_observer.py         | 45 +++++++++---
 src/charm.py                                 | 42 +++++------
 src/cluster_topology_observer.py             |  7 +-
 tests/integration/ha_tests/helpers.py        | 12 ++-
 tests/unit/test_charm.py                     |  6 --
 tests/unit/test_cluster_topology_observer.py | 77 ++++++++++++++------
 8 files changed, 124 insertions(+), 92 deletions(-)

diff --git a/.github/renovate.json5 b/.github/renovate.json5
index 34085c9225..cd60ef68a5 100644
--- a/.github/renovate.json5
+++ b/.github/renovate.json5
@@ -6,6 +6,7 @@
   reviewers: [
     'team:data-platform-postgresql',
   ],
+  "baseBranches": ["main", "/^*\\/edge$/"],
   packageRules: [
     {
       matchPackageNames: [
@@ -13,32 +14,7 @@
       ],
       allowedVersions: '<2.0.0',
     },
-    {
-      matchManagers: [
-        'custom.regex',
-      ],
-      matchDepNames: [
-        'juju',
-      ],
-      matchDatasources: [
-        'pypi',
-      ],
-      allowedVersions: '<3',
-      groupName: 'Juju agents',
-    },
   ],
   customManagers: [
-    {
-      customType: 'regex',
-      fileMatch: [
-        '^\\.github/workflows/[^/]+\\.ya?ml$',
-      ],
-      matchStrings: [
-        '(libjuju: )==(?<currentValue>.*?) +# renovate: latest libjuju 2',
-      ],
-      depNameTemplate: 'juju',
-      datasourceTemplate: 'pypi',
-      versioningTemplate: 'loose',
-    },
   ],
 }

diff --git a/.github/workflows/check_pr.yaml b/.github/workflows/check_pr.yaml
index 84c56d20c7..f613a6aed9 100644
--- a/.github/workflows/check_pr.yaml
+++ b/.github/workflows/check_pr.yaml
@@ -11,6 +11,7 @@ on:
       - edited
     branches:
       - main
+      - '*/edge'
 
 jobs:
   check-pr:

diff --git a/scripts/cluster_topology_observer.py b/scripts/cluster_topology_observer.py
index 346d461319..c41ee34207 100644
--- a/scripts/cluster_topology_observer.py
+++ b/scripts/cluster_topology_observer.py
@@ -8,6 +8,7 @@
 import sys
 from ssl import CERT_NONE, create_default_context
 from time import sleep
+from urllib.parse import urljoin
 from urllib.request import urlopen
 
 API_REQUEST_TIMEOUT = 5
@@ -17,6 +18,10 @@
 LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"
 
 
+class UnreachableUnitsError(Exception):
+    """Cannot reach any known cluster member."""
+
+
 def dispatch(run_cmd, unit, charm_dir):
     """Use the input juju-run command to dispatch a :class:`ClusterTopologyChangeEvent`."""
     dispatch_sub_cmd = "JUJU_DISPATCH_PATH=hooks/cluster_topology_change {}/dispatch"
@@ -29,25 +34,43 @@ def main():
 
     Watch the Patroni API cluster info.
     When changes are detected, dispatch the change event.
     """
-    patroni_url, run_cmd, unit, charm_dir = sys.argv[1:]
+    patroni_urls, run_cmd, unit, charm_dir = sys.argv[1:]
     previous_cluster_topology = {}
+    urls = [urljoin(url, PATRONI_CLUSTER_STATUS_ENDPOINT) for url in patroni_urls.split(",")]
+    member_name = unit.replace("/", "-")
     while True:
         # Disable TLS chain verification
         context = create_default_context()
         context.check_hostname = False
         context.verify_mode = CERT_NONE
-        # Scheme is generated by the charm
-        resp = urlopen(  # noqa: S310
-            f"{patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
-            timeout=API_REQUEST_TIMEOUT,
-            context=context,
-        )
-        cluster_status = json.loads(resp.read())
-        current_cluster_topology = {
-            member["name"]: member["role"] for member in cluster_status["members"]
-        }
+        cluster_status = None
+        for url in urls:
+            try:
+                # Scheme is generated by the charm
+                resp = urlopen(  # noqa: S310
+                    url,
+                    timeout=API_REQUEST_TIMEOUT,
+                    context=context,
+                )
+                cluster_status = json.loads(resp.read())
+                break
+            except Exception as e:
+                print(f"Failed to contact {url} with {e}")
+                continue
+        if not cluster_status:
+            raise UnreachableUnitsError("Unable to reach cluster members")
+        current_cluster_topology = {}
+        urls = []
+        for member in cluster_status["members"]:
+            current_cluster_topology[member["name"]] = member["role"]
+            member_url = urljoin(member["api_url"], PATRONI_CLUSTER_STATUS_ENDPOINT)
+            # Call the current unit first
+            if member["name"] == member_name:
+                urls.insert(0, member_url)
+            else:
+                urls.append(member_url)
 
         # If it's the first time the cluster topology was retrieved, then store it and use
         # it for subsequent checks.
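
Note: the loop added above implements straightforward failover polling: try each known member URL in order, keep the first successful response, and give up only when every member has failed. A minimal standalone sketch of the same pattern, for reference (the helper name first_reachable and the generic RuntimeError are illustrative, not part of the patch):

    import json
    from urllib.request import urlopen

    API_REQUEST_TIMEOUT = 5


    def first_reachable(urls, context=None):
        """Return the parsed status from the first member that answers."""
        for url in urls:
            try:
                resp = urlopen(url, timeout=API_REQUEST_TIMEOUT, context=context)
                return json.loads(resp.read())
            except Exception as e:  # any single member may legitimately be down
                print(f"Failed to contact {url} with {e}")
        raise RuntimeError("Unable to reach cluster members")
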
diff --git a/src/charm.py b/src/charm.py
index 30ca8117a9..ad4f6c2d1a 100755
--- a/src/charm.py
+++ b/src/charm.py
@@ -385,30 +385,22 @@ def primary_endpoint(self) -> str | None:
             logger.debug("primary endpoint early exit: Peer relation not joined yet.")
             return None
         try:
-            for attempt in Retrying(stop=stop_after_delay(5), wait=wait_fixed(3)):
-                with attempt:
-                    primary = self._patroni.get_primary()
-                    if primary is None and (standby_leader := self._patroni.get_standby_leader()):
-                        primary = standby_leader
-                    primary_endpoint = self._patroni.get_member_ip(primary)
-                    # Force a retry if there is no primary or the member that was
-                    # returned is not in the list of the current cluster members
-                    # (like when the cluster was not updated yet after a failed switchover).
-                    if not primary_endpoint or primary_endpoint not in self._units_ips:
-                        # TODO figure out why peer data is not available
-                        if (
-                            primary_endpoint
-                            and len(self._units_ips) == 1
-                            and len(self._peers.units) > 1
-                        ):
-                            logger.warning(
-                                "Possibly incomplete peer data: Will not map primary IP to unit IP"
-                            )
-                            return primary_endpoint
-                        logger.debug(
-                            "primary endpoint early exit: Primary IP not in cached peer list."
-                        )
-                        primary_endpoint = None
+            primary = self._patroni.get_primary()
+            if primary is None and (standby_leader := self._patroni.get_standby_leader()):
+                primary = standby_leader
+            primary_endpoint = self._patroni.get_member_ip(primary)
+            # Force a retry if there is no primary or the member that was
+            # returned is not in the list of the current cluster members
+            # (like when the cluster was not updated yet after a failed switchover).
+            if not primary_endpoint or primary_endpoint not in self._units_ips:
+                # TODO figure out why peer data is not available
+                if primary_endpoint and len(self._units_ips) == 1 and len(self._peers.units) > 1:
+                    logger.warning(
+                        "Possibly incomplete peer data: Will not map primary IP to unit IP"
+                    )
+                    return primary_endpoint
+                logger.debug("primary endpoint early exit: Primary IP not in cached peer list.")
+                primary_endpoint = None
         except RetryError:
             return None
         else:
@@ -952,6 +944,8 @@ def _units_ips(self) -> set[str]:
         # Get all members IPs and remove the current unit IP from the list.
         addresses = {self._get_unit_ip(unit) for unit in self._peers.units}
         addresses.add(self._unit_ip)
+        if None in addresses:
+            addresses.remove(None)
         return addresses
 
     @property
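
Note: the guard added to _units_ips drops None entries, which can appear when a peer's IP is not yet published in the relation data. The two added lines are behaviourally identical to set.discard, shown here with made-up values:

    addresses = {"10.0.0.1", None, "10.0.0.2"}  # illustrative addresses; None mimics a missing peer IP
    addresses.discard(None)  # equivalent to: if None in addresses: addresses.remove(None)
    assert addresses == {"10.0.0.1", "10.0.0.2"}
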
diff --git a/src/cluster_topology_observer.py b/src/cluster_topology_observer.py
index 6aff9b29c3..aa85a0e48e 100644
--- a/src/cluster_topology_observer.py
+++ b/src/cluster_topology_observer.py
@@ -70,12 +70,17 @@ def start_observer(self):
         if "JUJU_CONTEXT_ID" in new_env:
             new_env.pop("JUJU_CONTEXT_ID")
 
+        urls = [self._charm._patroni._patroni_url] + [
+            self._charm._patroni._patroni_url.replace(self._charm._patroni.unit_ip, peer)
+            for peer in list(self._charm._patroni.peers_ips)
+        ]
+
         # Input is generated by the charm
         pid = subprocess.Popen(  # noqa: S603
             [
                 "/usr/bin/python3",
                 "scripts/cluster_topology_observer.py",
-                self._charm._patroni._patroni_url,
+                ",".join(urls),
                 self._run_cmd,
                 self._charm.unit.name,
                 self._charm.charm_dir,
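
Note: start_observer now hands the observer script a single comma-separated argv entry containing the local unit's Patroni URL first, then one URL per peer IP. A rough illustration with made-up addresses:

    patroni_url = "https://10.0.0.1:8008"  # local unit's Patroni URL
    unit_ip = "10.0.0.1"
    peers_ips = {"10.0.0.2", "10.0.0.3"}

    urls = [patroni_url] + [patroni_url.replace(unit_ip, peer) for peer in peers_ips]
    print(",".join(urls))
    # e.g. "https://10.0.0.1:8008,https://10.0.0.2:8008,https://10.0.0.3:8008" (peer order may vary)
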
diff --git a/tests/integration/ha_tests/helpers.py b/tests/integration/ha_tests/helpers.py
index d9ea25543d..c22cc85c2c 100644
--- a/tests/integration/ha_tests/helpers.py
+++ b/tests/integration/ha_tests/helpers.py
@@ -240,13 +240,17 @@ async def is_cluster_updated(
 
     # Verify that no writes to the database were missed after stopping the writes.
     logger.info("checking that no writes to the database were missed after stopping the writes")
-    total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
+    for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
+        with attempt:
+            total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
 
     # Verify that old primary is up-to-date.
     logger.info("checking that the former primary is up to date with the cluster after restarting")
-    assert await is_secondary_up_to_date(
-        ops_test, primary_name, total_expected_writes, use_ip_from_inside
-    ), "secondary not up to date with the cluster after restarting."
+    for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
+        with attempt:
+            assert await is_secondary_up_to_date(
+                ops_test, primary_name, total_expected_writes, use_ip_from_inside
+            ), "secondary not up to date with the cluster after restarting."
 
 
 async def check_writes(

diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py
index 7de1a502cf..e500a8d099 100644
--- a/tests/unit/test_charm.py
+++ b/tests/unit/test_charm.py
@@ -177,8 +177,6 @@ def test_patroni_scrape_config_tls(harness):
 
 def test_primary_endpoint(harness):
     with (
-        patch("charm.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay,
-        patch("charm.wait_fixed", new_callable=PropertyMock) as _wait_fixed,
         patch(
             "charm.PostgresqlOperatorCharm._units_ips",
             new_callable=PropertyMock,
@@ -190,10 +188,6 @@ def test_primary_endpoint(harness):
         _patroni.return_value.get_primary.return_value = sentinel.primary
         assert harness.charm.primary_endpoint == "1.1.1.1"
 
-        # Check needed to ensure a fast charm deployment.
-        _stop_after_delay.assert_called_once_with(5)
-        _wait_fixed.assert_called_once_with(3)
-
         _patroni.return_value.get_member_ip.assert_called_once_with(sentinel.primary)
         _patroni.return_value.get_primary.assert_called_once_with()
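
Note: the retry loops added to tests/integration/ha_tests/helpers.py give a lagging cluster up to three attempts, five seconds apart, before failing; with reraise=True the original AssertionError surfaces unchanged on the last failure. A self-contained sketch of the pattern, where check_condition stands in for check_writes / is_secondary_up_to_date:

    from tenacity import Retrying, stop_after_attempt, wait_fixed


    def check_condition() -> bool:
        """Illustrative stand-in for the real cluster check."""
        return True


    for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
        with attempt:
            assert check_condition(), "condition not met after retrying"
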
diff --git a/tests/unit/test_cluster_topology_observer.py b/tests/unit/test_cluster_topology_observer.py
index f079990251..3d0495b8eb 100644
--- a/tests/unit/test_cluster_topology_observer.py
+++ b/tests/unit/test_cluster_topology_observer.py
@@ -1,7 +1,9 @@
 # Copyright 2023 Canonical Ltd.
 # See LICENSE file for licensing details.
 import signal
-from unittest.mock import Mock, PropertyMock, patch
+import sys
+from json import dumps
+from unittest.mock import Mock, PropertyMock, call, patch, sentinel
 
 import pytest
 from ops.charm import CharmBase
@@ -13,25 +15,7 @@
     ClusterTopologyChangeCharmEvents,
     ClusterTopologyObserver,
 )
-from scripts.cluster_topology_observer import dispatch
-
-
-# This method will be used by the mock to replace requests.get
-def mocked_requests_get(*args, **kwargs):
-    class MockResponse:
-        def __init__(self, json_data):
-            self.json_data = json_data
-
-        def json(self):
-            return self.json_data
-
-    data = {
-        "http://server1/cluster": {
-            "members": [{"name": "postgresql-0", "host": "1.1.1.1", "role": "leader", "lag": "1"}]
-        }
-    }
-    if args[0] in data:
-        return MockResponse(data[args[0]])
+from scripts.cluster_topology_observer import UnreachableUnitsError, dispatch, main
 
 
 class MockCharm(CharmBase):
@@ -48,7 +32,7 @@ def _on_cluster_topology_change(self, _) -> None:
 
     @property
     def _patroni(self) -> Patroni:
-        return Mock(_patroni_url="http://1.1.1.1:8008/", verify=True)
+        return Mock(_patroni_url="http://1.1.1.1:8008/", peers_ips={}, verify=True)
 
     @property
     def _peers(self) -> Relation | None:
@@ -153,3 +137,54 @@ def test_dispatch(harness):
         harness.charm.unit.name,
         f"JUJU_DISPATCH_PATH=hooks/cluster_topology_change {charm_dir}/dispatch",
     ])
+
+
+def test_main():
+    with (
+        patch.object(
+            sys,
+            "argv",
+            ["cmd", "http://server1:8008,http://server2:8008", "run_cmd", "unit/0", "charm_dir"],
+        ),
+        patch("scripts.cluster_topology_observer.sleep", return_value=None),
+        patch("scripts.cluster_topology_observer.urlopen") as _urlopen,
+        patch("scripts.cluster_topology_observer.subprocess") as _subprocess,
+        patch(
+            "scripts.cluster_topology_observer.create_default_context",
+            return_value=sentinel.sslcontext,
+        ),
+    ):
+        response1 = {
+            "members": [
+                {"name": "unit-2", "api_url": "http://server3:8008/patroni", "role": "standby"},
+                {"name": "unit-0", "api_url": "http://server1:8008/patroni", "role": "leader"},
+            ]
+        }
+        mock1 = Mock()
+        mock1.read.return_value = dumps(response1)
+        response2 = {
+            "members": [
+                {"name": "unit-2", "api_url": "https://server3:8008/patroni", "role": "leader"},
+            ]
+        }
+        mock2 = Mock()
+        mock2.read.return_value = dumps(response2)
+        _urlopen.side_effect = [mock1, Exception, mock2]
+        with pytest.raises(UnreachableUnitsError):
+            main()
+        assert _urlopen.call_args_list == [
+            # Iteration 1. server2 is not called
+            call("http://server1:8008/cluster", timeout=5, context=sentinel.sslcontext),
+            # Iteration 2 local unit server1 is called first
+            call("http://server1:8008/cluster", timeout=5, context=sentinel.sslcontext),
+            call("http://server3:8008/cluster", timeout=5, context=sentinel.sslcontext),
+            # Iteration 3 Last known member is server3
+            call("https://server3:8008/cluster", timeout=5, context=sentinel.sslcontext),
+        ]
+
+        _subprocess.run.assert_called_once_with([
+            "run_cmd",
+            "-u",
+            "unit/0",
+            "JUJU_DISPATCH_PATH=hooks/cluster_topology_change charm_dir/dispatch",
+        ])
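
Note: as exercised by test_main above, the observer script's command-line contract is four positional arguments: the comma-separated member URLs, the dispatch command, the unit name, and the charm directory. A hypothetical launch for reference (all paths and addresses below are illustrative; in production start_observer builds this call):

    import subprocess

    subprocess.Popen([
        "/usr/bin/python3",
        "scripts/cluster_topology_observer.py",
        "https://10.0.0.1:8008,https://10.0.0.2:8008",   # comma-separated member URLs
        "/usr/bin/juju-exec",                            # hypothetical run command used for dispatch
        "postgresql/0",                                  # unit name; '/' becomes '-' to match Patroni member names
        "/var/lib/juju/agents/unit-postgresql-0/charm",  # charm dir containing the dispatch script
    ])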