Skip to content

Commit 79f810f

Browse files
committed
Merge branch 'main' into sync16
2 parents e3df392 + 6677560 commit 79f810f

File tree

12 files changed

+129
-74
lines changed

12 files changed

+129
-74
lines changed

.github/workflows/approve_renovate_pr.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,6 @@ on:
1010
jobs:
1111
approve-pr:
1212
name: Approve Renovate pull request
13-
uses: canonical/data-platform-workflows/.github/workflows/approve_renovate_pr.yaml@v31.0.0
13+
uses: canonical/data-platform-workflows/.github/workflows/approve_renovate_pr.yaml@v31.0.1
1414
permissions:
1515
pull-requests: write # Needed to approve PR

.github/workflows/check_pr.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,4 @@ on:
1616
jobs:
1717
check-pr:
1818
name: Check pull request
19-
uses: canonical/data-platform-workflows/.github/workflows/check_charm_pr.yaml@v31.0.0
19+
uses: canonical/data-platform-workflows/.github/workflows/check_charm_pr.yaml@v31.0.1

.github/workflows/ci.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ on:
2727
jobs:
2828
lint:
2929
name: Lint
30-
uses: canonical/data-platform-workflows/.github/workflows/lint.yaml@v31.0.0
30+
uses: canonical/data-platform-workflows/.github/workflows/lint.yaml@v31.0.1
3131

3232
unit-test:
3333
name: Unit test charm
@@ -49,7 +49,7 @@ jobs:
4949

5050
build:
5151
name: Build charm
52-
uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v31.0.0
52+
uses: canonical/data-platform-workflows/.github/workflows/build_charm.yaml@v31.0.1
5353

5454
integration-test:
5555
name: Integration test charm

.github/workflows/promote.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ on:
2525
jobs:
2626
promote:
2727
name: Promote charm
28-
uses: canonical/data-platform-workflows/.github/workflows/_promote_charm.yaml@v31.0.0
28+
uses: canonical/data-platform-workflows/.github/workflows/_promote_charm.yaml@v31.0.1
2929
with:
3030
track: '16'
3131
from-risk: ${{ inputs.from-risk }}

.github/workflows/release.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727
name: Release charm
2828
needs:
2929
- ci-tests
30-
uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v31.0.0
30+
uses: canonical/data-platform-workflows/.github/workflows/release_charm.yaml@v31.0.1
3131
with:
3232
channel: ${{ github.ref_name }}
3333
artifact-prefix: ${{ needs.ci-tests.outputs.artifact-prefix }}

.github/workflows/sync_docs.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ on:
1010
jobs:
1111
sync-docs:
1212
name: Sync docs from Discourse
13-
uses: canonical/data-platform-workflows/.github/workflows/sync_docs.yaml@v31.0.0
13+
uses: canonical/data-platform-workflows/.github/workflows/sync_docs.yaml@v31.0.1
1414
with:
1515
reviewers: a-velasco,izmalk
1616
permissions:

scripts/cluster_topology_observer.py

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import sys
99
from ssl import CERT_NONE, create_default_context
1010
from time import sleep
11+
from urllib.parse import urljoin
1112
from urllib.request import urlopen
1213

1314
API_REQUEST_TIMEOUT = 5
@@ -17,6 +18,10 @@
1718
LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"
1819

1920

