2 changes: 1 addition & 1 deletion .github/workflows/approve_renovate_pr.yaml
@@ -10,6 +10,6 @@ on:
jobs:
approve-pr:
name: Approve Renovate pull request
uses: canonical/data-platform-workflows/.github/workflows/approve_renovate_pr.yaml@v31.0.0
uses: canonical/data-platform-workflows/.github/workflows/approve_renovate_pr.yaml@v31.0.1
permissions:
pull-requests: write # Needed to approve PR
2 changes: 1 addition & 1 deletion .github/workflows/check_pr.yaml
@@ -16,4 +16,4 @@ on:
jobs:
check-pr:
name: Check pull request
uses: canonical/data-platform-workflows/.github/workflows/check_charm_pr.yaml@v31.0.0
uses: canonical/data-platform-workflows/.github/workflows/check_charm_pr.yaml@v31.0.1
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
@@ -27,7 +27,7 @@ on:
jobs:
lint:
name: Lint
uses: canonical/data-platform-workflows/.github/workflows/lint.yaml@v31.0.0
uses: canonical/data-platform-workflows/.github/workflows/lint.yaml@v31.0.1

unit-test:
name: Unit test charm
@@ -49,7 +49,7 @@ jobs:

build:
name: Build charm
uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v31.0.0
uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v31.0.1

integration-test:
name: Integration test charm
2 changes: 1 addition & 1 deletion .github/workflows/promote.yaml
@@ -25,7 +25,7 @@ on:
jobs:
promote:
name: Promote charm
uses: canonical/data-platform-workflows/.github/workflows/_promote_charm.yaml@v31.0.0
uses: canonical/data-platform-workflows/.github/workflows/_promote_charm.yaml@v31.0.1
with:
track: '16'
from-risk: ${{ inputs.from-risk }}
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
@@ -27,7 +27,7 @@ jobs:
name: Release charm
needs:
- ci-tests
uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v31.0.0
uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v31.0.1
with:
channel: ${{ github.ref_name }}
artifact-prefix: ${{ needs.ci-tests.outputs.artifact-prefix }}
2 changes: 1 addition & 1 deletion .github/workflows/sync_docs.yaml
@@ -10,7 +10,7 @@ on:
jobs:
sync-docs:
name: Sync docs from Discourse
uses: canonical/data-platform-workflows/.github/workflows/sync_docs.yaml@v31.0.0
uses: canonical/data-platform-workflows/.github/workflows/sync_docs.yaml@v31.0.1
with:
reviewers: a-velasco,izmalk
permissions:
45 changes: 34 additions & 11 deletions scripts/cluster_topology_observer.py
@@ -8,6 +8,7 @@
import sys
from ssl import CERT_NONE, create_default_context
from time import sleep
from urllib.parse import urljoin
from urllib.request import urlopen

API_REQUEST_TIMEOUT = 5
@@ -17,6 +18,10 @@
LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"


class UnreachableUnitsError(Exception):
"""Cannot reach any known cluster member."""


def dispatch(run_cmd, unit, charm_dir):
"""Use the input juju-run command to dispatch a :class:`ClusterTopologyChangeEvent`."""
dispatch_sub_cmd = "JUJU_DISPATCH_PATH=hooks/cluster_topology_change {}/dispatch"
@@ -29,25 +34,43 @@ def main():

Watch the Patroni API cluster info. When changes are detected, dispatch the change event.
"""
patroni_url, run_cmd, unit, charm_dir = sys.argv[1:]
patroni_urls, run_cmd, unit, charm_dir = sys.argv[1:]

previous_cluster_topology = {}
urls = [urljoin(url, PATRONI_CLUSTER_STATUS_ENDPOINT) for url in patroni_urls.split(",")]
member_name = unit.replace("/", "-")
while True:
# Disable TLS chain verification
context = create_default_context()
context.check_hostname = False
context.verify_mode = CERT_NONE

# Scheme is generated by the charm
resp = urlopen( # noqa: S310
f"{patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
timeout=API_REQUEST_TIMEOUT,
context=context,
)
cluster_status = json.loads(resp.read())
current_cluster_topology = {
member["name"]: member["role"] for member in cluster_status["members"]
}
cluster_status = None
for url in urls:
try:
# Scheme is generated by the charm
resp = urlopen( # noqa: S310
url,
timeout=API_REQUEST_TIMEOUT,
context=context,
)
cluster_status = json.loads(resp.read())
break
except Exception as e:
print(f"Failed to contact {url} with {e}")
continue
if not cluster_status:
raise UnreachableUnitsError("Unable to reach cluster members")
current_cluster_topology = {}
urls = []
for member in cluster_status["members"]:
current_cluster_topology[member["name"]] = member["role"]
member_url = urljoin(member["api_url"], PATRONI_CLUSTER_STATUS_ENDPOINT)
# Call the current unit first
if member["name"] == member_name:
urls.insert(0, member_url)
else:
urls.append(member_url)

# If it's the first time the cluster topology was retrieved, then store it and use
# it for subsequent checks.
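The observer script now accepts a comma-separated list of Patroni URLs instead of a single one, walks the list until a member answers, and rebuilds the list from the `members` payload on every pass, with the local unit queried first. A minimal sketch of the fallback step, assuming the script's constants (`API_REQUEST_TIMEOUT = 5`, a `/cluster` status endpoint) and a hypothetical `fetch_cluster_status` helper:

```python
import json
from urllib.request import urlopen

API_REQUEST_TIMEOUT = 5


def fetch_cluster_status(urls, context=None):
    """Return the cluster status from the first reachable member, or None."""
    for url in urls:
        try:
            # Scheme is generated by the charm, as in the script above.
            resp = urlopen(url, timeout=API_REQUEST_TIMEOUT, context=context)  # noqa: S310
            return json.loads(resp.read())
        except Exception as e:
            print(f"Failed to contact {url} with {e}")
    return None
```

When no member is reachable, the script raises `UnreachableUnitsError` instead of silently reusing a stale topology.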
42 changes: 18 additions & 24 deletions src/charm.py
@@ -372,30 +372,22 @@ def primary_endpoint(self) -> str | None:
logger.debug("primary endpoint early exit: Peer relation not joined yet.")
return None
try:
for attempt in Retrying(stop=stop_after_delay(5), wait=wait_fixed(3)):
with attempt:
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Force a retry if there is no primary or the member that was
# returned is not in the list of the current cluster members
# (like when the cluster was not updated yet after a failed switchover).
if not primary_endpoint or primary_endpoint not in self._units_ips:
# TODO figure out why peer data is not available
if (
primary_endpoint
and len(self._units_ips) == 1
and len(self._peers.units) > 1
):
logger.warning(
"Possibly incoplete peer data: Will not map primary IP to unit IP"
)
return primary_endpoint
logger.debug(
"primary endpoint early exit: Primary IP not in cached peer list."
)
primary_endpoint = None
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Force a retry if there is no primary or the member that was
# returned is not in the list of the current cluster members
# (like when the cluster was not updated yet after a failed switchover).
if not primary_endpoint or primary_endpoint not in self._units_ips:
# TODO figure out why peer data is not available
if primary_endpoint and len(self._units_ips) == 1 and len(self._peers.units) > 1:
logger.warning(
"Possibly incoplete peer data: Will not map primary IP to unit IP"
)
return primary_endpoint
logger.debug("primary endpoint early exit: Primary IP not in cached peer list.")
primary_endpoint = None
except RetryError:
return None
else:
@@ -939,6 +931,8 @@ def _units_ips(self) -> set[str]:
# Get all members IPs and remove the current unit IP from the list.
addresses = {self._get_unit_ip(unit) for unit in self._peers.units}
addresses.add(self._unit_ip)
if None in addresses:
addresses.remove(None)
return addresses

@property
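`primary_endpoint` now performs a single lookup instead of retrying for up to five seconds, and `_units_ips` drops `None` entries left by units whose IP is not yet available. A rough sketch of the simplified lookup, with `patroni`, `units_ips`, and `peer_count` as hypothetical stand-ins for the charm's attributes:

```python
def resolve_primary_endpoint(patroni, units_ips: set, peer_count: int):
    """Single-attempt lookup of the primary (or standby leader) IP."""
    primary = patroni.get_primary()
    if primary is None:
        primary = patroni.get_standby_leader()
    endpoint = patroni.get_member_ip(primary)
    if not endpoint or endpoint not in units_ips:
        # Peer data may be incomplete (only our own IP cached) while the
        # cluster actually has more units; in that case trust Patroni.
        if endpoint and len(units_ips) == 1 and peer_count > 1:
            return endpoint
        return None
    return endpoint
```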
7 changes: 6 additions & 1 deletion src/cluster_topology_observer.py
@@ -70,12 +70,17 @@ def start_observer(self):
if "JUJU_CONTEXT_ID" in new_env:
new_env.pop("JUJU_CONTEXT_ID")

urls = [self._charm._patroni._patroni_url] + [
self._charm._patroni._patroni_url.replace(self._charm._patroni.unit_ip, peer)
for peer in list(self._charm._patroni.peers_ips)
]

# Input is generated by the charm
pid = subprocess.Popen( # noqa: S603
[
"/usr/bin/python3",
"scripts/cluster_topology_observer.py",
self._charm._patroni._patroni_url,
",".join(urls),
self._run_cmd,
self._charm.unit.name,
self._charm.charm_dir,
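The observer is now started with the local Patroni URL plus one URL per known peer, joined with commas so the script receives them as a single argv entry. A small sketch of that construction with hypothetical addresses:

```python
patroni_url = "http://10.0.0.1:8008/"  # local unit's Patroni URL
unit_ip = "10.0.0.1"
peers_ips = {"10.0.0.2", "10.0.0.3"}

# Local unit first, then one URL per peer, derived by swapping the host part.
urls = [patroni_url] + [patroni_url.replace(unit_ip, peer) for peer in peers_ips]
observer_argument = ",".join(urls)
# e.g. "http://10.0.0.1:8008/,http://10.0.0.2:8008/,http://10.0.0.3:8008/"
```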
12 changes: 8 additions & 4 deletions tests/integration/ha_tests/helpers.py
@@ -240,13 +240,17 @@ async def is_cluster_updated(

# Verify that no writes to the database were missed after stopping the writes.
logger.info("checking that no writes to the database were missed after stopping the writes")
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)

# Verify that old primary is up-to-date.
logger.info("checking that the former primary is up to date with the cluster after restarting")
assert await is_secondary_up_to_date(
ops_test, primary_name, total_expected_writes, use_ip_from_inside
), "secondary not up to date with the cluster after restarting."
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
assert await is_secondary_up_to_date(
ops_test, primary_name, total_expected_writes, use_ip_from_inside
), "secondary not up to date with the cluster after restarting."


async def check_writes(
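Both checks are now wrapped in the same tenacity pattern: up to three attempts, five seconds apart, re-raising the last failure so the original assertion message still surfaces. The pattern in isolation, with a placeholder `flaky_check` standing in for the real helpers:

```python
from tenacity import Retrying, stop_after_attempt, wait_fixed


def flaky_check() -> bool:
    """Placeholder for a check such as is_secondary_up_to_date()."""
    return True


for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
    with attempt:
        assert flaky_check(), "check did not pass within the retry window"
```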
6 changes: 0 additions & 6 deletions tests/unit/test_charm.py
@@ -177,8 +177,6 @@ def test_patroni_scrape_config_tls(harness):

def test_primary_endpoint(harness):
with (
patch("charm.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay,
patch("charm.wait_fixed", new_callable=PropertyMock) as _wait_fixed,
patch(
"charm.PostgresqlOperatorCharm._units_ips",
new_callable=PropertyMock,
@@ -190,10 +188,6 @@ def test_primary_endpoint(harness):
_patroni.return_value.get_primary.return_value = sentinel.primary
assert harness.charm.primary_endpoint == "1.1.1.1"

# Check needed to ensure a fast charm deployment.
_stop_after_delay.assert_called_once_with(5)
_wait_fixed.assert_called_once_with(3)

_patroni.return_value.get_member_ip.assert_called_once_with(sentinel.primary)
_patroni.return_value.get_primary.assert_called_once_with()

77 changes: 56 additions & 21 deletions tests/unit/test_cluster_topology_observer.py
@@ -1,7 +1,9 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import signal
from unittest.mock import Mock, PropertyMock, patch
import sys
from json import dumps
from unittest.mock import Mock, PropertyMock, call, patch, sentinel

import pytest
from ops.charm import CharmBase
@@ -13,25 +15,7 @@
ClusterTopologyChangeCharmEvents,
ClusterTopologyObserver,
)
from scripts.cluster_topology_observer import dispatch


# This method will be used by the mock to replace requests.get
def mocked_requests_get(*args, **kwargs):
class MockResponse:
def __init__(self, json_data):
self.json_data = json_data

def json(self):
return self.json_data

data = {
"http://server1/cluster": {
"members": [{"name": "postgresql-0", "host": "1.1.1.1", "role": "leader", "lag": "1"}]
}
}
if args[0] in data:
return MockResponse(data[args[0]])
from scripts.cluster_topology_observer import UnreachableUnitsError, dispatch, main


class MockCharm(CharmBase):
@@ -48,7 +32,7 @@ def _on_cluster_topology_change(self, _) -> None:

@property
def _patroni(self) -> Patroni:
return Mock(_patroni_url="http://1.1.1.1:8008/", verify=True)
return Mock(_patroni_url="http://1.1.1.1:8008/", peers_ips={}, verify=True)

@property
def _peers(self) -> Relation | None:
@@ -153,3 +137,54 @@ def test_dispatch(harness):
harness.charm.unit.name,
f"JUJU_DISPATCH_PATH=hooks/cluster_topology_change {charm_dir}/dispatch",
])


def test_main():
with (
patch.object(
sys,
"argv",
["cmd", "http://server1:8008,http://server2:8008", "run_cmd", "unit/0", "charm_dir"],
),
patch("scripts.cluster_topology_observer.sleep", return_value=None),
patch("scripts.cluster_topology_observer.urlopen") as _urlopen,
patch("scripts.cluster_topology_observer.subprocess") as _subprocess,
patch(
"scripts.cluster_topology_observer.create_default_context",
return_value=sentinel.sslcontext,
),
):
response1 = {
"members": [
{"name": "unit-2", "api_url": "http://server3:8008/patroni", "role": "standby"},
{"name": "unit-0", "api_url": "http://server1:8008/patroni", "role": "leader"},
]
}
mock1 = Mock()
mock1.read.return_value = dumps(response1)
response2 = {
"members": [
{"name": "unit-2", "api_url": "https://server3:8008/patroni", "role": "leader"},
]
}
mock2 = Mock()
mock2.read.return_value = dumps(response2)
_urlopen.side_effect = [mock1, Exception, mock2]
with pytest.raises(UnreachableUnitsError):
main()
assert _urlopen.call_args_list == [
# Iteration 1. server2 is not called
call("http://server1:8008/cluster", timeout=5, context=sentinel.sslcontext),
# Iteration 2 local unit server1 is called first
call("http://server1:8008/cluster", timeout=5, context=sentinel.sslcontext),
call("http://server3:8008/cluster", timeout=5, context=sentinel.sslcontext),
# Iteration 3 Last known member is server3
call("https://server3:8008/cluster", timeout=5, context=sentinel.sslcontext),
]

_subprocess.run.assert_called_once_with([
"run_cmd",
"-u",
"unit/0",
"JUJU_DISPATCH_PATH=hooks/cluster_topology_change charm_dir/dispatch",
])
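The `side_effect` list is what drives the loop: the first call returns a response mock, the second raises (forcing the fallback to the next URL), the third returns the new topology, and once the list is exhausted the next call raises `StopIteration`, which the loop treats as one more unreachable member and finally surfaces as `UnreachableUnitsError`. A minimal sketch of that mocking technique:

```python
from unittest.mock import Mock

fake_urlopen = Mock()
fake_urlopen.side_effect = ["first response", Exception("connection refused"), "second response"]

assert fake_urlopen() == "first response"  # call 1 returns normally
try:
    fake_urlopen()  # call 2 raises the listed exception
except Exception as e:
    print(f"fell back after: {e}")
assert fake_urlopen() == "second response"  # call 3 returns the next item
```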