Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
ea1dedd
Add async replication implementation
marceloneppel Apr 22, 2024
3cac31c
Add async replication integration tests
marceloneppel Apr 22, 2024
9212766
Add test for scaling
marceloneppel Apr 23, 2024
f03a281
Backup standby pgdata folder
marceloneppel Apr 23, 2024
8c6b6b4
Merge branch 'dpe-2953-async-replication' into dpe-2955-async-replica…
marceloneppel Apr 23, 2024
041228e
Fix OS call
marceloneppel Apr 23, 2024
67a0459
Merge branch 'dpe-2953-async-replication' into dpe-2955-async-replica…
marceloneppel Apr 23, 2024
500ce2c
Fix unit tests
marceloneppel Apr 23, 2024
2a490eb
Merge branch 'dpe-2953-async-replication' into dpe-2955-async-replica…
marceloneppel Apr 23, 2024
e41b18b
Improve comments and logs
marceloneppel Apr 23, 2024
848e071
Merge branch 'dpe-2953-async-replication' into dpe-2955-async-replica…
marceloneppel Apr 23, 2024
2d86e32
Fix juju3 markers
marceloneppel Apr 29, 2024
b9f251d
Merge remote-tracking branch 'origin/main' into dpe-2953-async-replic…
marceloneppel Apr 29, 2024
bbd5976
Revert permission change
marceloneppel Apr 29, 2024
32f3351
Merge branch 'dpe-2953-async-replication' into dpe-2955-async-replica…
marceloneppel Apr 29, 2024
5c491a0
Merge remote-tracking branch 'origin/main' into dpe-2953-async-replic…
marceloneppel May 2, 2024
e918e4d
Add optional type hint
marceloneppel May 2, 2024
08932b2
Merge branch 'dpe-2953-async-replication' into dpe-2955-async-replica…
marceloneppel May 2, 2024
1fd2b77
Add relation name to secret label and revert poetry.lock
marceloneppel May 2, 2024
ae3dd2e
Merge branch 'dpe-2953-async-replication' into dpe-2955-async-replica…
marceloneppel May 2, 2024
0f2ba76
Reload Patroni configuration when member is not ready yet
marceloneppel May 2, 2024
2be4d5e
Merge branch 'dpe-2953-async-replication' into dpe-2955-async-replica…
marceloneppel May 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions actions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ list-backups:
description: Lists backups in s3 storage.
pre-upgrade-check:
description: Run necessary pre-upgrade checks and preparations before executing a charm refresh.
promote-cluster:
description: Promotes the cluster of choice to a primary cluster. Must be ran against the leader unit.
params:
force-promotion:
type: boolean
description: Force the promotion of a cluster when there is already a primary cluster.
restore:
description: Restore a database backup using pgBackRest.
S3 credentials are retrieved from a relation with the S3 integrator charm.
Expand Down
45 changes: 23 additions & 22 deletions lib/charms/postgresql_k8s/v0/postgresql.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

