Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f8ecbf8
Add async replication implementation
marceloneppel Apr 17, 2024
d82230f
Add async replication integration tests
marceloneppel Apr 17, 2024
9a8e4e9
Add test for scaling
marceloneppel Apr 22, 2024
455a82c
Speedup tests
marceloneppel Apr 22, 2024
6a3009e
Backup standby pgdata folder
marceloneppel Apr 22, 2024
0679057
Merge branch 'dpe-2897-async-replication' into dpe-2900-async-replica…
marceloneppel Apr 22, 2024
79452cf
Improve comments and logs
marceloneppel Apr 23, 2024
8c9b579
Merge branch 'dpe-2897-async-replication' into dpe-2900-async-replica…
marceloneppel Apr 23, 2024
a53fde3
Remove unused constant
marceloneppel Apr 23, 2024
4f0fb14
Merge branch 'dpe-2897-async-replication' into dpe-2900-async-replica…
marceloneppel Apr 23, 2024
c0e8259
Remove warning log call and add optional type hint
marceloneppel Apr 29, 2024
b64c77f
Merge branch 'dpe-2897-async-replication' into dpe-2900-async-replica…
marceloneppel Apr 29, 2024
55a4851
Fix juju3 markers
marceloneppel Apr 29, 2024
3fea335
Merge remote-tracking branch 'origin/main' into dpe-2897-async-replic…
marceloneppel Apr 29, 2024
c9dec2d
Merge branch 'dpe-2897-async-replication' into dpe-2900-async-replica…
marceloneppel Apr 29, 2024
4188c21
Merge remote-tracking branch 'origin/main' into dpe-2897-async-replic…
marceloneppel May 2, 2024
9248af6
Merge branch 'dpe-2897-async-replication' into dpe-2900-async-replica…
marceloneppel May 2, 2024
d6707fb
Revert poetry.lock
marceloneppel May 2, 2024
1a5c951
Merge branch 'dpe-2897-async-replication' into dpe-2900-async-replica…
marceloneppel May 2, 2024
db3042f
Merge remote-tracking branch 'origin/main' into dpe-2900-async-replic…
marceloneppel May 3, 2024
eadbbba
Add relation name to secret label
marceloneppel May 3, 2024
8fdc971
Merge remote-tracking branch 'origin/main' into dpe-2900-async-replic…
marceloneppel May 8, 2024
3508a43
Handle pebble socket not accessible while upgrading
marceloneppel May 24, 2024
ea4f8d1
Merge remote-tracking branch 'origin/main' into dpe-2900-async-replic…
marceloneppel May 24, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,12 @@ def _on_postgresql_pebble_ready(self, event: WorkloadEvent) -> None:
# config-changed hook.
# Get the postgresql container so we can configure/manipulate it.
container = event.workload
if not container.can_connect():
logger.debug(
"Defer on_postgresql_pebble_ready: Waiting for container to become available"
)
event.defer()
return

