Merged
86 commits
f965fb1
Switch to pydantic 2
dragomirp May 17, 2025
cbda477
Initial tls v4
dragomirp May 20, 2025
70a5e15
Linting
dragomirp May 20, 2025
7306e3a
IP sans
dragomirp May 20, 2025
0606d7c
Add ips to dns sans
dragomirp May 20, 2025
e838463
Add back cert update on ip change
dragomirp May 21, 2025
df0747f
Spaces addresses
dragomirp May 21, 2025
ac3d954
Add peer cert relation
dragomirp May 21, 2025
73f48d3
Fix rel check
dragomirp May 21, 2025
feceb4f
Revert to reload on cert change
dragomirp May 21, 2025
230ef0d
Get CA from the correct tls relation
dragomirp May 21, 2025
d0d7c3a
Apply suggestions from code review
dragomirp May 21, 2025
76d45b9
Add optimizer_cpu_tuple_cost constraints
dragomirp May 21, 2025
54a0072
Merge branch 'pydantic' into tlsv4
dragomirp May 21, 2025
6ccfaa7
Block on missing TLS rel
dragomirp May 22, 2025
391ec19
Fix tls test
dragomirp May 22, 2025
e5d69ce
Wrong key
dragomirp May 22, 2025
b6af020
Merge branch '16/edge' into pydantic
dragomirp May 23, 2025
a450c25
Async patroni wip
dragomirp May 25, 2025
7d58999
Dont update on rel mismatch
dragomirp May 25, 2025
102f522
Merge branch 'pydantic' into tlsv4
dragomirp May 25, 2025
05d6f0d
Parallel requests
dragomirp May 25, 2025
9b7c79c
Disable unit tests
dragomirp May 25, 2025
9d1df0e
Async checks
dragomirp May 25, 2025
2c61ec2
Linting
dragomirp May 25, 2025
d3ee0a5
Try to verify cert
dragomirp May 26, 2025
ea5cba7
Reenable network cut for arm
dragomirp May 26, 2025
7c1f6c0
Reduce httpx logging
dragomirp May 26, 2025
712adc3
Try wait first completed
dragomirp May 26, 2025
6e5948f
Replace httpx with aiohttp
dragomirp May 26, 2025
0bdb2e9
Add back alternative endpoints and coro as_completed
dragomirp May 26, 2025
f776858
Session for each async request
dragomirp May 26, 2025
23613ed
Back to tasks
dragomirp May 27, 2025
fb75670
Remove JujuVersion warning
dragomirp May 27, 2025
36a49b8
Merge branch 'pydantic' into tlsv4
dragomirp May 27, 2025
dbac958
Split tls enabled flags
dragomirp May 27, 2025
19f145e
Sync to dpl repo
dragomirp May 27, 2025
72ef410
Merge branch 'pydantic' into tlsv4
dragomirp May 27, 2025
7370bd9
Initial parallel observer
dragomirp May 28, 2025
71475ac
Merge branch '16/edge' into pydantic
dragomirp May 28, 2025
2f6e307
Merge branch '16/edge' into parallel-patroni-calls
dragomirp May 28, 2025
62d055a
Merge branch 'pydantic' into tlsv4
dragomirp May 28, 2025
9dc4ca2
Bump lib and fix peer enablement
dragomirp May 28, 2025
94760a7
Merge branch '16/edge' into tlsv4
dragomirp May 28, 2025
672a10d
Peer checks
dragomirp May 28, 2025
c022891
Internal cert
dragomirp May 28, 2025
16934cb
Fix internal ca check
dragomirp May 28, 2025
9e3924c
Try not to deffer peer change
dragomirp May 29, 2025
42c3c47
Missed http calls
dragomirp May 29, 2025
4ad28b2
Peer CAs bundle for requests
dragomirp May 29, 2025
8e31186
Patroni magic config
dragomirp May 30, 2025
0b5e447
Magic config for other users
dragomirp May 30, 2025
e6141b1
Disable upgrade tests
dragomirp May 30, 2025
5b6fc0a
Cache old cas
dragomirp May 30, 2025
94c42a8
Merge branch '16/edge' into tlsv4
dragomirp May 30, 2025
427abb1
Remove logger
dragomirp May 30, 2025
b0e5f23
Fix charm int test
dragomirp May 30, 2025
8a79798
Correct schema and tls unit test
dragomirp May 30, 2025
77ad9ef
Merge branch '16/edge' into tlsv4
dragomirp May 31, 2025
70e7776
Try to deffer if no certs
dragomirp May 31, 2025
611d723
Handle Retry errors
dragomirp May 31, 2025
61d998a
Update libs
dragomirp May 31, 2025
803dead
Revert cluster changes
dragomirp May 31, 2025
a86cb17
Try getting alternative endpoints
dragomirp Jun 1, 2025
3586af1
Merge branch '16/edge' into parallel-patroni-calls
dragomirp Jun 1, 2025
29194b7
Merge branch 'tlsv4' into parallel-patroni-calls-tlsv4
dragomirp Jun 1, 2025
f625efc
Move ip change block before conf validation
dragomirp Jun 2, 2025
ca4d8c5
Merge branch 'tlsv4' into parallel-patroni-calls-tlsv4
dragomirp Jun 2, 2025
0c73c02
Try to update IPs after potential deferrals
dragomirp Jun 2, 2025
e1a82de
Update log message
dragomirp Jun 2, 2025
d3c741d
Revert IP update tweaks
dragomirp Jun 2, 2025
9ca2287
Remove client cert
dragomirp Jun 3, 2025
fb27850
Revert "Remove client cert"
dragomirp Jun 3, 2025
b2ac81b
Squashed commit of the following:
dragomirp Jun 4, 2025
a58aeb8
Merge branch '16/edge' into tlsv4
dragomirp Jun 4, 2025
afa2fb9
Merge branch 'tlsv4' into parallel-patroni-calls-tlsv4
dragomirp Jun 4, 2025
74903f1
Merge branch '16/edge' into parallel-patroni-calls-tlsv4
dragomirp Jun 13, 2025
5078dd1
Switch back to httpx
dragomirp Jun 16, 2025
0862ace
Merge branch '16/edge' into parallel-patroni-calls-tlsv4
dragomirp Jun 16, 2025
e4baaf3
Fix httpx
dragomirp Jun 16, 2025
bd50fdd
Merge branch '16/edge' into parallel-patroni-calls-tlsv4
dragomirp Jun 25, 2025
05a400d
Retry error when unable to reach cluster status
dragomirp Jun 25, 2025
d45b125
Merge branch '16/edge' into parallel-patroni-calls-tlsv4
dragomirp Jun 25, 2025
c2b1c2f
Re-enable upgrade tests
dragomirp Jun 25, 2025
a6bc887
Merge branch '16/edge' into parallel-patroni-calls-tlsv4
dragomirp Jun 30, 2025
dc147c8
Try to mute asyncio message
dragomirp Jun 30, 2025
4 changes: 3 additions & 1 deletion poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
@@ -18,6 +18,7 @@ jinja2 = "^3.1.6"
 pysyncobj = "^0.3.14"
 psutil = "^7.0.0"
 charm-refresh = "^3.0.0.3"
+httpx = "^0.28.1"

 [tool.poetry.group.charm-libs.dependencies]
 # data_platform_libs/v0/data_interfaces.py
50 changes: 33 additions & 17 deletions scripts/cluster_topology_observer.py
@@ -6,8 +6,10 @@
 import json
 import subprocess
 import sys
+from asyncio import as_completed, get_running_loop, run, wait
+from contextlib import suppress
 from os import environ
-from ssl import CERT_NONE, create_default_context
+from ssl import create_default_context
 from time import sleep
 from urllib.parse import urljoin
 from urllib.request import urlopen
@@ -16,6 +18,10 @@

 API_REQUEST_TIMEOUT = 5
 PATRONI_CLUSTER_STATUS_ENDPOINT = "cluster"
+TLS_CA_BUNDLE_FILE = "peer_ca_bundle.pem"
+SNAP_CURRENT_PATH = "/var/snap/charmed-postgresql/current"
+SNAP_CONF_PATH = f"{SNAP_CURRENT_PATH}/etc"
+PATRONI_CONF_PATH = f"{SNAP_CONF_PATH}/patroni"

 # File path for the spawned cluster topology observer process to write logs.
 LOG_FILE_PATH = "/var/log/cluster_topology_observer.log"
@@ -25,6 +31,20 @@ class UnreachableUnitsError(Exception):
     """Cannot reach any known cluster member."""


+def call_url(url, context):
+    """Task handler for calling an url."""
+    try:
+        # Scheme is generated by the charm
+        resp = urlopen(  # noqa: S310
+            url,
+            timeout=API_REQUEST_TIMEOUT,
+            context=context,
+        )
+        return json.loads(resp.read())
+    except Exception as e:
+        print(f"Failed to contact {url} with {e}")
+
+
 def check_for_authorisation_rules_changes(run_cmd, unit, charm_dir, previous_authorisation_rules):
     """Check for changes in the authorisation rules.

@@ -120,7 +140,7 @@ def dispatch(run_cmd, unit, charm_dir, custom_event):
     subprocess.run([run_cmd, "-u", unit, dispatch_sub_cmd.format(custom_event, charm_dir)])  # noqa: S603


-def main():
+async def main():
     """Main watch and dispatch loop.

     Watch the Patroni API cluster info. When changes are detected, dispatch the change event.
@@ -135,23 +155,19 @@ def main():
     while True:
         # Disable TLS chain verification
         context = create_default_context()
-        context.check_hostname = False
-        context.verify_mode = CERT_NONE
+        with suppress(FileNotFoundError):
+            context.load_verify_locations(cafile=f"{PATRONI_CONF_PATH}/{TLS_CA_BUNDLE_FILE}")

         cluster_status = None
-        for url in urls:
-            try:
-                # Scheme is generated by the charm
-                resp = urlopen(  # noqa: S310
-                    url,
-                    timeout=API_REQUEST_TIMEOUT,
-                    context=context,
-                )
-                cluster_status = json.loads(resp.read())
+        loop = get_running_loop()
+        tasks = [loop.run_in_executor(None, call_url, url, context) for url in urls]

Contributor Author: No deps when executing the script, so running urllib requests in an executor.

+        for task in as_completed(tasks):
+            if result := await task:
+                for task in tasks:
+                    task.cancel()
+                await wait(tasks)
+                cluster_status = result
                 break
-            except Exception as e:
-                print(f"Failed to contact {url} with {e}")
-                continue
         if not cluster_status:
             raise UnreachableUnitsError("Unable to reach cluster members")
         current_cluster_topology = {}
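The executor approach mentioned in the review comment can be sketched with the standard library alone. This is a minimal illustration rather than the PR's code: `blocking_fetch` and its delays are hypothetical stand-ins for the `urlopen` call, and a falsy return value plays the role of a failed request.

```python
import asyncio
import time


def blocking_fetch(delay, payload):
    """Hypothetical stand-in for a blocking call such as urllib's urlopen."""
    time.sleep(delay)
    return payload  # None models a failed request


async def first_result(jobs):
    """Run blocking jobs in the default thread pool; return the first truthy result."""
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, blocking_fetch, *job) for job in jobs]
    for next_done in asyncio.as_completed(futures):
        if result := await next_done:
            # cancel() only prevents jobs that have not started yet;
            # a thread that is already running finishes on its own.
            for future in futures:
                future.cancel()
            return result
    return None


if __name__ == "__main__":
    jobs = [(0.2, {"members": ["slow"]}), (0.01, None), (0.05, {"members": ["fast"]})]
    print(asyncio.run(first_result(jobs)))
```

Because the work runs in threads, cancelling the wrapping futures does not interrupt a request that is already in flight, so the per-request timeout still bounds how long each thread lives.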
@@ -186,4 +202,4 @@ def main():


 if __name__ == "__main__":
-    main()
+    run(main())
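The TLS change in this file replaces an unverified context with one that trusts an extra CA bundle when the file exists. A minimal sketch of that idea follows, assuming a hypothetical helper named `build_context` and an illustrative bundle path; neither comes from the PR.

```python
import ssl
from contextlib import suppress


def build_context(ca_bundle_path, verify=True):
    """Build a client SSL context (hypothetical helper, for illustration only)."""
    context = ssl.create_default_context()
    if verify:
        # A missing bundle is ignored: the context keeps verifying,
        # but only against the system trust store.
        with suppress(FileNotFoundError):
            context.load_verify_locations(cafile=ca_bundle_path)
    else:
        # check_hostname must be disabled before verify_mode can be CERT_NONE.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
    return context


if __name__ == "__main__":
    ctx = build_context("/nonexistent/peer_ca_bundle.pem")
    print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)
```

Suppressing `FileNotFoundError` does not turn verification off. With no bundle on disk, connections to peers signed by a private CA would be expected to fail the handshake rather than proceed unverified.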
1 change: 1 addition & 0 deletions src/charm.py
@@ -130,6 +130,7 @@
 logger = logging.getLogger(__name__)
 logging.getLogger("httpx").setLevel(logging.WARNING)
 logging.getLogger("httpcore").setLevel(logging.WARNING)
+logging.getLogger("asyncio").setLevel(logging.WARNING)

 PRIMARY_NOT_REACHABLE_MESSAGE = "waiting for primary to be reachable from this unit"
 EXTENSIONS_DEPENDENCY_MESSAGE = "Unsatisfied plugin dependencies. Please check the logs"
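The effect of the added line can be checked in isolation. The short sketch below uses only the standard `logging` module and shows that raising a named logger's level to `WARNING` disables its `DEBUG` and `INFO` records while leaving warnings enabled.

```python
import logging

# Raise the threshold for the "asyncio" logger only; other loggers are unaffected.
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.setLevel(logging.WARNING)

debug_enabled = asyncio_logger.isEnabledFor(logging.DEBUG)
warning_enabled = asyncio_logger.isEnabledFor(logging.WARNING)
print(debug_enabled, warning_enabled)
```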
114 changes: 62 additions & 52 deletions src/cluster.py
@@ -11,18 +11,21 @@
 import re
 import shutil
 import subprocess
+from asyncio import as_completed, create_task, run, wait
+from contextlib import suppress
 from pathlib import Path
+from ssl import CERT_NONE, create_default_context
 from typing import TYPE_CHECKING, Any, TypedDict

 import charm_refresh
 import psutil
 import requests
 from charms.operator_libs_linux.v2 import snap
+from httpx import AsyncClient, BasicAuth, HTTPError
 from jinja2 import Template
 from ops import BlockedStatus
 from pysyncobj.utility import TcpUtility, UtilityException
 from tenacity import (
-    AttemptManager,
     RetryError,
     Retrying,
     retry,
@@ -172,6 +175,10 @@ def __init__(
     def _patroni_auth(self) -> requests.auth.HTTPBasicAuth:
         return requests.auth.HTTPBasicAuth("patroni", self.patroni_password)

+    @property
+    def _patroni_async_auth(self) -> BasicAuth:
+        return BasicAuth("patroni", password=self.patroni_password)
+
     @property
     def _patroni_url(self) -> str:
         """Patroni REST API URL."""
@@ -249,28 +256,14 @@ def get_postgresql_version(self) -> str:
             if snp["name"] == charm_refresh.snap_name():
                 return snp["version"]

-    def cluster_status(
-        self, alternative_endpoints: list | None = None
-    ) -> list[ClusterMember] | None:
+    def cluster_status(self, alternative_endpoints: list | None = None) -> list[ClusterMember]:
         """Query the cluster status."""
         # Request info from cluster endpoint (which returns all members of the cluster).
-        # TODO we don't know the other cluster's ca
-        verify = self.verify if not alternative_endpoints else False
-        for attempt in Retrying(
-            stop=stop_after_attempt(
-                len(alternative_endpoints) if alternative_endpoints else len(self.peers_ips)
-            )
+        if response := self.parallel_patroni_get_request(
+            f"/{PATRONI_CLUSTER_STATUS_ENDPOINT}", alternative_endpoints
         ):
-            with attempt:
-                request_url = self._get_alternative_patroni_url(attempt, alternative_endpoints)
-
-                cluster_status = requests.get(
-                    f"{request_url}/{PATRONI_CLUSTER_STATUS_ENDPOINT}",
-                    verify=verify,
-                    timeout=API_REQUEST_TIMEOUT,
-                    auth=self._patroni_auth,
-                )
-                return cluster_status.json()["members"]
+            return response["members"]
+        raise RetryError(last_attempt=Exception("Unable to reach any units"))

Contributor Author: Most existing code should handle RetryErrors instead of empty lists.

Member: Great move!

 
     def get_member_ip(self, member_name: str) -> str | None:
         """Get cluster member IP address.
@@ -281,13 +274,14 @@ def get_member_ip(self, member_name: str) -> str | None:
         Returns:
             IP address of the cluster member.
         """
-        cluster_status = self.cluster_status()
-        if not cluster_status:
-            return
+        try:
+            cluster_status = self.cluster_status()

-        for member in cluster_status:
-            if member["name"] == member_name:
-                return member["host"]
+            for member in cluster_status:
+                if member["name"] == member_name:
+                    return member["host"]
+        except RetryError:
+            logger.debug("Unable to get IP. Cluster status unreachable")

     def get_member_status(self, member_name: str) -> str:
         """Get cluster member status.
@@ -307,6 +301,44 @@ def get_member_status(self, member_name: str) -> str:
                 return member["state"]
         return ""

+    async def _httpx_get_request(self, url: str, verify: bool = True):
+        ssl_ctx = create_default_context()
+        if verify:
+            with suppress(FileNotFoundError):
+                ssl_ctx.load_verify_locations(cafile=f"{PATRONI_CONF_PATH}/{TLS_CA_BUNDLE_FILE}")
+        else:
+            ssl_ctx.check_hostname = False
+            ssl_ctx.verify_mode = CERT_NONE
+        async with AsyncClient(
+            auth=self._patroni_async_auth, timeout=API_REQUEST_TIMEOUT, verify=ssl_ctx
+        ) as client:
+            try:
+                return (await client.get(url)).json()
+            except (HTTPError, ValueError):
+                return None
+
+    async def _async_get_request(self, uri: str, endpoints: list[str], verify: bool = True):
+        tasks = [
+            create_task(self._httpx_get_request(f"https://{ip}:8008{uri}", verify))
+            for ip in endpoints
+        ]
+        for task in as_completed(tasks):
+            if result := await task:
+                for task in tasks:
+                    task.cancel()
+                await wait(tasks)

Comment on lines +326 to +329
Contributor Author: Get the first result, cancel the other requests.

+                return result
+
+    def parallel_patroni_get_request(self, uri: str, endpoints: list[str] | None = None) -> dict:
+        """Call all possible patroni endpoints in parallel."""
+        if not endpoints:
+            endpoints = (self.unit_ip, *self.peers_ips)
+            verify = True
+        else:
+            # TODO we don't know the other cluster's ca
+            verify = False
+        return run(self._async_get_request(uri, endpoints, verify))
+
     def get_primary(
         self, unit_name_pattern=False, alternative_endpoints: list[str] | None = None
     ) -> str | None:
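The "first result wins" pattern in the hunk above can be tried without a network. The sketch below is an illustration under stated assumptions: `fake_request`, `first_response`, and `parallel_get` are hypothetical names, `asyncio.sleep` stands in for the HTTP call, and `None` models a failed request.

```python
import asyncio


async def fake_request(endpoint, delay, payload):
    """Hypothetical stand-in for an HTTP GET; returns None on failure."""
    await asyncio.sleep(delay)
    return payload


async def first_response(requests):
    """Start every request at once and return the first truthy response."""
    tasks = [asyncio.create_task(fake_request(*request)) for request in requests]
    for next_done in asyncio.as_completed(tasks):
        if result := await next_done:
            for task in tasks:
                task.cancel()
            # Let the cancelled tasks finish unwinding before returning.
            await asyncio.wait(tasks)
            return result
    return None  # every request failed


def parallel_get(requests):
    """Synchronous entry point that owns the event loop for one call."""
    return asyncio.run(first_response(requests))


if __name__ == "__main__":
    requests = [
        ("10.0.0.1", 0.2, {"members": ["slow"]}),
        ("10.0.0.2", 0.01, None),
        ("10.0.0.3", 0.05, {"members": ["fast"]}),
    ]
    print(parallel_get(requests))
```

A wrapper built on `asyncio.run` can only be called from synchronous code; calling it while an event loop is already running in the same thread raises `RuntimeError`.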
@@ -320,14 +352,17 @@ def get_primary(
             primary pod or unit name.
         """
         # Request info from cluster endpoint (which returns all members of the cluster).
-        if cluster_status := self.cluster_status(alternative_endpoints):
+        try:
+            cluster_status = self.cluster_status(alternative_endpoints)
             for member in cluster_status:
                 if member["role"] == "leader":
                     primary = member["name"]
                     if unit_name_pattern:
                         # Change the last dash to / in order to match unit name pattern.
                         primary = label2name(primary)
                     return primary
+        except RetryError:
+            logger.debug("Unable to get primary. Cluster status unreachable")

     def get_standby_leader(
         self, unit_name_pattern=False, check_whether_is_running: bool = False
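The change of contract discussed in the review (raise instead of returning an empty value) shifts the burden to callers, as the `get_member_ip` and `get_primary` hunks show. A minimal sketch of the caller side follows. To stay runnable without third-party packages it assumes a stand-in `RetryError` class instead of tenacity's, and the member data is invented for illustration.

```python
class RetryError(Exception):
    """Stand-in for tenacity.RetryError so the sketch has no dependencies."""


def cluster_status(reachable=True):
    """Hypothetical status call: returns members or raises, never an empty list."""
    if not reachable:
        raise RetryError("Unable to reach any units")
    return [
        {"name": "postgresql-0", "role": "leader"},
        {"name": "postgresql-1", "role": "replica"},
    ]


def get_primary(reachable=True):
    """Return the leader's name, or None when the cluster cannot be reached."""
    try:
        for member in cluster_status(reachable):
            if member["role"] == "leader":
                return member["name"]
    except RetryError:
        print("Unable to get primary. Cluster status unreachable")
    return None


if __name__ == "__main__":
    print(get_primary())
    print(get_primary(reachable=False))
```

Callers that do not catch the exception now fail loudly when no unit answers, which is the behavior the review comment asks existing code to handle.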
@@ -366,31 +401,6 @@ def get_sync_standby_names(self) -> list[str]:
                 sync_standbys.append(label2name(member["name"]))
         return sync_standbys

-    def _get_alternative_patroni_url(
-        self, attempt: AttemptManager, alternative_endpoints: list[str] | None = None
-    ) -> str:
-        """Get an alternative REST API URL from another member each time.
-
-        When the Patroni process is not running in the current unit it's needed
-        to use a URL from another cluster member REST API to do some operations.
-        """
-        if alternative_endpoints is not None:
-            return self._patroni_url.replace(
-                self.unit_ip, alternative_endpoints[attempt.retry_state.attempt_number - 1]
-            )
-        attempt_number = attempt.retry_state.attempt_number
-        if attempt_number > 1:
-            url = self._patroni_url
-            if (attempt_number - 1) <= len(self.peers_ips):
-                unit_number = attempt_number - 2
-            else:
-                unit_number = attempt_number - 2 - len(self.peers_ips)
-            other_unit_ip = list(self.peers_ips)[unit_number]
-            url = url.replace(self.unit_ip, other_unit_ip)
-        else:
-            url = self._patroni_url
-        return url
-
     def are_all_members_ready(self) -> bool:
         """Check if all members are correctly running Patroni and PostgreSQL.

2 changes: 1 addition & 1 deletion tests/unit/test_charm.py
@@ -602,7 +602,7 @@ def test_on_start(harness):
         patch(
             "charm.PostgresqlOperatorCharm._is_storage_attached",
             side_effect=[False, True, True, True, True, True],
-        ) as _is_storage_attached,
+        ),
         patch(
             "charm.PostgresqlOperatorCharm._can_connect_to_postgresql",
             new_callable=PropertyMock,