Any charm using this library should import the `psycopg2` or `psycopg2-binary` dependency.
"""

import logging
from collections import OrderedDict
from typing import Dict, List, Optional, Set, Tuple
Expand All @@ -35,7 +36,7 @@

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 24
LIBPATCH = 26

INVALID_EXTRA_USER_ROLE_BLOCKING_MESSAGE = "invalid role(s) for extra user roles"

Expand Down Expand Up @@ -358,9 +359,7 @@ def _generate_database_privileges_statements(
statements.append(
"""UPDATE pg_catalog.pg_largeobject_metadata
SET lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}')
WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(
user, self.user
)
WHERE lomowner = (SELECT oid FROM pg_roles WHERE rolname = '{}');""".format(user, self.user)
)
else:
for schema in schemas:
Expand Down Expand Up @@ -477,11 +476,11 @@ def set_up_database(self) -> None:
"""Set up postgres database with the right permissions."""
connection = None
try:
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute("SELECT TRUE FROM pg_roles WHERE rolname='admin';")
if cursor.fetchone() is not None:
return

# Allow access to the postgres database only to the system users.
cursor.execute("REVOKE ALL PRIVILEGES ON DATABASE postgres FROM PUBLIC;")
cursor.execute("REVOKE CREATE ON SCHEMA public FROM PUBLIC;")
Expand All @@ -491,6 +490,10 @@ def set_up_database(self) -> None:
sql.Identifier(user)
)
)
self.create_user(
"admin",
extra_user_roles="pg_read_all_data,pg_write_all_data",
)
cursor.execute("GRANT CONNECT ON DATABASE postgres TO admin;")
except psycopg2.Error as e:
logger.error(f"Failed to set up databases: {e}")
Expand Down Expand Up @@ -562,18 +565,16 @@ def build_postgresql_parameters(
parameters = {}
for config, value in config_options.items():
# Filter config option not related to PostgreSQL parameters.
if not config.startswith(
(
"durability",
"instance",
"logging",
"memory",
"optimizer",
"request",
"response",
"vacuum",
)
):
if not config.startswith((
"durability",
"instance",
"logging",
"memory",
"optimizer",
"request",
"response",
"vacuum",
)):
continue
parameter = "_".join(config.split("_")[1:])
if parameter in ["date_style", "time_zone"]:
Expand All @@ -594,8 +595,8 @@ def build_postgresql_parameters(
# and the remaining as cache memory.
shared_buffers = int(available_memory * 0.25)
effective_cache_size = int(available_memory - shared_buffers)
parameters.setdefault("shared_buffers", f"{int(shared_buffers/10**6)}MB")
parameters.update({"effective_cache_size": f"{int(effective_cache_size/10**6)}MB"})
parameters.setdefault("shared_buffers", f"{int(shared_buffers / 10**6)}MB")
parameters.update({"effective_cache_size": f"{int(effective_cache_size / 10**6)}MB"})
else:
# Return default
parameters.setdefault("shared_buffers", "128MB")
Expand Down
8 changes: 8 additions & 0 deletions metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ peers:
interface: upgrade

provides:
async-primary:
interface: async_replication
limit: 1
optional: true
database:
interface: postgresql_client
db:
Expand All @@ -37,6 +41,10 @@ provides:
limit: 1

requires:
async-replica:
interface: async_replication
limit: 1
optional: true
certificates:
interface: tls-certificates
limit: 1
Expand Down
50 changes: 44 additions & 6 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
USER,
USER_PASSWORD_KEY,
)
from relations.async_replication import PostgreSQLAsyncReplication
from relations.db import EXTENSIONS_BLOCKING_MESSAGE, DbProvides
from relations.postgresql_provider import PostgreSQLProvider
from upgrade import PostgreSQLUpgrade, get_postgresql_dependencies_model
Expand Down Expand Up @@ -166,6 +167,7 @@ def __init__(self, *args):
self.legacy_db_admin_relation = DbProvides(self, admin=True)
self.backup = PostgreSQLBackups(self, "s3-parameters")
self.tls = PostgreSQLTLS(self, PEER)
self.async_replication = PostgreSQLAsyncReplication(self)
self.restart_manager = RollingOpsManager(
charm=self, relation="restart", callback=self._restart
)
Expand Down Expand Up @@ -321,6 +323,8 @@ def primary_endpoint(self) -> Optional[str]:
for attempt in Retrying(stop=stop_after_delay(60), wait=wait_fixed(3)):
with attempt:
primary = self._patroni.get_primary()
if primary is None and (standby_leader := self._patroni.get_standby_leader()):
primary = standby_leader
primary_endpoint = self._patroni.get_member_ip(primary)
# Force a retry if there is no primary or the member that was
# returned is not in the list of the current cluster members
Expand Down Expand Up @@ -420,6 +424,9 @@ def _on_peer_relation_departed(self, event: RelationDepartedEvent) -> None:
self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

def _on_pgdata_storage_detaching(self, _) -> None:
# Change the primary if it's the unit that is being removed.
try:
Expand Down Expand Up @@ -513,9 +520,13 @@ def _on_peer_relation_changed(self, event: HookEvent):

# Restart the workload if it's stuck on the starting state after a timeline divergence
# due to a backup that was restored.
if not self.is_primary and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
if (
not self.is_primary
and not self.is_standby_leader
and (
self._patroni.member_replication_lag == "unknown"
or int(self._patroni.member_replication_lag) > 1000
)
):
self._patroni.reinitialize_postgresql()
logger.debug("Deferring on_peer_relation_changed: reinitialising replica")
Expand Down Expand Up @@ -551,8 +562,7 @@ def _update_new_unit_status(self) -> None:
# a failed switchover, so wait until the primary is elected.
if self.primary_endpoint:
self._update_relation_endpoints()
if not self.is_blocked:
self.unit.status = ActiveStatus()
self.async_replication.handle_read_only_mode()
else:
self.unit.status = WaitingStatus(PRIMARY_NOT_REACHABLE_MESSAGE)

Expand Down Expand Up @@ -688,6 +698,7 @@ def _hosts(self) -> set:
def _patroni(self) -> Patroni:
"""Returns an instance of the Patroni object."""
return Patroni(
self,
self._unit_ip,
self.cluster_name,
self._member_name,
Expand All @@ -704,6 +715,11 @@ def is_primary(self) -> bool:
"""Return whether this unit is the primary instance."""
return self.unit.name == self._patroni.get_primary(unit_name_pattern=True)

@property
def is_standby_leader(self) -> bool:
"""Return whether this unit is the standby leader instance."""
return self.unit.name == self._patroni.get_standby_leader(unit_name_pattern=True)

@property
def is_tls_enabled(self) -> bool:
"""Return whether TLS is enabled."""
Expand Down Expand Up @@ -902,6 +918,9 @@ def _on_config_changed(self, event) -> None:
if self.is_blocked and "Configuration Error" in self.unit.status.message:
self.unit.status = ActiveStatus()

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

if not self.unit.is_leader():
return

Expand Down Expand Up @@ -929,6 +948,9 @@ def enable_disable_extensions(self, database: str = None) -> None:
Args:
database: optional database where to enable/disable the extension.
"""
if self._patroni.get_primary() is None:
logger.debug("Early exit enable_disable_extensions: standby cluster")
return
spi_module = ["refint", "autoinc", "insert_username", "moddatetime"]
plugins_exception = {"uuid_ossp": '"uuid-ossp"'}
original_status = self.unit.status
Expand Down Expand Up @@ -1188,6 +1210,9 @@ def _on_set_password(self, event: ActionEvent) -> None:
# Other units Patroni configuration will be reloaded in the peer relation changed event.
self.update_config()

