Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DPE-2778] Shard removal tests #283

Merged
merged 22 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:

build:
name: Build charms
uses: canonical/data-platform-workflows/.github/workflows/build_charms_with_cache.yaml@v2
uses: canonical/data-platform-workflows/.github/workflows/build_charms_with_cache.yaml@v5.1.2

integration-test:
strategy:
Expand Down
18 changes: 14 additions & 4 deletions lib/charms/mongodb/v1/mongos.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,14 @@ def remove_shard(self, shard_name: str) -> None:
)
self._move_primary(databases_using_shard_as_primary, old_primary=shard_name)

# MongoDB docs says to re-run removeShard after running movePrimary
logger.info("removing shard: %s, after moving primary", shard_name)
removal_info = self.client.admin.command("removeShard", shard_name)
self._log_removal_info(removal_info, shard_name)
# MongoDB docs says to re-run removeShard after running movePrimary
logger.info("removing shard: %s, after moving primary", shard_name)
removal_info = self.client.admin.command("removeShard", shard_name)
self._log_removal_info(removal_info, shard_name)

if shard_name in self.get_shard_members():
logger.info("Shard %s is still present in sharded cluster.", shard_name)
raise NotDrainedError()

def _is_shard_draining(self, shard_name: str) -> bool:
"""Reports if a given shard is currently in the draining state.
Expand Down Expand Up @@ -366,6 +370,12 @@ def is_shard_aware(self, shard_name: str) -> bool:

def _retrieve_remaining_chunks(self, removal_info) -> int:
"""Parses the remaining chunks to remove from removeShard command."""
# when chunks have finished draining, remaining chunks is still in the removal info, but
# marked as 0. If "remaining" is not present, in removal_info then the shard is not yet
# draining
if "remaining" not in removal_info:
raise NotDrainedError()

return removal_info["remaining"]["chunks"] if "remaining" in removal_info else 0

def _move_primary(self, databases_to_move: List[str], old_primary: str) -> None:
Expand Down
26 changes: 24 additions & 2 deletions lib/charms/mongodb/v1/shards_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ def remove_shards(self, departed_shard_id):

raises: PyMongoError, NotReadyError
"""
retry_removal = False
with MongosConnection(self.charm.mongos_config) as mongo:
cluster_shards = mongo.get_shard_members()
relation_shards = self._get_shards_from_relations(departed_shard_id)
Expand All @@ -252,11 +253,19 @@ def remove_shards(self, departed_shard_id):
self.charm.unit.status = MaintenanceStatus(f"Draining shard {shard}")
logger.info("Attempting to removing shard: %s", shard)
mongo.remove_shard(shard)
except NotReadyError:
logger.info("Unable to remove shard: %s another shard is draining", shard)
# to guarantee that the currently draining shard gets re-processed,
# do not raise immediately; instead raise at the end of removal processing.
retry_removal = True
MiaAltieri marked this conversation as resolved.
Show resolved Hide resolved
except ShardNotInClusterError:
logger.info(
"Shard to remove is not in sharded cluster. It has been successfully removed."
)

if retry_removal:
raise ShardNotInClusterError

def update_credentials(self, key: str, value: str) -> None:
"""Sends new credentials, for a key value pair across all shards."""
for relation in self.charm.model.relations[self.relation_name]:
Expand Down Expand Up @@ -364,6 +373,11 @@ def _on_relation_changed(self, event):

# shards rely on the config server for secrets
relation_data = event.relation.data[event.app]
if not relation_data.get(KEYFILE_KEY):
event.defer()
self.charm.unit.status = WaitingStatus("Waiting for secrets from config-server")
return

self.update_keyfile(key_file_contents=relation_data.get(KEYFILE_KEY))

# restart on high loaded databases can be very slow (e.g. up to 10-20 minutes).
Expand All @@ -378,6 +392,10 @@ def _on_relation_changed(self, event):
return

# TODO Future work, see if needed to check for all units restarted / primary elected
if not relation_data.get(OPERATOR_PASSWORD_KEY):
event.defer()
self.charm.unit.status = WaitingStatus("Waiting for secrets from config-server")
return

try:
self.update_operator_password(new_password=relation_data.get(OPERATOR_PASSWORD_KEY))
Expand Down Expand Up @@ -448,12 +466,13 @@ def wait_for_draining(self, mongos_hosts: List[str]):
while not drained:
try:
# no need to continuously check and abuse resources while shard is draining
time.sleep(10)
time.sleep(60)
drained = self.drained(mongos_hosts, self.charm.app.name)
self.charm.unit.status = MaintenanceStatus("Draining shard from cluster")
draining_status = (
"Shard is still draining" if not drained else "Shard is fully drained."
)
self.charm.unit.status = MaintenanceStatus("Draining shard from cluster")
logger.debug(draining_status)
except PyMongoError as e:
logger.error("Error occurred while draining shard: %s", e)
Expand Down Expand Up @@ -636,7 +655,10 @@ def _is_mongos_reachable(self) -> bool:

def _is_added_to_cluster(self) -> bool:
"""Returns True if the shard has been added to the cluster."""
return json.loads(self.charm.app_peer_data.get("added_to_cluster", "False"))
if "added_to_cluster" not in self.charm.app_peer_data:
return False

return json.loads(self.charm.app_peer_data.get("added_to_cluster"))

def _is_shard_aware(self) -> bool:
"""Returns True if shard is in cluster and shard aware."""
Expand Down
36 changes: 36 additions & 0 deletions tests/integration/sharding_tests/helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
from typing import List, Optional
from urllib.parse import quote_plus

from pymongo import MongoClient
Expand Down Expand Up @@ -43,3 +44,38 @@ def verify_data_mongodb(client, db_name, coll_name, key, value) -> bool:
test_collection = db[coll_name]
query = test_collection.find({}, {key: 1})
return query[0][key] == value


def get_cluster_shards(mongos_client) -> set:
    """Return the names of all shards registered in the sharded cluster."""
    listing = mongos_client.admin.command("listShards")
    # Each "host" field looks like "<shard-name>/<host:port>,..."; keep only
    # the shard name before the slash.
    return {entry["host"].split("/")[0] for entry in listing["shards"]}


def get_databases_for_shard(mongos_client, shard_name) -> Optional[List[str]]:
    """Returns the databases that use the given shard as their primary.

    Args:
        mongos_client: client connected to a mongos router.
        shard_name: name of the shard to look up.

    Returns:
        The list of database names whose primary is shard_name, or None when
        the cluster config has no "databases" collection yet (i.e. no
        databases have been created in the sharded cluster).
    """
    config_db = mongos_client["config"]
    if "databases" not in config_db.list_collection_names():
        return None

    # Subscripting a database handle always yields a collection object, so no
    # None-check is needed on the result.
    return config_db["databases"].distinct("_id", {"primary": shard_name})


def has_correct_shards(mongos_client, expected_shards: List[str]) -> bool:
    """Returns true if the cluster config has the expected shards."""
    expected = set(expected_shards)
    actual = get_cluster_shards(mongos_client)
    return actual == expected


def shard_has_databases(
    mongos_client, shard_name: str, expected_databases_on_shard: List[str]
) -> bool:
    """Returns true if the provided shard is a primary for the provided databases."""
    databases_on_shard = get_databases_for_shard(mongos_client, shard_name=shard_name)
    # get_databases_for_shard returns None when the cluster has no database
    # metadata yet; guard it so set(None) doesn't raise TypeError. No metadata
    # means the shard hosts no databases.
    if databases_on_shard is None:
        return not expected_databases_on_shard
    return set(databases_on_shard) == set(expected_databases_on_shard)
Loading