Skip to content

Commit 44b479d

Browse files
committed
Merge branch 'main' into sync16
2 parents ddc06ed + 6677560 commit 44b479d

File tree

6 files changed

+122
-67
lines changed

6 files changed

+122
-67
lines changed

scripts/cluster_topology_observer.py

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import sys
99
from ssl import CERT_NONE, create_default_context
1010
from time import sleep
11+
from urllib.parse import urljoin
1112
from urllib.request import urlopen
1213

1314
API_REQUEST_TIMEOUT = 5
@@ -17,6 +18,10 @@
1718
LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"
1819

1920

21+
class UnreachableUnitsError(Exception):
22+
"""Cannot reach any known cluster member."""
23+
24+
2025
def dispatch(run_cmd, unit, charm_dir):
2126
"""Use the input juju-run command to dispatch a :class:`ClusterTopologyChangeEvent`."""
2227
dispatch_sub_cmd = "JUJU_DISPATCH_PATH=hooks/cluster_topology_change {}/dispatch"
@@ -29,25 +34,43 @@ def main():
2934
3035
Watch the Patroni API cluster info. When changes are detected, dispatch the change event.
3136
"""
32-
patroni_url, run_cmd, unit, charm_dir = sys.argv[1:]
37+
patroni_urls, run_cmd, unit, charm_dir = sys.argv[1:]
3338

3439
previous_cluster_topology = {}
40+
urls = [urljoin(url, PATRONI_CLUSTER_STATUS_ENDPOINT) for url in patroni_urls.split(",")]
41+
member_name = unit.replace("/", "-")
3542
while True:
3643
# Disable TLS chain verification
3744
context = create_default_context()
3845
context.check_hostname = False
3946
context.verify_mode = CERT_NONE
4047

41-
# Scheme is generated by the charm
42-
resp = urlopen( # noqa: S310
43-
f"{patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
44-
timeout=API_REQUEST_TIMEOUT,
45-
context=context,
46-
)
47-
cluster_status = json.loads(resp.read())
48-
current_cluster_topology = {
49-
member["name"]: member["role"] for member in cluster_status["members"]
50-
}
48+
cluster_status = None
49+
for url in urls:
50+
try:
51+
# Scheme is generated by the charm
52+
resp = urlopen( # noqa: S310
53+
url,
54+
timeout=API_REQUEST_TIMEOUT,
55+
context=context,
56+
)
57+
cluster_status = json.loads(resp.read())
58+
break
59+
except Exception as e:
60+
print(f"Failed to contact {url} with {e}")
61+
continue
62+
if not cluster_status:
63+
raise UnreachableUnitsError("Unable to reach cluster members")
64+
current_cluster_topology = {}
65+
urls = []
66+
for member in cluster_status["members"]:
67+
current_cluster_topology[member["name"]] = member["role"]
68+
member_url = urljoin(member["api_url"], PATRONI_CLUSTER_STATUS_ENDPOINT)
69+
# Call the current unit first
70+
if member["name"] == member_name:
71+
urls.insert(0, member_url)
72+
else:
73+
urls.append(member_url)
5174

5275
# If it's the first time the cluster topology was retrieved, then store it and use
5376
# it for subsequent checks.

src/charm.py

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -371,30 +371,22 @@ def primary_endpoint(self) -> str | None:
371371
logger.debug("primary endpoint early exit: Peer relation not joined yet.")
372372
return None
373373
try:
374-
for attempt in Retrying(stop=stop_after_delay(5), wait=wait_fixed(3)):
375-
with attempt:
376-
primary = self._patroni.get_primary()
377-
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
378-
primary = standby_leader
379-
primary_endpoint = self._patroni.get_member_ip(primary)
380-
# Force a retry if there is no primary or the member that was
381-
# returned is not in the list of the current cluster members
382-
# (like when the cluster was not updated yet after a failed switchover).
383-
if not primary_endpoint or primary_endpoint not in self._units_ips:
384-
# TODO figure out why peer data is not available
385-
if (
386-
primary_endpoint
387-
and len(self._units_ips) == 1
388-
and len(self._peers.units) > 1
389-
):
390-
logger.warning(
391-
"Possibly incoplete peer data: Will not map primary IP to unit IP"
392-
)
393-
return primary_endpoint
394-
logger.debug(
395-
"primary endpoint early exit: Primary IP not in cached peer list."
396-
)
397-
primary_endpoint = None
374+
primary = self._patroni.get_primary()
375+
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
376+
primary = standby_leader
377+
primary_endpoint = self._patroni.get_member_ip(primary)
378+
# Force a retry if there is no primary or the member that was
379+
# returned is not in the list of the current cluster members
380+
# (like when the cluster was not updated yet after a failed switchover).
381+
if not primary_endpoint or primary_endpoint not in self._units_ips:
382+
# TODO figure out why peer data is not available
383+
if primary_endpoint and len(self._units_ips) == 1 and len(self._peers.units) > 1:
384+
logger.warning(
385+
"Possibly incoplete peer data: Will not map primary IP to unit IP"
386+
)
387+
return primary_endpoint
388+
logger.debug("primary endpoint early exit: Primary IP not in cached peer list.")
389+
primary_endpoint = None
398390
except RetryError:
399391
return None
400392
else:
@@ -938,6 +930,8 @@ def _units_ips(self) -> set[str]:
938930
# Get all members IPs and remove the current unit IP from the list.
939931
addresses = {self._get_unit_ip(unit) for unit in self._peers.units}
940932
addresses.add(self._unit_ip)
933+
if None in addresses:
934+
addresses.remove(None)
941935
return addresses
942936

943937
@property

src/cluster_topology_observer.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,17 @@ def start_observer(self):
7070
if "JUJU_CONTEXT_ID" in new_env:
7171
new_env.pop("JUJU_CONTEXT_ID")
7272

73+
urls = [self._charm._patroni._patroni_url] + [
74+
self._charm._patroni._patroni_url.replace(self._charm._patroni.unit_ip, peer)
75+
for peer in list(self._charm._patroni.peers_ips)
76+
]
77+
7378
# Input is generated by the charm
7479
pid = subprocess.Popen( # noqa: S603
7580
[
7681
"/usr/bin/python3",
7782
"scripts/cluster_topology_observer.py",
78-
self._charm._patroni._patroni_url,
83+
",".join(urls),
7984
self._run_cmd,
8085
self._charm.unit.name,
8186
self._charm.charm_dir,

tests/integration/ha_tests/helpers.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,17 @@ async def is_cluster_updated(
240240

241241
# Verify that no writes to the database were missed after stopping the writes.
242242
logger.info("checking that no writes to the database were missed after stopping the writes")
243-
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
243+
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
244+
with attempt:
245+
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
244246

245247
# Verify that old primary is up-to-date.
246248
logger.info("checking that the former primary is up to date with the cluster after restarting")
247-
assert await is_secondary_up_to_date(
248-
ops_test, primary_name, total_expected_writes, use_ip_from_inside
249-
), "secondary not up to date with the cluster after restarting."
249+
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
250+
with attempt:
251+
assert await is_secondary_up_to_date(
252+
ops_test, primary_name, total_expected_writes, use_ip_from_inside
253+
), "secondary not up to date with the cluster after restarting."
250254

251255

252256
async def check_writes(

tests/unit/test_charm.py

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,6 @@ def test_patroni_scrape_config_tls(harness):
177177

178178
def test_primary_endpoint(harness):
179179
with (
180-
patch("charm.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay,
181-
patch("charm.wait_fixed", new_callable=PropertyMock) as _wait_fixed,
182180
patch(
183181
"charm.PostgresqlOperatorCharm._units_ips",
184182
new_callable=PropertyMock,
@@ -190,10 +188,6 @@ def test_primary_endpoint(harness):
190188
_patroni.return_value.get_primary.return_value = sentinel.primary
191189
assert harness.charm.primary_endpoint == "1.1.1.1"
192190

193-
# Check needed to ensure a fast charm deployment.
194-
_stop_after_delay.assert_called_once_with(5)
195-
_wait_fixed.assert_called_once_with(3)
196-
197191
_patroni.return_value.get_member_ip.assert_called_once_with(sentinel.primary)
198192
_patroni.return_value.get_primary.assert_called_once_with()
199193

tests/unit/test_cluster_topology_observer.py

Lines changed: 56 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
# Copyright 2023 Canonical Ltd.
22
# See LICENSE file for licensing details.
33
import signal
4-
from unittest.mock import Mock, PropertyMock, patch
4+
import sys
5+
from json import dumps
6+
from unittest.mock import Mock, PropertyMock, call, patch, sentinel
57

68
import pytest
79
from ops.charm import CharmBase
@@ -13,25 +15,7 @@
1315
ClusterTopologyChangeCharmEvents,
1416
ClusterTopologyObserver,
1517
)
16-
from scripts.cluster_topology_observer import dispatch
17-
18-
19-
# This method will be used by the mock to replace requests.get
20-
def mocked_requests_get(*args, **kwargs):
21-
class MockResponse:
22-
def __init__(self, json_data):
23-
self.json_data = json_data
24-
25-
def json(self):
26-
return self.json_data
27-
28-
data = {
29-
"http://server1/cluster": {
30-
"members": [{"name": "postgresql-0", "host": "1.1.1.1", "role": "leader", "lag": "1"}]
31-
}
32-
}
33-
if args[0] in data:
34-
return MockResponse(data[args[0]])
18+
from scripts.cluster_topology_observer import UnreachableUnitsError, dispatch, main
3519

3620

3721
class MockCharm(CharmBase):
@@ -48,7 +32,7 @@ def _on_cluster_topology_change(self, _) -> None:
4832

4933
@property
5034
def _patroni(self) -> Patroni:
51-
return Mock(_patroni_url="http://1.1.1.1:8008/", verify=True)
35+
return Mock(_patroni_url="http://1.1.1.1:8008/", peers_ips={}, verify=True)
5236

5337
@property
5438
def _peers(self) -> Relation | None:
@@ -153,3 +137,54 @@ def test_dispatch(harness):
153137
harness.charm.unit.name,
154138
f"JUJU_DISPATCH_PATH=hooks/cluster_topology_change {charm_dir}/dispatch",
155139
])
140+
141+
142+
def test_main():
143+
with (
144+
patch.object(
145+
sys,
146+
"argv",
147+
["cmd", "http://server1:8008,http://server2:8008", "run_cmd", "unit/0", "charm_dir"],
148+
),
149+
patch("scripts.cluster_topology_observer.sleep", return_value=None),
150+
patch("scripts.cluster_topology_observer.urlopen") as _urlopen,
151+
patch("scripts.cluster_topology_observer.subprocess") as _subprocess,
152+
patch(
153+
"scripts.cluster_topology_observer.create_default_context",
154+
return_value=sentinel.sslcontext,
155+
),
156+
):
157+
response1 = {
158+
"members": [
159+
{"name": "unit-2", "api_url": "http://server3:8008/patroni", "role": "standby"},
160+
{"name": "unit-0", "api_url": "http://server1:8008/patroni", "role": "leader"},
161+
]
162+
}
163+
mock1 = Mock()
164+
mock1.read.return_value = dumps(response1)
165+
response2 = {
166+
"members": [
167+
{"name": "unit-2", "api_url": "https://server3:8008/patroni", "role": "leader"},
168+
]
169+
}
170+
mock2 = Mock()
171+
mock2.read.return_value = dumps(response2)
172+
_urlopen.side_effect = [mock1, Exception, mock2]
173+
with pytest.raises(UnreachableUnitsError):
174+
main()
175+
assert _urlopen.call_args_list == [
176+
# Iteration 1. server2 is not called
177+
call("http://server1:8008/cluster", timeout=5, context=sentinel.sslcontext),
178+
# Iteration 2 local unit server1 is called first
179+
call("http://server1:8008/cluster", timeout=5, context=sentinel.sslcontext),
180+
call("http://server3:8008/cluster", timeout=5, context=sentinel.sslcontext),
181+
# Iteration 3 Last known member is server3
182+
call("https://server3:8008/cluster", timeout=5, context=sentinel.sslcontext),
183+
]
184+
185+
_subprocess.run.assert_called_once_with([
186+
"run_cmd",
187+
"-u",
188+
"unit/0",
189+
"JUJU_DISPATCH_PATH=hooks/cluster_topology_change charm_dir/dispatch",
190+
])

0 commit comments

Comments
 (0)