21+
class UnreachableUnitsError(Exception):
22+
"""Cannot reach any known cluster member."""
23+
24+
2025
def dispatch(run_cmd, unit, charm_dir):
2126
"""Use the input juju-run command to dispatch a :class:`ClusterTopologyChangeEvent`."""
2227
dispatch_sub_cmd = "JUJU_DISPATCH_PATH=hooks/cluster_topology_change {}/dispatch"
@@ -29,25 +34,43 @@ def main():
2934
3035
Watch the Patroni API cluster info. When changes are detected, dispatch the change event.
3136
"""
32-
patroni_url, run_cmd, unit, charm_dir = sys.argv[1:]
37+
patroni_urls, run_cmd, unit, charm_dir = sys.argv[1:]
3338

3439
previous_cluster_topology = {}
40+
urls = [urljoin(url, PATRONI_CLUSTER_STATUS_ENDPOINT) for url in patroni_urls.split(",")]
41+
member_name = unit.replace("/", "-")
3542
while True:
3643
# Disable TLS chain verification
3744
context = create_default_context()
3845
context.check_hostname = False
3946
context.verify_mode = CERT_NONE
4047

41-
# Scheme is generated by the charm
42-
resp = urlopen( # noqa: S310
43-
f"{patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
44-
timeout=API_REQUEST_TIMEOUT,
45-
context=context,
46-
)
47-
cluster_status = json.loads(resp.read())
48-
current_cluster_topology = {
49-
member["name"]: member["role"] for member in cluster_status["members"]
50-
}
48+
cluster_status = None
49+
for url in urls:
50+
try:
51+
# Scheme is generated by the charm
52+
resp = urlopen( # noqa: S310
53+
url,
54+
timeout=API_REQUEST_TIMEOUT,
55+
context=context,
56+
)
57+
cluster_status = json.loads(resp.read())
58+
break
59+
except Exception as e:
60+
print(f"Failed to contact {url} with {e}")
61+
continue
62+
if not cluster_status:
63+
raise UnreachableUnitsError("Unable to reach cluster members")
64+
current_cluster_topology = {}
65+
urls = []
66+
for member in cluster_status["members"]:
67+
current_cluster_topology[member["name"]] = member["role"]
68+
member_url = urljoin(member["api_url"], PATRONI_CLUSTER_STATUS_ENDPOINT)
69+
# Call the current unit first
70+
if member["name"] == member_name:
71+
urls.insert(0, member_url)
72+
else:
73+
urls.append(member_url)
5174

5275
# If it's the first time the cluster topology was retrieved, then store it and use
5376
# it for subsequent checks.

src/charm.py

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -372,30 +372,22 @@ def primary_endpoint(self) -> str | None:
372372
logger.debug("primary endpoint early exit: Peer relation not joined yet.")
373373
return None
374374
try:
375-
for attempt in Retrying(stop=stop_after_delay(5), wait=wait_fixed(3)):
376-
with attempt:
377-
primary = self._patroni.get_primary()
378-
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
379-
primary = standby_leader
380-
primary_endpoint = self._patroni.get_member_ip(primary)
381-
# Force a retry if there is no primary or the member that was
382-
# returned is not in the list of the current cluster members
383-
# (like when the cluster was not updated yet after a failed switchover).
384-
if not primary_endpoint or primary_endpoint not in self._units_ips:
385-
# TODO figure out why peer data is not available
386-
if (
387-
primary_endpoint
388-
and len(self._units_ips) == 1
389-
and len(self._peers.units) > 1
390-
):
391-
logger.warning(
392-
"Possibly incoplete peer data: Will not map primary IP to unit IP"
393-
)
394-
return primary_endpoint
395-
logger.debug(
396-
"primary endpoint early exit: Primary IP not in cached peer list."
397-
)
398-
primary_endpoint = None
375+
primary = self._patroni.get_primary()
376+
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
377+
primary = standby_leader
378+
primary_endpoint = self._patroni.get_member_ip(primary)
379+
# Force a retry if there is no primary or the member that was
380+
# returned is not in the list of the current cluster members
381+
# (like when the cluster was not updated yet after a failed switchover).
382+
if not primary_endpoint or primary_endpoint not in self._units_ips:
383+
# TODO figure out why peer data is not available
384+
if primary_endpoint and len(self._units_ips) == 1 and len(self._peers.units) > 1:
385+
logger.warning(
386+
"Possibly incoplete peer data: Will not map primary IP to unit IP"
387+
)
388+
return primary_endpoint
389+
logger.debug("primary endpoint early exit: Primary IP not in cached peer list.")
390+
primary_endpoint = None
399391
except RetryError:
400392
return None
401393
else:
@@ -939,6 +931,8 @@ def _units_ips(self) -> set[str]:
939931
# Get all members IPs and remove the current unit IP from the list.
940932
addresses = {self._get_unit_ip(unit) for unit in self._peers.units}
941933
addresses.add(self._unit_ip)
934+
if None in addresses:
935+
addresses.remove(None)
942936
return addresses
943937

944938
@property

src/cluster_topology_observer.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,17 @@ def start_observer(self):
7070
if "JUJU_CONTEXT_ID" in new_env:
7171
new_env.pop("JUJU_CONTEXT_ID")
7272

73+
urls = [self._charm._patroni._patroni_url] + [
74+
self._charm._patroni._patroni_url.replace(self._charm._patroni.unit_ip, peer)
75+
for peer in list(self._charm._patroni.peers_ips)
76+
]
77+
7378
# Input is generated by the charm
7479
pid = subprocess.Popen( # noqa: S603
7580
[
7681
"/usr/bin/python3",
7782
"scripts/cluster_topology_observer.py",
78-
self._charm._patroni._patroni_url,
83+
",".join(urls),
7984
self._run_cmd,
8085
self._charm.unit.name,
8186
self._charm.charm_dir,

tests/integration/ha_tests/helpers.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,13 +240,17 @@ async def is_cluster_updated(
240240

241241
# Verify that no writes to the database were missed after stopping the writes.
242242
logger.info("checking that no writes to the database were missed after stopping the writes")
243-
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
243+
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
244+
with attempt:
245+
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
244246

245247
# Verify that old primary is up-to-date.
246248
logger.info("checking that the former primary is up to date with the cluster after restarting")
247-
assert await is_secondary_up_to_date(
248-
ops_test, primary_name, total_expected_writes, use_ip_from_inside
249-
), "secondary not up to date with the cluster after restarting."
249+
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
250+
with attempt:
251+
assert await is_secondary_up_to_date(
252+
ops_test, primary_name, total_expected_writes, use_ip_from_inside
253+
), "secondary not up to date with the cluster after restarting."
250254

251255

252256
async def check_writes(

0 commit comments

Comments
 (0)