18 changes: 18 additions & 0 deletions actions.yaml
@@ -43,3 +43,21 @@ set-tls-private-key:
private-key:
type: string
description: The content of private key for communications with clients. Content will be auto-generated if this option is not specified.

promote-standby-cluster:
  description: Promotes the standby cluster of choice to a leader. Must be run on the leader unit of the standby cluster.
params:
force:
type: boolean
default: False
      description: |
        WARNING: setting this option to True WILL WIPE OUT your current primary cluster!
        If both this option and "force-really-really-mean-it" are set to true, this unit will take over the primary role.
        This only works for cross-cluster replication, where both clusters are connected through the async-primary relation.
force-really-really-mean-it:
type: boolean
default: False
      description: |
        WARNING: setting this option to True WILL WIPE OUT your current primary cluster!
        If both this option and "force" are set to true, this unit will take over the primary role.
        This only works for cross-cluster replication, where both clusters are connected through the async-primary relation.
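For reference, here is a minimal sketch of how the charm could consume these two parameters in an action handler. The handler name and messages are assumptions; `event.params` and `event.fail()` are the standard ops ActionEvent API.

from ops.charm import ActionEvent


def _on_promote_standby_cluster(self, event: ActionEvent) -> None:
    """Hypothetical handler sketch for the promote-standby-cluster action."""
    if not self.unit.is_leader():
        event.fail("Must be run on the leader unit of the standby cluster.")
        return
    force = event.params.get("force", False)
    really = event.params.get("force-really-really-mean-it", False)
    if force != really:
        # Both safety flags must agree before the old primary cluster is wiped.
        event.fail("Set both 'force' and 'force-really-really-mean-it' to true.")
        return
    # ... proceed with the promotion (the real logic lives in the charm).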
5 changes: 5 additions & 0 deletions metadata.yaml
@@ -39,6 +39,8 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
database:
interface: postgresql_client
db:
@@ -51,6 +53,9 @@ provides:
interface: grafana_dashboard

requires:
async-replica:
interface: async_replication
limit: 1
certificates:
interface: tls-certificates
limit: 1
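Both endpoints share the async_replication interface, so a unit can infer which side of the pair it is on by checking which relation is established. A minimal sketch, assuming a helper of this shape (Model.get_relation returning None for an unestablished relation is standard ops behavior):

from ops.charm import CharmBase


def async_replication_role(charm: CharmBase) -> str:
    """Hypothetical helper: infer this cluster's role from the established relation."""
    if charm.model.get_relation("async-primary"):
        return "primary"
    if charm.model.get_relation("async-replica"):
        return "replica"
    return "standalone"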
2 changes: 2 additions & 0 deletions src/charm.py
@@ -76,6 +76,7 @@
WORKLOAD_OS_USER,
)
from patroni import NotReadyError, Patroni
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_k8s_dependencies_model
@@ -152,6 +153,7 @@ def __init__(self, *args):
postgresql_db_port = ServicePort(5432, name="database")
patroni_api_port = ServicePort(8008, name="api")
self.service_patcher = KubernetesServicePatch(self, [postgresql_db_port, patroni_api_port])
self.async_manager = PostgreSQLAsyncReplication(self)

def _generate_metrics_jobs(self, enable_tls: bool) -> Dict:
"""Generate spec for Prometheus scraping."""
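The async_manager object is consumed later in this diff by src/patroni.py, which calls get_primary_data() and standby_endpoints(). From those call sites alone, the class must expose roughly this surface — a sketch inferred from usage, not the actual relations/async_replication.py, which is not shown in this diff:

from typing import Optional, Set


class PostgreSQLAsyncReplication:
    """Interface inferred from the call sites in src/patroni.py below."""

    def get_primary_data(self) -> Optional[dict]:
        # On a standby cluster, returns the remote primary's connection data:
        # {"endpoint": ..., "superuser-password": ..., "replication-password": ...}.
        # Returns None when this cluster is not replicating from a remote primary.
        ...

    def standby_endpoints(self) -> Set[str]:
        # Endpoints of the standby units allowed to connect for replication.
        ...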
217 changes: 217 additions & 0 deletions src/coordinator_ops.py
@@ -0,0 +1,217 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.

"""The coordinated ops is a class that ensures a certain activity is ran together.

The concept is similar to the "cohort" in snaps, where all units wait until they can
proceed to execute a certain activity, for example, restarting your service.

The process starts with the leader issuing a new coordination request. Effectively,
that is implemented by incrementing _<relation-name>_coord_counter by 1 in the app
databag and setting _<relation-name>_coord_approved to "False".

Each unit receives a relation-changed event, which is re-issued as a
coordinator_requested event. Once a unit has done its task, it acks the request by
setting its own _<relation-name>_coord_counter to the app's value.

Once all units have ack'ed the _<relation-name>_coord_counter, the leader switches
_<relation-name>_coord_approved to "True". All units then process that change as a
coordinator_approved event and execute the activity they have been waiting for.

If there is a need to coordinate several activities in sequence, e.g. a coordinated
stop followed by a coordinated start, the leader should issue two requests, one per
phase, as follows:


class MyCharm:

    def __init__(self, *args):
        super().__init__(*args)
        self.stop_coordinator = CoordinatedOpsManager(self, "my-relation", tag="_stop_my_charm")
        self.start_coordinator = CoordinatedOpsManager(self, "my-relation", tag="_start_my_charm")

self.framework.observe(
self.stop_coordinator.on.coordinator_requested,
self._on_coordinator_requested
)
self.framework.observe(
self.stop_coordinator.on.coordinator_approved,
self._on_coordinator_approved
)
self.framework.observe(
self.start_coordinator.on.coordinator_requested,
self._on_coordinator_requested
)
self.framework.observe(
self.start_coordinator.on.coordinator_approved,
self._on_coordinator_approved
)

    def _a_method(self):
        # A method that kick-starts the coordinated restart.
        ...
        if self.unit.is_leader():
            self.stop_coordinator.coordinate()

def _on_coordinator_requested(self, event):
if self.service_is_running and event.tag == "_stop_my_charm":
# We are in the stop-phase
self.service.stop()
self.stop_coordinator.acknowledge(event)
elif event.tag == "_start_my_charm":
# we are in the starting-phase
self.service.start()
self.start_coordinator.acknowledge(event)

def _on_coordinator_approved(self, event):
# All units have ack'ed the activity, which means we have stopped.
        if self.unit.is_leader() and event.tag == "_stop_my_charm":
# Now kickstart the restarting process
self.start_coordinator.coordinate()
"""


import logging

from ops.charm import CharmBase, CharmEvents, RelationChangedEvent
from ops.framework import EventBase, EventSource, Handle, Object

logger = logging.getLogger(__name__)


class CoordinatorEventBase(EventBase):
    """Base event for the coordination activities."""

    def __init__(self, handle: Handle, tag: str):
        super().__init__(handle)
        self._tag = tag

    @property
    def tag(self):
        """Returns the tag identifying the coordinator that emitted this event."""
        return self._tag

    def snapshot(self):
        """Persists the tag, so it survives a defer/re-emit cycle."""
        return {"tag": self._tag}

    def restore(self, snapshot):
        """Restores the tag from a deferred event's snapshot."""
        self._tag = snapshot["tag"]


class CoordinatorRequestedEvent(CoordinatorEventBase):
    """Event to signal that the leader requested the units to coordinate a new activity."""


class CoordinatorApprovedEvent(CoordinatorEventBase):
    """Event to signal that all units ack'ed the coordination request and can proceed."""


class CoordinatorCharmEvents(CharmEvents):
    """Events emitted by the coordinated ops manager."""

coordinator_approved = EventSource(CoordinatorApprovedEvent)
coordinator_requested = EventSource(CoordinatorRequestedEvent)


class CoordinatedOpsManager(Object):
"""Coordinates activities that demand the entire peer group to act at once."""

on = CoordinatorCharmEvents()

    def __init__(self, charm: CharmBase, relation: str, tag: str = ""):
super().__init__(charm, relation)
self.tag = tag
self.relation = relation
self.app = charm.app
self.name = relation + tag # use the tag to separate multiple coordinator objects
# in the same charm class.
self.charm = charm # Maintain a reference to charm, so we can emit events.
self.framework.observe(charm.on[self.relation].relation_changed, self._on_relation_changed)

@property
def under_coordination(self):
"""Returns True if the _coord_approved == False."""
return (
self.model.get_relation(self.relation)
.data[self.app]
.get(f"_{self.name}_coord_approved", "True")
== "False"
)

def coordinate(self):
"""Process a request to ask a new coordination activity.

If we are the leader, fire off a coordinator requested event in the self.name.
"""
logger.info("coordinate: starting")
if self.charm.unit.is_leader():
            # Relation databag values are strings: cast before incrementing
            # (the counter wraps around to avoid growing unboundedly).
            counter = int(
                self.model.get_relation(self.relation)
                .data[self.app]
                .get(f"_{self.name}_coord_counter", "0")
            )
            self.model.get_relation(self.relation).data[self.app][
                f"_{self.name}_coord_counter"
            ] = str(counter + 1 if counter < 10000000 else 0)
self.model.get_relation(self.relation).data[self.app][
f"_{self.name}_coord_approved"
] = "False"
logger.info("coordinate: tasks executed")

def acknowledge(self, event):
"""Runs the ack of the latest requested coordination.

Each unit will set their own _counter to the same value as app's.
"""
coord_counter = f"_{self.name}_coord_counter"
self.model.get_relation(self.relation).data[self.charm.unit][coord_counter] = str(
self.model.get_relation(self.relation).data[self.app].get(coord_counter, 0)
)
logger.info("acknowledge: updated internal counter")

if not self.charm.unit.is_leader():
# Nothing to do anymore.
logger.info("acknowledge: this unit is not a leader")
return

relation = self.model.get_relation(self.relation)
# Now, the leader must check if everyone has ack'ed
for unit in relation.units:
if relation.data[unit].get(coord_counter, "0") != relation.data[self.app].get(
coord_counter, "0"
):
logger.info(f"acknowledge: {unit.name} still has a different coord_counter")
# We defer the event until _coord_approved == True.
# If we have _coord_counter differing, then we are not yet there.
event.defer()
return
logger.info("acknowledge: all units are set, set coord_approved == True")
# Just confirmed we have all units ack'ed. Now, set the approval.
relation.data[self.app][f"_{self.name}_coord_approved"] = "True"

    def _on_relation_changed(self, _: RelationChangedEvent):
        """Process relation changed.

        Emits coordinator_approved once the round has been approved, or
        coordinator_requested when the app's counter has moved ahead of this unit's.
        """
logger.info("coordinator: starting _on_relation_changed")
relation_data = self.model.get_relation(self.relation).data[self.app]
unit_data = self.model.get_relation(self.relation).data[self.charm.unit]

if relation_data.get(f"_{self.name}_coord_approved", "False") == "True":
logger.info("coordinator: _on_relation_changed -- coordinator approved")
# We are approved to move on, issue the coordinator_approved event.
self.on.coordinator_approved.emit(self.tag)
return
coord_counter = f"_{self.name}_coord_counter"
if coord_counter in relation_data and relation_data.get(
coord_counter, "0"
) != unit_data.get(coord_counter, "0"):
logger.info("coordinator: _on_relation_changed -- coordinator requested")
self.on.coordinator_requested.emit(self.tag)
return
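To make the databag protocol concrete, here is a minimal sketch of exercising CoordinatedOpsManager with ops.testing.Harness. The charm, relation name, and tag are assumptions made up for the test; the databag keys follow from self.name = relation + tag.

from ops.charm import CharmBase
from ops.testing import Harness

from coordinator_ops import CoordinatedOpsManager

META = """
name: coord-test
peers:
  restart-peers:
    interface: restart_peers
"""


class CoordTestCharm(CharmBase):
    def __init__(self, *args):
        super().__init__(*args)
        self.coordinator = CoordinatedOpsManager(self, "restart-peers", tag="_restart")


harness = Harness(CoordTestCharm, meta=META)
harness.begin()
harness.set_leader(True)
rel_id = harness.add_relation("restart-peers", "coord-test")

# The leader opens a round: the counter is bumped and approval is reset.
harness.charm.coordinator.coordinate()
app_data = harness.get_relation_data(rel_id, "coord-test")
assert app_data["_restart-peers_restart_coord_counter"] == "1"
assert app_data["_restart-peers_restart_coord_approved"] == "False"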
15 changes: 13 additions & 2 deletions src/patroni.py
@@ -333,6 +333,9 @@ def render_patroni_yml_file(
# Open the template patroni.yml file.
with open("templates/patroni.yml.j2", "r") as file:
template = Template(file.read())

primary = self._charm.async_manager.get_primary_data()

# Render the template file with the correct values.
rendered = template.render(
connectivity=connectivity,
@@ -343,8 +346,12 @@
is_no_sync_member=is_no_sync_member,
namespace=self._namespace,
storage_path=self._storage_path,
superuser_password=self._superuser_password,
replication_password=self._replication_password,
superuser_password=primary["superuser-password"]
if primary
else self._superuser_password,
replication_password=primary["replication-password"]
if primary
else self._replication_password,
rewind_user=REWIND_USER,
rewind_password=self._rewind_password,
enable_pgbackrest=stanza is not None,
@@ -355,6 +362,10 @@
minority_count=self._members_count // 2,
version=self.rock_postgresql_version.split(".")[0],
pg_parameters=parameters,
standby_cluster_endpoint=primary["endpoint"] if primary else None,
extra_replication_endpoints={"{}/32".format(primary["endpoint"])}
if primary
else self._charm.async_manager.standby_endpoints(),
)
self._render_file(f"{self._storage_path}/patroni.yml", rendered, 0o644)

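The new standby_cluster_endpoint variable presumably feeds Patroni's standby_cluster bootstrap section, which is how Patroni configures a cluster to replicate from a remote primary. A minimal sketch of the idea — the fragment below is an assumption for illustration, not the charm's actual templates/patroni.yml.j2:

from jinja2 import Template

# Assumed template fragment: when a primary endpoint is present, render
# Patroni's standby_cluster section so this cluster replicates from it.
FRAGMENT = """
{%- if standby_cluster_endpoint %}
standby_cluster:
  host: {{ standby_cluster_endpoint }}
  port: 5432
{%- endif %}
"""

# Renders the standby_cluster block only on the standby side;
# on the primary side the variable is None and nothing is emitted.
print(Template(FRAGMENT).render(standby_cluster_endpoint="10.1.2.3"))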