26 changes: 1 addition & 25 deletions .github/renovate.json5
Contributor Author:
Dual branch config, to reuse the PR.

@@ -6,39 +6,15 @@
reviewers: [
'team:data-platform-postgresql',
],
"baseBranches": ["main", "/^*\\/edge$/"],
packageRules: [
{
matchPackageNames: [
'pydantic',
],
allowedVersions: '<2.0.0',
},
{
matchManagers: [
'custom.regex',
],
matchDepNames: [
'juju',
],
matchDatasources: [
'pypi',
],
allowedVersions: '<3',
groupName: 'Juju agents',
},
],
customManagers: [
{
customType: 'regex',
fileMatch: [
'^\\.github/workflows/[^/]+\\.ya?ml$',
],
matchStrings: [
'(libjuju: )==(?<currentValue>.*?) +# renovate: latest libjuju 2',
],
depNameTemplate: 'juju',
datasourceTemplate: 'pypi',
versioningTemplate: 'loose',
},
],
}
1 change: 1 addition & 0 deletions .github/workflows/check_pr.yaml
Contributor Author:
Dual branch config, to reuse the PR.

@@ -11,6 +11,7 @@ on:
- edited
branches:
- main
- '*/edge'

jobs:
check-pr:
45 changes: 34 additions & 11 deletions scripts/cluster_topology_observer.py
@@ -8,6 +8,7 @@
import sys
from ssl import CERT_NONE, create_default_context
from time import sleep
from urllib.parse import urljoin
from urllib.request import urlopen

API_REQUEST_TIMEOUT = 5
@@ -17,6 +18,10 @@
LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"


class UnreachableUnitsError(Exception):
"""Cannot reach any known cluster member."""


def dispatch(run_cmd, unit, charm_dir):
"""Use the input juju-run command to dispatch a :class:`ClusterTopologyChangeEvent`."""
dispatch_sub_cmd = "JUJU_DISPATCH_PATH=hooks/cluster_topology_change {}/dispatch"
@@ -29,25 +34,43 @@ def main():

Watch the Patroni API cluster info. When changes are detected, dispatch the change event.
"""
patroni_url, run_cmd, unit, charm_dir = sys.argv[1:]
patroni_urls, run_cmd, unit, charm_dir = sys.argv[1:]

previous_cluster_topology = {}
urls = [urljoin(url, PATRONI_CLUSTER_STATUS_ENDPOINT) for url in patroni_urls.split(",")]
member_name = unit.replace("/", "-")
while True:
# Disable TLS chain verification
context = create_default_context()
context.check_hostname = False
context.verify_mode = CERT_NONE
Comment on lines 43 to 46
Contributor:
Can you please create a backlog item to revise this? IMHO, we should not skip TLS:

  • if the CA is trusted, this should be fine
  • if the CA is self-signed, it should be locally available => trusted
  • if an expired CA/cert is the only problem, IMHO => fall back to disabling TLS? Point to discuss.

Contributor Author:
I'd guess we'll also have the case where the cert is signed for a domain or vIP, which will mismatch when calling the individual units' REST APIs.
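
A minimal sketch of the alternative being discussed, assuming the self-signed CA is made available on the unit at a known path (the path below is hypothetical, not something this PR ships):

# Sketch only: trust a locally available CA instead of disabling verification.
from ssl import create_default_context

context = create_default_context(cafile="/path/to/selfsigned-ca.pem")  # assumed CA location
# check_hostname stays enabled by default; this would still fail if the cert is
# issued only for a shared domain/vIP rather than the individual unit addresses.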

# Scheme is generated by the charm
resp = urlopen( # noqa: S310
f"{patroni_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
timeout=API_REQUEST_TIMEOUT,
context=context,
)
cluster_status = json.loads(resp.read())
current_cluster_topology = {
member["name"]: member["role"] for member in cluster_status["members"]
}
cluster_status = None
for url in urls:
Contributor Author:
Going forward, we can most probably start an asyncio event loop and call all the units together, instead of going one by one. We should be able to do the same inside the charm as well, where we poll various endpoints.

Contributor:
Agree, on a cluster with 9 nodes it might take a lot of time to switch if half of the cluster is gone (5 sec API_REQUEST_TIMEOUT for each unit).

Member:
I also think that it might be a good improvement.
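
A rough sketch of the concurrent approach floated here, standard library only (the helper name and the first-response-wins policy are assumptions, not what this PR implements). Sequentially, 9 members with roughly half unreachable can burn up to ~25 s in timeouts (5 × API_REQUEST_TIMEOUT) before a live member is even tried; polling them together caps the wait at about one timeout.

# Sketch: query all known members concurrently and keep the first successful response.
import asyncio
import json
from urllib.request import urlopen

API_REQUEST_TIMEOUT = 5


class UnreachableUnitsError(Exception):  # mirrors the class defined earlier in this script
    """Cannot reach any known cluster member."""


async def first_cluster_status(urls, context):
    def fetch(url):
        # urlopen blocks, so each request runs in its own worker thread.
        resp = urlopen(url, timeout=API_REQUEST_TIMEOUT, context=context)  # noqa: S310
        return json.loads(resp.read())

    tasks = [asyncio.create_task(asyncio.to_thread(fetch, url)) for url in urls]
    for future in asyncio.as_completed(tasks):
        try:
            return await future  # slower or failed requests are simply abandoned
        except Exception as e:
            print(f"Failed to contact a member: {e}")
    raise UnreachableUnitsError("Unable to reach cluster members")

# Inside the observer loop this would replace the sequential per-URL loop, e.g.:
# cluster_status = asyncio.run(first_cluster_status(urls, context))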

try:
# Scheme is generated by the charm
resp = urlopen( # noqa: S310
url,
timeout=API_REQUEST_TIMEOUT,
context=context,
)
cluster_status = json.loads(resp.read())
break
Contributor Author:
If we got a response, don't call the other known members.

except Exception as e:
print(f"Failed to contact {url} with {e}")
continue
if not cluster_status:
raise UnreachableUnitsError("Unable to reach cluster members")
current_cluster_topology = {}
urls = []
for member in cluster_status["members"]:
current_cluster_topology[member["name"]] = member["role"]
member_url = urljoin(member["api_url"], PATRONI_CLUSTER_STATUS_ENDPOINT)
Contributor Author:
api_url is the /patroni endpoint.
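
A quick illustration of the urljoin call above (assuming PATRONI_CLUSTER_STATUS_ENDPOINT is "cluster", as the unit tests below suggest): because api_url has no trailing slash, urljoin swaps its last path segment.

from urllib.parse import urljoin

# The member's /patroni endpoint becomes its /cluster endpoint.
print(urljoin("http://server3:8008/patroni", "cluster"))  # -> http://server3:8008/cluster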

# Call the current unit first
if member["name"] == member_name:
urls.insert(0, member_url)
Comment on lines +70 to +71
Contributor Author:
If the current unit is a cluster member, add it first, so that it's called first on the next loop.

else:
urls.append(member_url)

# If it's the first time the cluster topology was retrieved, then store it and use
# it for subsequent checks.
42 changes: 18 additions & 24 deletions src/charm.py
@@ -385,30 +385,22 @@ def primary_endpoint(self) -> str | None:
logger.debug("primary endpoint early exit: Peer relation not joined yet.")
return None
try:
for attempt in Retrying(stop=stop_after_delay(5), wait=wait_fixed(3)):
with attempt:
Comment on lines -388 to -389
Contributor Author:
Get primary endpoint already retries.

primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Force a retry if there is no primary or the member that was
# returned is not in the list of the current cluster members
# (like when the cluster was not updated yet after a failed switchover).
if not primary_endpoint or primary_endpoint not in self._units_ips:
# TODO figure out why peer data is not available
if (
primary_endpoint
and len(self._units_ips) == 1
and len(self._peers.units) > 1
):
logger.warning(
"Possibly incoplete peer data: Will not map primary IP to unit IP"
)
return primary_endpoint
logger.debug(
"primary endpoint early exit: Primary IP not in cached peer list."
)
primary_endpoint = None
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Force a retry if there is no primary or the member that was
# returned is not in the list of the current cluster members
# (like when the cluster was not updated yet after a failed switchover).
if not primary_endpoint or primary_endpoint not in self._units_ips:
# TODO figure out why peer data is not available
if primary_endpoint and len(self._units_ips) == 1 and len(self._peers.units) > 1:
logger.warning(
"Possibly incoplete peer data: Will not map primary IP to unit IP"
)
return primary_endpoint
logger.debug("primary endpoint early exit: Primary IP not in cached peer list.")
primary_endpoint = None
except RetryError:
return None
else:
@@ -952,6 +944,8 @@ def _units_ips(self) -> set[str]:
# Get all members IPs and remove the current unit IP from the list.
addresses = {self._get_unit_ip(unit) for unit in self._peers.units}
addresses.add(self._unit_ip)
if None in addresses:
addresses.remove(None)
return addresses

@property
7 changes: 6 additions & 1 deletion src/cluster_topology_observer.py
@@ -70,12 +70,17 @@ def start_observer(self):
if "JUJU_CONTEXT_ID" in new_env:
new_env.pop("JUJU_CONTEXT_ID")

urls = [self._charm._patroni._patroni_url] + [
self._charm._patroni._patroni_url.replace(self._charm._patroni.unit_ip, peer)
for peer in list(self._charm._patroni.peers_ips)
]

# Input is generated by the charm
pid = subprocess.Popen( # noqa: S603
[
"/usr/bin/python3",
"scripts/cluster_topology_observer.py",
self._charm._patroni._patroni_url,
",".join(urls),
self._run_cmd,
self._charm.unit.name,
self._charm.charm_dir,
12 changes: 8 additions & 4 deletions tests/integration/ha_tests/helpers.py
@@ -240,13 +240,17 @@ async def is_cluster_updated(

# Verify that no writes to the database were missed after stopping the writes.
logger.info("checking that no writes to the database were missed after stopping the writes")
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
total_expected_writes = await check_writes(ops_test, use_ip_from_inside)
Comment on lines +243 to +245
Contributor Author:
Fails frequently on CI. Retry will give the other units some more time to sync up after stopping continuous writes.


# Verify that old primary is up-to-date.
logger.info("checking that the former primary is up to date with the cluster after restarting")
assert await is_secondary_up_to_date(
ops_test, primary_name, total_expected_writes, use_ip_from_inside
), "secondary not up to date with the cluster after restarting."
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(5), reraise=True):
with attempt:
assert await is_secondary_up_to_date(
ops_test, primary_name, total_expected_writes, use_ip_from_inside
), "secondary not up to date with the cluster after restarting."


async def check_writes(
6 changes: 0 additions & 6 deletions tests/unit/test_charm.py
@@ -177,8 +177,6 @@ def test_patroni_scrape_config_tls(harness):

def test_primary_endpoint(harness):
with (
patch("charm.stop_after_delay", new_callable=PropertyMock) as _stop_after_delay,
patch("charm.wait_fixed", new_callable=PropertyMock) as _wait_fixed,
patch(
"charm.PostgresqlOperatorCharm._units_ips",
new_callable=PropertyMock,
Expand All @@ -190,10 +188,6 @@ def test_primary_endpoint(harness):
_patroni.return_value.get_primary.return_value = sentinel.primary
assert harness.charm.primary_endpoint == "1.1.1.1"

# Check needed to ensure a fast charm deployment.
_stop_after_delay.assert_called_once_with(5)
_wait_fixed.assert_called_once_with(3)

_patroni.return_value.get_member_ip.assert_called_once_with(sentinel.primary)
_patroni.return_value.get_primary.assert_called_once_with()

77 changes: 56 additions & 21 deletions tests/unit/test_cluster_topology_observer.py
@@ -1,7 +1,9 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
import signal
from unittest.mock import Mock, PropertyMock, patch
import sys
from json import dumps
from unittest.mock import Mock, PropertyMock, call, patch, sentinel

import pytest
from ops.charm import CharmBase
@@ -13,25 +15,7 @@
ClusterTopologyChangeCharmEvents,
ClusterTopologyObserver,
)
from scripts.cluster_topology_observer import dispatch


# This method will be used by the mock to replace requests.get
def mocked_requests_get(*args, **kwargs):
class MockResponse:
def __init__(self, json_data):
self.json_data = json_data

def json(self):
return self.json_data

data = {
"http://server1/cluster": {
"members": [{"name": "postgresql-0", "host": "1.1.1.1", "role": "leader", "lag": "1"}]
}
}
if args[0] in data:
return MockResponse(data[args[0]])
from scripts.cluster_topology_observer import UnreachableUnitsError, dispatch, main


class MockCharm(CharmBase):
@@ -48,7 +32,7 @@ def _on_cluster_topology_change(self, _) -> None:

@property
def _patroni(self) -> Patroni:
return Mock(_patroni_url="http://1.1.1.1:8008/", verify=True)
return Mock(_patroni_url="http://1.1.1.1:8008/", peers_ips={}, verify=True)

@property
def _peers(self) -> Relation | None:
@@ -153,3 +137,54 @@ def test_dispatch(harness):
harness.charm.unit.name,
f"JUJU_DISPATCH_PATH=hooks/cluster_topology_change {charm_dir}/dispatch",
])


def test_main():
with (
patch.object(
sys,
"argv",
["cmd", "http://server1:8008,http://server2:8008", "run_cmd", "unit/0", "charm_dir"],
),
patch("scripts.cluster_topology_observer.sleep", return_value=None),
patch("scripts.cluster_topology_observer.urlopen") as _urlopen,
patch("scripts.cluster_topology_observer.subprocess") as _subprocess,
patch(
"scripts.cluster_topology_observer.create_default_context",
return_value=sentinel.sslcontext,
),
):
response1 = {
"members": [
{"name": "unit-2", "api_url": "http://server3:8008/patroni", "role": "standby"},
{"name": "unit-0", "api_url": "http://server1:8008/patroni", "role": "leader"},
]
}
mock1 = Mock()
mock1.read.return_value = dumps(response1)
response2 = {
"members": [
{"name": "unit-2", "api_url": "https://server3:8008/patroni", "role": "leader"},
]
}
mock2 = Mock()
mock2.read.return_value = dumps(response2)
_urlopen.side_effect = [mock1, Exception, mock2]
with pytest.raises(UnreachableUnitsError):
main()
assert _urlopen.call_args_list == [
# Iteration 1. server2 is not called
call("http://server1:8008/cluster", timeout=5, context=sentinel.sslcontext),
# Iteration 2 local unit server1 is called first
call("http://server1:8008/cluster", timeout=5, context=sentinel.sslcontext),
call("http://server3:8008/cluster", timeout=5, context=sentinel.sslcontext),
# Iteration 3 Last known member is server3
call("https://server3:8008/cluster", timeout=5, context=sentinel.sslcontext),
]

_subprocess.run.assert_called_once_with([
"run_cmd",
"-u",
"unit/0",
"JUJU_DISPATCH_PATH=hooks/cluster_topology_change charm_dir/dispatch",
])