# Update the password in the async replication data.
self.async_replication.update_async_replication_data()

event.set_results({"password": password})

def _on_update_status(self, _) -> None:
Expand Down Expand Up @@ -1225,6 +1250,9 @@ def _on_update_status(self, _) -> None:
if self._handle_workload_failures():
return

# Update the sync-standby endpoint in the async replication data.
self.async_replication.update_async_replication_data()

self._set_primary_status_message()

# Restart topology observer if it is gone
Expand Down Expand Up @@ -1270,8 +1298,16 @@ def _handle_workload_failures(self) -> bool:
a bool indicating whether the charm performed any action.
"""
# Restart the workload if it's stuck on the starting state after a restart.
try:
is_primary = self.is_primary
is_standby_leader = self.is_standby_leader
except RetryError:
return False

if (
not self._patroni.member_started
not is_primary
and not is_standby_leader
and not self._patroni.member_started
and "postgresql_restarted" in self._peers.data[self.unit]
and self._patroni.member_replication_lag == "unknown"
):
Expand All @@ -1291,6 +1327,8 @@ def _set_primary_status_message(self) -> None:
try:
if self._patroni.get_primary(unit_name_pattern=True) == self.unit.name:
self.unit.status = ActiveStatus("Primary")
elif self.is_standby_leader:
self.unit.status = ActiveStatus("Standby Leader")
elif self._patroni.member_started:
self.unit.status = ActiveStatus()
except (RetryError, ConnectionError) as e:
Expand Down
Loading