# Create the PostgreSQL data directory. This is needed on cloud environments
# where the volume is mounted with more restrictive permissions.
Expand Down
112 changes: 86 additions & 26 deletions tests/integration/ha_tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import kubernetes as kubernetes
import psycopg2
import requests
from juju.model import Model
from kubernetes import config
from kubernetes.client.api import core_v1_api
from kubernetes.stream import stream
Expand Down Expand Up @@ -184,10 +185,10 @@ async def is_member_isolated(
return True


async def check_writes(ops_test) -> int:
async def check_writes(ops_test, extra_model: Model = None) -> int:
"""Gets the total writes from the test charm and compares to the writes from db."""
total_expected_writes = await stop_continuous_writes(ops_test)
actual_writes, max_number_written = await count_writes(ops_test)
actual_writes, max_number_written = await count_writes(ops_test, extra_model=extra_model)
for member, count in actual_writes.items():
assert (
count == max_number_written[member]
Expand All @@ -196,13 +197,17 @@ async def check_writes(ops_test) -> int:
return total_expected_writes


async def are_writes_increasing(ops_test, down_unit: str = None) -> None:
async def are_writes_increasing(
ops_test, down_unit: str = None, extra_model: Model = None
) -> None:
"""Verify new writes are continuing by counting the number of writes."""
writes, _ = await count_writes(ops_test, down_unit=down_unit)
writes, _ = await count_writes(ops_test, down_unit=down_unit, extra_model=extra_model)
for member, count in writes.items():
for attempt in Retrying(stop=stop_after_delay(60 * 3), wait=wait_fixed(3), reraise=True):
with attempt:
more_writes, _ = await count_writes(ops_test, down_unit=down_unit)
more_writes, _ = await count_writes(
ops_test, down_unit=down_unit, extra_model=extra_model
)
assert (
more_writes[member] > count
), f"{member}: writes not continuing to DB (current writes: {more_writes[member]} - previous writes: {count})"
Expand Down Expand Up @@ -265,28 +270,34 @@ def copy_file_into_pod(


async def count_writes(
ops_test: OpsTest, down_unit: str = None
ops_test: OpsTest, down_unit: str = None, extra_model: Model = None
) -> Tuple[Dict[str, int], Dict[str, int]]:
"""Count the number of writes in the database."""
app = await app_name(ops_test)
password = await get_password(ops_test, database_app_name=app, down_unit=down_unit)
status = await ops_test.model.get_status()
for unit_name, unit in status["applications"][app]["units"].items():
if unit_name != down_unit:
cluster = get_patroni_cluster(unit["address"])
break
members = []
for model in [ops_test.model, extra_model]:
if model is None:
continue
status = await model.get_status()
for unit_name, unit in status["applications"][app]["units"].items():
if unit_name != down_unit:
members_data = get_patroni_cluster(unit["address"])["members"]
for index, member_data in enumerate(members_data):
members_data[index]["model"] = model.info.name
members.extend(members_data)
break

count = {}
maximum = {}
for member in cluster["members"]:
for member in members:
if member["role"] != "replica" and member["host"].split(".")[0] != (
down_unit or ""
).replace("/", "-"):
host = member["host"]

# Translate the service hostname to an IP address.
model = ops_test.model.info
client = Client(namespace=model.name)
client = Client(namespace=member["model"])
service = client.get(Pod, name=host.split(".")[0])
ip = service.status.podIP

Expand All @@ -295,12 +306,23 @@ async def count_writes(
f" host='{ip}' password='{password}' connect_timeout=10"
)

with psycopg2.connect(connection_string) as connection, connection.cursor() as cursor:
cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
results = cursor.fetchone()
count[member["name"]] = results[0]
maximum[member["name"]] = results[1]
connection.close()
member_name = f'{member["model"]}.{member["name"]}'
connection = None
try:
with psycopg2.connect(
connection_string
) as connection, connection.cursor() as cursor:
cursor.execute("SELECT COUNT(number), MAX(number) FROM continuous_writes;")
results = cursor.fetchone()
count[member_name] = results[0]
maximum[member_name] = results[1]
except psycopg2.Error:
# Error raised when the connection is not possible.
count[member_name] = -1
maximum[member_name] = -1
finally:
if connection is not None:
connection.close()
return count, maximum


Expand Down Expand Up @@ -415,6 +437,42 @@ async def get_postgresql_parameter(ops_test: OpsTest, parameter_name: str) -> Op
return parameter_value


async def get_standby_leader(model: Model, application_name: str) -> str:
"""Get the standby leader name.

Args:
model: the model instance.
application_name: the name of the application to get the value for.

Returns:
the name of the standby leader.
"""
status = await model.get_status()
first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"]
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "standby_leader":
return member["name"]


async def get_sync_standby(model: Model, application_name: str) -> str:
"""Get the sync_standby name.

Args:
model: the model instance.
application_name: the name of the application to get the value for.

Returns:
the name of the sync standby.
"""
status = await model.get_status()
first_unit_ip = list(status["applications"][application_name]["units"].values())[0]["address"]
cluster = get_patroni_cluster(first_unit_ip)
for member in cluster["members"]:
if member["role"] == "sync_standby":
return member["name"]


@retry(stop=stop_after_attempt(8), wait=wait_fixed(15), reraise=True)
async def is_connection_possible(ops_test: OpsTest, unit_name: str) -> bool:
"""Test a connection to a PostgreSQL server."""
Expand Down Expand Up @@ -720,31 +778,33 @@ async def send_signal_to_process(
)


async def start_continuous_writes(ops_test: OpsTest, app: str) -> None:
async def start_continuous_writes(ops_test: OpsTest, app: str, model: Model = None) -> None:
"""Start continuous writes to PostgreSQL."""
# Start the process by relating the application to the database or
# by calling the action if the relation already exists.
if model is None:
model = ops_test.model
relations = [
relation
for relation in ops_test.model.applications[app].relations
for relation in model.applications[app].relations
if not relation.is_peer
and f"{relation.requires.application_name}:{relation.requires.name}"
== f"{APPLICATION_NAME}:first-database"
]
if not relations:
await ops_test.model.relate(app, f"{APPLICATION_NAME}:first-database")
await ops_test.model.wait_for_idle(status="active", timeout=1000)
await model.relate(app, f"{APPLICATION_NAME}:first-database")
await model.wait_for_idle(status="active", timeout=1000)
else:
action = (
await ops_test.model.applications[APPLICATION_NAME]
await model.applications[APPLICATION_NAME]
.units[0]
.run_action("start-continuous-writes")
)
await action.wait()
for attempt in Retrying(stop=stop_after_delay(60 * 5), wait=wait_fixed(3), reraise=True):
with attempt:
action = (
await ops_test.model.applications[APPLICATION_NAME]
await model.applications[APPLICATION_NAME]
.units[0]
.run_action("start-continuous-writes")
)
Expand Down
Loading