18 changes: 18 additions & 0 deletions lib/charms/postgresql_k8s/v1/postgresql.py
@@ -1246,3 +1246,21 @@ def validate_group_map(self, group_map: Optional[str]) -> bool:
return False

return True

def is_user_in_hba(self, username: str) -> bool:
"""Check if user was added in pg_hba."""
connection = None
try:
with self._connect_to_database() as connection, connection.cursor() as cursor:
cursor.execute(
SQL(
"SELECT COUNT(*) FROM pg_hba_file_rules WHERE {} = ANY(user_name);"
).format(Literal(username))
)
return cursor.fetchone()[0] > 0
except psycopg2.Error as e:
logger.debug(f"Failed to check pg_hba: {e}")
return False
finally:
if connection:
connection.close()
Comment on lines +1250 to +1266
Contributor Author

Check if a user has been added to pg_hba

38 changes: 23 additions & 15 deletions src/charm.py
@@ -2488,22 +2488,30 @@ def relations_user_databases_map(self) -> dict:
if not self.is_cluster_initialised or not self._patroni.member_started:
return {USER: "all", REPLICATION_USER: "all", REWIND_USER: "all"}
user_database_map = {}
for user in self.postgresql.list_users_from_relation(
current_host=self.is_connectivity_enabled
):
user_database_map[user] = ",".join(
self.postgresql.list_accessible_databases_for_user(
user, current_host=self.is_connectivity_enabled
try:
for user in self.postgresql.list_users_from_relation(
current_host=self.is_connectivity_enabled
):
user_database_map[user] = ",".join(
self.postgresql.list_accessible_databases_for_user(
user, current_host=self.is_connectivity_enabled
)
)
)
# Add "landscape" superuser by default to the list when the "db-admin" relation is present.
if any(True for relation in self.client_relations if relation.name == "db-admin"):
user_database_map["landscape"] = "all"
if self.postgresql.list_access_groups(current_host=self.is_connectivity_enabled) != set(
ACCESS_GROUPS
):
user_database_map.update({USER: "all", REPLICATION_USER: "all", REWIND_USER: "all"})
return user_database_map
# Add "landscape" superuser by default to the list when the "db-admin" relation is present.
if any(True for relation in self.client_relations if relation.name == "db-admin"):
user_database_map["landscape"] = "all"
if self.postgresql.list_access_groups(
current_host=self.is_connectivity_enabled
) != set(ACCESS_GROUPS):
user_database_map.update({
USER: "all",
REPLICATION_USER: "all",
REWIND_USER: "all",
})
return user_database_map
except PostgreSQLListUsersError:
logger.debug("relations_user_databases_map: Unable to get users")
return {USER: "all", REPLICATION_USER: "all", REWIND_USER: "all"}
Comment on lines +2512 to +2514
Contributor Author

Handling it in the property, because otherwise we cannot regenerate the configuration.
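
A minimal sketch of the intent, with a hypothetical caller that is not part of this diff: since the property catches PostgreSQLListUsersError and falls back to the system users, whatever regenerates the configuration can always iterate the map.

```python
# Hypothetical caller sketch (only relations_user_databases_map is taken from this diff;
# everything else is assumed): the property always returns a usable map, so config
# regeneration never fails at this point.
def render_pg_hba_entries(charm) -> list[str]:
    entries = []
    for user, databases in charm.relations_user_databases_map.items():
        # databases is either "all" or a comma-separated list, exactly as the property returns it.
        entries.append(f"host {databases} {user} 0.0.0.0/0 scram-sha-256")
    return entries
```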


def override_patroni_restart_condition(
self, new_condition: str, repeat_cause: str | None
12 changes: 12 additions & 0 deletions src/cluster.py
@@ -495,6 +495,8 @@ def member_started(self) -> bool:
True if services is ready False otherwise. Retries over a period of 60 seconds times to
allow server time to start up.
"""
if not self.is_patroni_running():
return False
Comment on lines +498 to +499
Contributor Author

If there's no service running, there's no point in calling the REST API.

Contributor

Can we try to backport this to plain 14/edge? It smells like a game changer for many operations that currently waste a lot of time.

Should it also be included in member_inactive() below (and maybe all other requests.get(http://localhost/cluster) requests)?

Contributor Author

> Can we try to backport this to plain 14/edge? It smells like a game changer for many operations that currently waste a lot of time.

If the merge goes well, we should backport both this and the pg_hba check.

> Should it also be included in member_inactive() below (and maybe all other requests.get(http://localhost/cluster) requests)?

We can give it a go, but let's get the 14/edge changes first.
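
For reference, a minimal sketch of how the same guard could look in member_inactive() — the property name comes from the discussion above, but its body is assumed rather than taken from this diff:

```python
# Hypothetical sketch only: short-circuit on the snap service check before
# calling the REST API, mirroring the change made to member_started() in this PR.
@property
def member_inactive(self) -> bool:
    if not self.is_patroni_running():
        return True  # no Patroni service at all, so the member cannot be active
    try:
        response = self.get_patroni_health()
    except RetryError:
        return True
    # The exact state check is assumed; adjust to the property's real logic.
    return response.get("state") not in ("running", "streaming")
```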

Contributor Author

Backport PR: #963.

try:
response = self.get_patroni_health()
except RetryError:
@@ -979,6 +981,16 @@ def reload_patroni_configuration(self):
timeout=PATRONI_TIMEOUT,
)

def is_patroni_running(self) -> bool:
"""Check if the Patroni service is running."""
try:
cache = snap.SnapCache()
selected_snap = cache["charmed-postgresql"]
return selected_snap.services["patroni"]["active"]
except snap.SnapError as e:
logger.debug(f"Failed to check Patroni service: {e}")
return False

def restart_patroni(self) -> bool:
"""Restart Patroni.

34 changes: 11 additions & 23 deletions src/relations/postgresql_provider.py
@@ -5,6 +5,7 @@

import logging
import typing
from datetime import datetime

from charms.data_platform_libs.v0.data_interfaces import (
DatabaseProvides,
@@ -23,6 +24,7 @@
from ops.charm import RelationBrokenEvent, RelationChangedEvent
from ops.framework import Object
from ops.model import ActiveStatus, BlockedStatus, Relation
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

from constants import (
ALL_CLIENT_RELATIONS,
@@ -71,9 +73,6 @@ def __init__(self, charm: "PostgresqlOperatorCharm", relation_name: str = "datab
self.framework.observe(
self.database_provides.on.database_requested, self._on_database_requested
)
self.framework.observe(
charm.on[self.relation_name].relation_changed, self._on_relation_changed
)

@staticmethod
def _sanitize_extra_roles(extra_roles: str | None) -> list[str]:
@@ -153,27 +152,16 @@ def _on_database_requested(self, event: DatabaseRequestedEvent) -> None:
)
)

def _on_relation_changed(self, event: RelationChangedEvent) -> None:
Contributor Author

Checking for pg_hba changes before releasing database_requested should make this hook redundant.

# Check for some conditions before trying to access the PostgreSQL instance.
if not self.charm.is_cluster_initialised:
logger.debug(
"Deferring on_relation_changed: Cluster must be initialized before configuration can be updated with relation users"
)
event.defer()
return

user = f"relation-{event.relation.id}"
# Try to wait for pg_hba trigger
try:
if user not in self.charm.postgresql.list_users():
logger.debug("Deferring on_relation_changed: user was not created yet")
event.defer()
return
except PostgreSQLListUsersError:
logger.debug("Deferring on_relation_changed: failed to list users")
event.defer()
return

self.charm.update_config()
for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(1)):
Contributor Author

Hold the hook until the user is added to pg_hba or the retries time out. Locally it succeeds on the first try, but we may want to increase the timeout to accommodate variance in hardware/storage. On timeout, the topology observer should pick up the changes once they become available.
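
As a standalone illustration of the wait loop described here (a sketch under the assumption that the caller only needs a boolean; tenacity retries only when the block under `with attempt:` raises):

```python
# Hypothetical sketch of the hold-until-ready pattern, not code from this diff.
from tenacity import RetryError, Retrying, stop_after_attempt, wait_fixed

def wait_for_hba_rule(postgresql, user: str) -> bool:
    """Return True once the pg_hba rule for `user` is visible, False on timeout."""
    try:
        for attempt in Retrying(stop=stop_after_attempt(3), wait=wait_fixed(1)):
            with attempt:
                # Raising here makes tenacity retry while the rule is still missing.
                assert postgresql.is_user_in_hba(user)
        return True
    except RetryError:
        return False
```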

with attempt:
self.charm.postgresql.is_user_in_hba(user)
self.charm.unit_peer_data.update({
"pg_hba_needs_update_timestamp": str(datetime.now())
})
except RetryError:
logger.warning("database requested: Unable to check pg_hba rule update")

def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Correctly update the status."""
48 changes: 22 additions & 26 deletions tests/integration/ha_tests/test_async_replication.py
@@ -529,36 +529,32 @@ async def test_scaling(
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test)

async with ops_test.fast_forward(FAST_INTERVAL), fast_forward(second_model, FAST_INTERVAL):
logger.info("scaling out the first cluster")
first_cluster_original_size = len(first_model.applications[DATABASE_APP_NAME].units)
await scale_application(ops_test, DATABASE_APP_NAME, first_cluster_original_size + 1)

logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test, extra_model=second_model)

logger.info("scaling out the second cluster")
second_cluster_original_size = len(second_model.applications[DATABASE_APP_NAME].units)
await scale_application(
ops_test, DATABASE_APP_NAME, second_cluster_original_size + 1, model=second_model
)

logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test, extra_model=second_model)

logger.info("scaling in the first cluster")
await scale_application(ops_test, DATABASE_APP_NAME, first_cluster_original_size)
logger.info("scaling out the clusters")
first_cluster_original_size = len(first_model.applications[DATABASE_APP_NAME].units)
second_cluster_original_size = len(second_model.applications[DATABASE_APP_NAME].units)
await gather(
scale_application(ops_test, DATABASE_APP_NAME, first_cluster_original_size + 1),
scale_application(
ops_test,
DATABASE_APP_NAME,
second_cluster_original_size + 1,
model=second_model,
),
)

logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test, extra_model=second_model)
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test, extra_model=second_model)

logger.info("scaling in the second cluster")
await scale_application(
logger.info("scaling in the clusters")
await gather(
scale_application(ops_test, DATABASE_APP_NAME, first_cluster_original_size),
scale_application(
ops_test, DATABASE_APP_NAME, second_cluster_original_size, model=second_model
)
),
)

logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test, extra_model=second_model)
logger.info("checking whether writes are increasing")
await are_writes_increasing(ops_test, extra_model=second_model)

# Verify that no writes to the database were missed after stopping the writes
# (check that all the units have all the writes).
13 changes: 10 additions & 3 deletions tests/integration/helpers.py
@@ -1018,7 +1018,12 @@ async def run_command_on_unit(ops_test: OpsTest, unit_name: str, command: str) -


async def scale_application(
ops_test: OpsTest, application_name: str, count: int, model: Model = None
ops_test: OpsTest,
application_name: str,
count: int,
model: Model = None,
timeout=2000,
idle_period: int = 30,
Comment on lines +1025 to +1026
Contributor Author

The async runs now seem to fit within the default timeout/idle periods, but the parameters are left exposed in case we want to tweak them.
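
For example, a hypothetical call site using the newly exposed parameters (values are illustrative only; gather, ops_test, DATABASE_APP_NAME and second_model are the names already used in the test files of this PR):

```python
# Hypothetical usage sketch: scale both models in parallel with looser limits
# than the defaults, via the timeout/idle_period parameters exposed above.
await gather(
    scale_application(ops_test, DATABASE_APP_NAME, 3, timeout=3000),
    scale_application(
        ops_test, DATABASE_APP_NAME, 3, model=second_model, idle_period=60
    ),
)
```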

) -> None:
"""Scale a given application to a specific unit count.

@@ -1027,6 +1032,8 @@ def scale_application(
application_name: The name of the application
count: The desired number of units to scale to
model: The model to scale the application in
timeout: timeout period
idle_period: idle period
"""
if model is None:
model = ops_test.model
@@ -1039,8 +1046,8 @@
await model.wait_for_idle(
apps=[application_name],
status="active",
timeout=2000,
idle_period=30,
timeout=timeout,
idle_period=idle_period,
wait_for_exact_units=count,
)

3 changes: 3 additions & 0 deletions tests/unit/test_cluster.py
@@ -562,6 +562,7 @@ def test_member_started_true(peers_ips, patroni):
patch("cluster.requests.get") as _get,
patch("cluster.stop_after_delay", return_value=stop_after_delay(0)),
patch("cluster.wait_fixed", return_value=wait_fixed(0)),
patch("charm.Patroni.is_patroni_running", return_value=True),
):
_get.return_value.json.return_value = {"state": "running"}

@@ -580,6 +581,7 @@ def test_member_started_false(peers_ips, patroni):
patch("cluster.requests.get") as _get,
patch("cluster.stop_after_delay", return_value=stop_after_delay(0)),
patch("cluster.wait_fixed", return_value=wait_fixed(0)),
patch("charm.Patroni.is_patroni_running", return_value=True),
):
_get.return_value.json.return_value = {"state": "stopped"}

@@ -598,6 +600,7 @@ def test_member_started_error(peers_ips, patroni):
patch("cluster.requests.get") as _get,
patch("cluster.stop_after_delay", return_value=stop_after_delay(0)),
patch("cluster.wait_fixed", return_value=wait_fixed(0)),
patch("charm.Patroni.is_patroni_running", return_value=True),
):
_get.side_effect = Exception

3 changes: 0 additions & 3 deletions tests/unit/test_postgresql_provider.py
@@ -96,9 +96,6 @@ def request_database(_harness):
def test_on_database_requested(harness):
with (
patch("charm.PostgresqlOperatorCharm.update_config"),
patch(
"relations.postgresql_provider.PostgreSQLProvider._on_relation_changed"
) as _on_relation_changed,
patch.object(PostgresqlOperatorCharm, "postgresql", Mock()) as postgresql_mock,
patch("subprocess.check_output", return_value=b"C"),
patch("charm.PostgreSQLProvider.update_endpoints") as _update_endpoints,