diff --git a/lib/charms/loki_k8s/v1/loki_push_api.py b/lib/charms/loki_k8s/v1/loki_push_api.py index 33610fb822..961fb7a242 100644 --- a/lib/charms/loki_k8s/v1/loki_push_api.py +++ b/lib/charms/loki_k8s/v1/loki_push_api.py @@ -544,7 +544,7 @@ def __init__(self, ...): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 19 +LIBPATCH = 21 PYDEPS = ["cosl"] @@ -1601,28 +1601,46 @@ def loki_endpoints(self) -> List[dict]: """Fetch Loki Push API endpoints sent from LokiPushApiProvider through relation data. Returns: - A list of dictionaries with Loki Push API endpoints, for instance: + A list of unique dictionaries with Loki Push API endpoints, for instance: [ {"url": "http://loki1:3100/loki/api/v1/push"}, {"url": "http://loki2:3100/loki/api/v1/push"}, ] """ - endpoints = [] # type: list + endpoints = [] + seen_urls = set() for relation in self._charm.model.relations[self._relation_name]: for unit in relation.units: if unit.app == self._charm.app: - # This is a peer unit continue - endpoint = relation.data[unit].get("endpoint") - if endpoint: - deserialized_endpoint = json.loads(endpoint) - endpoints.append(deserialized_endpoint) + if not (endpoint := relation.data[unit].get("endpoint")): + continue + + deserialized_endpoint = json.loads(endpoint) + url = deserialized_endpoint.get("url") + + # Deduplicate by URL. + # With loki-k8s we have ingress-per-unit, so in that case + # we do want to collect the URLs of all the units. + # With loki-coordinator-k8s, even when the coordinator + # is scaled, we want to advertise only one URL. + # Without deduplication, we'd end up with the same + # tls config section in the promtail config file, in which + # case promtail immediately exits with the following error: + # [promtail] level=error ts= msg="error creating promtail" error="failed to create client manager: duplicate client configs are not allowed, found duplicate for name: " + + if not url or url in seen_urls: + continue + + seen_urls.add(url) + endpoints.append(deserialized_endpoint) return endpoints + class LokiPushApiConsumer(ConsumerBase): """Loki Consumer class.""" diff --git a/lib/charms/prometheus_k8s/v0/prometheus_scrape.py b/lib/charms/prometheus_k8s/v0/prometheus_scrape.py index d01d3ec218..18b3e1857c 100644 --- a/lib/charms/prometheus_k8s/v0/prometheus_scrape.py +++ b/lib/charms/prometheus_k8s/v0/prometheus_scrape.py @@ -341,7 +341,7 @@ def _on_scrape_targets_changed(self, event): import yaml from cosl import JujuTopology from cosl.rules import AlertRules, generic_alert_groups -from ops.charm import CharmBase, RelationJoinedEvent, RelationRole +from ops.charm import CharmBase, RelationRole from ops.framework import ( BoundEvent, EventBase, @@ -350,7 +350,6 @@ def _on_scrape_targets_changed(self, event): ObjectEvents, StoredDict, StoredList, - StoredState, ) from ops.model import Relation @@ -362,7 +361,7 @@ def _on_scrape_targets_changed(self, event): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 54 +LIBPATCH = 56 # Version 0.0.53 needed for cosl.rules.generic_alert_groups PYDEPS = ["cosl>=0.0.53"] @@ -797,43 +796,6 @@ def __init__( super().__init__(self.message) -def _is_official_alert_rule_format(rules_dict: dict) -> bool: - """Are alert rules in the upstream format as supported by Prometheus. 
- - Alert rules in dictionary format are in "official" form if they - contain a "groups" key, since this implies they contain a list of - alert rule groups. - - Args: - rules_dict: a set of alert rules in Python dictionary format - - Returns: - True if alert rules are in official Prometheus file format. - """ - return "groups" in rules_dict - - -def _is_single_alert_rule_format(rules_dict: dict) -> bool: - """Are alert rules in single rule format. - - The Prometheus charm library supports reading of alert rules in a - custom format that consists of a single alert rule per file. This - does not conform to the official Prometheus alert rule file format - which requires that each alert rules file consists of a list of - alert rule groups and each group consists of a list of alert - rules. - - Alert rules in dictionary form are considered to be in single rule - format if in the least it contains two keys corresponding to the - alert rule name and alert expression. - - Returns: - True if alert rule is in single rule file format. - """ - # one alert rule per file - return set(rules_dict) >= {"alert", "expr"} - - class TargetsChangedEvent(EventBase): """Event emitted when Prometheus scrape targets change.""" @@ -1271,15 +1233,6 @@ def _dedupe_job_names(jobs: List[dict]): return deduped_jobs -def _dedupe_list(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]: - """Deduplicate items in the list via object identity.""" - unique_items = [] - for item in items: - if item not in unique_items: - unique_items.append(item) - return unique_items - - def _resolve_dir_against_charm_path(charm: CharmBase, *path_elements: str) -> str: """Resolve the provided path items against the directory of the main file. @@ -1711,619 +1664,6 @@ def _update_relation_data(self, _): sort_keys=True, # sort, to prevent unnecessary relation_changed events ) - -class MetricsEndpointAggregator(Object): - """Aggregate metrics from multiple scrape targets. - - `MetricsEndpointAggregator` collects scrape target information from one - or more related charms and forwards this to a `MetricsEndpointConsumer` - charm, which may be in a different Juju model. However, it is - essential that `MetricsEndpointAggregator` itself resides in the same - model as its scrape targets, as this is currently the only way to - ensure in Juju that the `MetricsEndpointAggregator` will be able to - determine the model name and uuid of the scrape targets. - - `MetricsEndpointAggregator` should be used in place of - `MetricsEndpointProvider` in the following two use cases: - - 1. Integrating one or more scrape targets that do not support the - `prometheus_scrape` interface. - - 2. Integrating one or more scrape targets through cross model - relations. Although the [Scrape Config Operator](https://charmhub.io/cos-configuration-k8s) - may also be used for the purpose of supporting cross model - relations. - - Using `MetricsEndpointAggregator` to build a Prometheus charm client - only requires instantiating it. Instantiating - `MetricsEndpointAggregator` is similar to `MetricsEndpointProvider` except - that it requires specifying the names of three relations: the - relation with scrape targets, the relation for alert rules, and - that with the Prometheus charms. 
For example - - ```python - self._aggregator = MetricsEndpointAggregator( - self, - { - "prometheus": "monitoring", - "scrape_target": "prometheus-target", - "alert_rules": "prometheus-rules" - } - ) - ``` - - `MetricsEndpointAggregator` assumes that each unit of a scrape target - sets in its unit-level relation data two entries with keys - "hostname" and "port". If it is required to integrate with charms - that do not honor these assumptions, it is always possible to - derive from `MetricsEndpointAggregator` overriding the `_get_targets()` - method, which is responsible for aggregating the unit name, host - address ("hostname") and port of the scrape target. - `MetricsEndpointAggregator` also assumes that each unit of a - scrape target sets in its unit-level relation data a key named - "groups". The value of this key is expected to be the string - representation of list of Prometheus Alert rules in YAML format. - An example of a single such alert rule is - - ```yaml - - alert: HighRequestLatency - expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5 - for: 10m - labels: - severity: page - annotations: - summary: High request latency - ``` - - Once again if it is required to integrate with charms that do not - honour these assumptions about alert rules then an object derived - from `MetricsEndpointAggregator` may be used by overriding the - `_get_alert_rules()` method. - - `MetricsEndpointAggregator` ensures that Prometheus scrape job - specifications and alert rules are annotated with Juju topology - information, just like `MetricsEndpointProvider` and - `MetricsEndpointConsumer` do. - - By default, `MetricsEndpointAggregator` ensures that Prometheus - "instance" labels refer to Juju topology. This ensures that - instance labels are stable over unit recreation. While it is not - advisable to change this option, if required it can be done by - setting the "relabel_instance" keyword argument to `False` when - constructing an aggregator object. - """ - - _stored = StoredState() - - def __init__( - self, - charm, - relation_names: Optional[dict] = None, - relabel_instance=True, - resolve_addresses=False, - path_to_own_alert_rules: Optional[str] = None, - *, - forward_alert_rules: bool = True, - ): - """Construct a `MetricsEndpointAggregator`. - - Args: - charm: a `CharmBase` object that manages this - `MetricsEndpointAggregator` object. Typically, this is - `self` in the instantiating class. - relation_names: a dictionary with three keys. The value - of the "scrape_target" and "alert_rules" keys are - the relation names over which scrape job and alert rule - information is gathered by this `MetricsEndpointAggregator`. - And the value of the "prometheus" key is the name of - the relation with a `MetricsEndpointConsumer` such as - the Prometheus charm. - relabel_instance: A boolean flag indicating if Prometheus - scrape job "instance" labels must refer to Juju Topology. 
- resolve_addresses: A boolean flag indiccating if the aggregator - should attempt to perform DNS lookups of targets and append - a `dns_name` label - path_to_own_alert_rules: Optionally supply a path for alert rule files - forward_alert_rules: a boolean flag to toggle forwarding of charmed alert rules - """ - self._charm = charm - - relation_names = relation_names or {} - - self._prometheus_relation = relation_names.get( - "prometheus", "downstream-prometheus-scrape" - ) - self._target_relation = relation_names.get("scrape_target", "prometheus-target") - self._alert_rules_relation = relation_names.get("alert_rules", "prometheus-rules") - - super().__init__(charm, self._prometheus_relation) - self.topology = JujuTopology.from_charm(charm) - - self._stored.set_default(jobs=[], alert_rules=[]) - - self._relabel_instance = relabel_instance - self._resolve_addresses = resolve_addresses - - self._forward_alert_rules = forward_alert_rules - - # manage Prometheus charm relation events - prometheus_events = self._charm.on[self._prometheus_relation] - self.framework.observe(prometheus_events.relation_joined, self._set_prometheus_data) - - self.path_to_own_alert_rules = path_to_own_alert_rules - - # manage list of Prometheus scrape jobs from related scrape targets - target_events = self._charm.on[self._target_relation] - self.framework.observe(target_events.relation_changed, self._on_prometheus_targets_changed) - self.framework.observe( - target_events.relation_departed, self._on_prometheus_targets_departed - ) - - # manage alert rules for Prometheus from related scrape targets - alert_rule_events = self._charm.on[self._alert_rules_relation] - self.framework.observe(alert_rule_events.relation_changed, self._on_alert_rules_changed) - self.framework.observe(alert_rule_events.relation_departed, self._on_alert_rules_departed) - - def _set_prometheus_data(self, event: Optional[RelationJoinedEvent] = None): - """Ensure every new Prometheus instances is updated. - - Any time a new Prometheus unit joins the relation with - `MetricsEndpointAggregator`, that Prometheus unit is provided - with the complete set of existing scrape jobs and alert rules. 
- """ - if not self._charm.unit.is_leader(): - return - - # Gather the scrape jobs - jobs = [] + _type_convert_stored( - self._stored.jobs # pyright: ignore - ) # list of scrape jobs, one per relation - for relation in self.model.relations[self._target_relation]: - targets = self._get_targets(relation) - if targets and relation.app: - jobs.append(self._static_scrape_job(targets, relation.app.name)) - - # Gather the alert rules - groups = [] + _type_convert_stored( - self._stored.alert_rules # pyright: ignore - ) # list of alert rule groups - for relation in self.model.relations[self._alert_rules_relation]: - unit_rules = self._get_alert_rules(relation) - if unit_rules and relation.app: - appname = relation.app.name - rules = self._label_alert_rules(unit_rules, appname) - group = {"name": self.group_name(appname), "rules": rules} - groups.append(group) - alert_rules = AlertRules(query_type="promql", topology=self.topology) - # Add alert rules from file - if self.path_to_own_alert_rules: - alert_rules.add_path(self.path_to_own_alert_rules, recursive=True) - # Add generic alert rules - alert_rules.add( - copy.deepcopy(generic_alert_groups.application_rules), - group_name_prefix=self.topology.identifier, - ) - groups.extend(alert_rules.as_dict()["groups"]) - - groups = _dedupe_list(groups) - jobs = _dedupe_list(jobs) - - # Set scrape jobs and alert rules in relation data - relations = [event.relation] if event else self.model.relations[self._prometheus_relation] - for rel in relations: - rel.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs) # type: ignore - rel.data[self._charm.app]["alert_rules"] = json.dumps( # type: ignore - {"groups": groups if self._forward_alert_rules else []} - ) - - def _on_prometheus_targets_changed(self, event): - """Update scrape jobs in response to scrape target changes. - - When there is any change in relation data with any scrape - target, the Prometheus scrape job, for that specific target is - updated. - """ - targets = self._get_targets(event.relation) - if not targets: - return - - # new scrape job for the relation that has changed - self.set_target_job_data(targets, event.relation.app.name) - - def set_target_job_data(self, targets: dict, app_name: str, **kwargs) -> None: - """Update scrape jobs in response to scrape target changes. - - When there is any change in relation data with any scrape - target, the Prometheus scrape job, for that specific target is - updated. Additionally, if this method is called manually, do the - same. - - Args: - targets: a `dict` containing target information - app_name: a `str` identifying the application - kwargs: a `dict` of the extra arguments passed to the function - """ - if not self._charm.unit.is_leader(): - return - - # new scrape job for the relation that has changed - updated_job = self._static_scrape_job(targets, app_name, **kwargs) - - for relation in self.model.relations[self._prometheus_relation]: - jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]")) - # list of scrape jobs that have not changed - jobs = [job for job in jobs if updated_job["job_name"] != job["job_name"]] - jobs.append(updated_job) - relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs) - - if not _type_convert_stored(self._stored.jobs) == jobs: # pyright: ignore - self._stored.jobs = jobs - - def _on_prometheus_targets_departed(self, event): - """Remove scrape jobs when a target departs. - - Any time a scrape target departs, any Prometheus scrape job - associated with that specific scrape target is removed. 
- """ - job_name = self._job_name(event.relation.app.name) - unit_name = event.unit.name - self.remove_prometheus_jobs(job_name, unit_name) - - def remove_prometheus_jobs(self, job_name: str, unit_name: Optional[str] = ""): - """Given a job name and unit name, remove scrape jobs associated. - - The `unit_name` parameter is used for automatic, relation data bag-based - generation, where the unit name in labels can be used to ensure that jobs with - similar names (which are generated via the app name when scanning relation data - bags) are not accidentally removed, as their unit name labels will differ. - For NRPE, the job name is calculated from an ID sent via the NRPE relation, and is - sufficient to uniquely identify the target. - """ - if not self._charm.unit.is_leader(): - return - - for relation in self.model.relations[self._prometheus_relation]: - jobs = json.loads(relation.data[self._charm.app].get("scrape_jobs", "[]")) - if not jobs: - continue - - changed_job = [j for j in jobs if j.get("job_name") == job_name] - if not changed_job: - continue - changed_job = changed_job[0] - - # list of scrape jobs that have not changed - jobs = [job for job in jobs if job.get("job_name") != job_name] - - # list of scrape jobs for units of the same application that still exist - configs_kept = [ - config - for config in changed_job["static_configs"] # type: ignore - if config.get("labels", {}).get("juju_unit") != unit_name - ] - - if configs_kept: - changed_job["static_configs"] = configs_kept # type: ignore - jobs.append(changed_job) - - relation.data[self._charm.app]["scrape_jobs"] = json.dumps(jobs) - - if not _type_convert_stored(self._stored.jobs) == jobs: # pyright: ignore - self._stored.jobs = jobs - - def _job_name(self, appname) -> str: - """Construct a scrape job name. - - Each relation has its own unique scrape job name. All units in - the relation are scraped as part of the same scrape job. - - Args: - appname: string name of a related application. - - Returns: - a string Prometheus scrape job name for the application. - """ - return "juju_{}_{}_{}_prometheus_scrape".format( - self.model.name, self.model.uuid[:7], appname - ) - - def _get_targets(self, relation) -> dict: - """Fetch scrape targets for a relation. - - Scrape target information is returned for each unit in the - relation. This information contains the unit name, network - hostname (or address) for that unit, and port on which a - metrics endpoint is exposed in that unit. - - Args: - relation: an `ops.model.Relation` object for which scrape - targets are required. - - Returns: - a dictionary whose keys are names of the units in the - relation. There values associated with each key is itself - a dictionary of the form - ``` - {"hostname": hostname, "port": port} - ``` - """ - targets = {} - for unit in relation.units: - if not (unit_databag := relation.data.get(unit)): - continue - if not (hostname := unit_databag.get("hostname")): - continue - port = unit_databag.get("port", 80) - targets.update({unit.name: {"hostname": hostname, "port": port}}) - - return targets - - def _static_scrape_job(self, targets, application_name, **kwargs) -> dict: - """Construct a static scrape job for an application. - - Args: - targets: a dictionary providing hostname and port for all - scrape target. The keys of this dictionary are unit - names. Values corresponding to these keys are - themselves a dictionary with keys "hostname" and - "port". 
- application_name: a string name of the application for - which this static scrape job is being constructed. - kwargs: a `dict` of the extra arguments passed to the function - - Returns: - A dictionary corresponding to a Prometheus static scrape - job configuration for one application. The returned - dictionary may be transformed into YAML and appended to - the list of any existing list of Prometheus static configs. - """ - juju_model = self.model.name - juju_model_uuid = self.model.uuid - - job = { - "job_name": self._job_name(application_name), - "static_configs": [ - { - "targets": ["{}:{}".format(target["hostname"], target["port"])], - "labels": { - "juju_model": juju_model, - "juju_model_uuid": juju_model_uuid, - "juju_application": application_name, - "juju_unit": unit_name, - "host": target["hostname"], - # Expanding this will merge the dicts and replace the - # topology labels if any were present/found - **self._static_config_extra_labels(target), - }, - } - for unit_name, target in targets.items() - ], - "relabel_configs": self._relabel_configs + kwargs.get("relabel_configs", []), - } - job.update(kwargs.get("updates", {})) - - return job - - def _static_config_extra_labels(self, target: Dict[str, str]) -> Dict[str, str]: - """Build a list of extra static config parameters, if specified.""" - extra_info = {} - - if self._resolve_addresses: - try: - dns_name = socket.gethostbyaddr(target["hostname"])[0] - except OSError: - logger.debug("Could not perform DNS lookup for %s", target["hostname"]) - dns_name = target["hostname"] - extra_info["dns_name"] = dns_name - - return extra_info - - @property - def _relabel_configs(self) -> list: - """Create Juju topology relabeling configuration. - - Using Juju topology for instance labels ensures that these - labels are stable across unit recreation. - - Returns: - a list of Prometheus relabeling configurations. Each item in - this list is one relabel configuration. - """ - return ( - [ - { - "source_labels": [ - "juju_model", - "juju_model_uuid", - "juju_application", - "juju_unit", - ], - "separator": "_", - "target_label": "instance", - "regex": "(.*)", - } - ] - if self._relabel_instance - else [] - ) - - def _on_alert_rules_changed(self, event): - """Update alert rules in response to scrape target changes. - - When there is any change in alert rule relation data for any - scrape target, the list of alert rules for that specific - target is updated. - """ - unit_rules = self._get_alert_rules(event.relation) - if not unit_rules: - return - - app_name = event.relation.app.name - self.set_alert_rule_data(app_name, unit_rules) - - def set_alert_rule_data(self, name: str, unit_rules: dict, label_rules: bool = True) -> None: - """Consolidate incoming alert rules (from stored-state or event) with those from relation data. - - The unit rules should be a dict, which have additional Juju topology labels added. For - rules generated by the NRPE exporter, they are pre-labeled so lookups can be performed. - The unit rules are combined with the alert rules from relation data before being written - back to relation data and stored-state. 
- """ - if not self._charm.unit.is_leader(): - return - - if label_rules: - rules = self._label_alert_rules(unit_rules, name) - else: - rules = [unit_rules] - updated_group = {"name": self.group_name(name), "rules": rules} - - for relation in self.model.relations[self._prometheus_relation]: - alert_rules = json.loads(relation.data[self._charm.app].get("alert_rules", "{}")) - groups = alert_rules.get("groups", []) - # list of alert rule groups that have not changed - for group in groups: - if group["name"] == updated_group["name"]: - group["rules"] = [r for r in group["rules"] if r not in updated_group["rules"]] - group["rules"].extend(updated_group["rules"]) - - if updated_group["name"] not in [g["name"] for g in groups]: - groups.append(updated_group) - - groups = _dedupe_list(groups) - - relation.data[self._charm.app]["alert_rules"] = json.dumps( - {"groups": groups if self._forward_alert_rules else []} - ) - - if not _type_convert_stored(self._stored.alert_rules) == groups: # pyright: ignore - self._stored.alert_rules = groups - - def _on_alert_rules_departed(self, event): - """Remove alert rules for departed targets. - - Any time a scrape target departs any alert rules associated - with that specific scrape target is removed. - """ - group_name = self.group_name(event.relation.app.name) - unit_name = event.unit.name - self.remove_alert_rules(group_name, unit_name) - - def remove_alert_rules(self, group_name: str, unit_name: str) -> None: - """Remove an alert rule group from relation data.""" - if not self._charm.unit.is_leader(): - return - - for relation in self.model.relations[self._prometheus_relation]: - alert_rules = json.loads(relation.data[self._charm.app].get("alert_rules", "{}")) - if not alert_rules: - continue - - groups = alert_rules.get("groups", []) - if not groups: - continue - - changed_group = [group for group in groups if group["name"] == group_name] - if not changed_group: - continue - changed_group = changed_group[0] - - # list of alert rule groups that have not changed - groups = [group for group in groups if group["name"] != group_name] - - # list of alert rules not associated with departing unit - rules_kept = [ - rule - for rule in changed_group.get("rules") # type: ignore - if rule.get("labels").get("juju_unit") != unit_name - ] - - if rules_kept: - changed_group["rules"] = rules_kept # type: ignore - groups.append(changed_group) - - groups = _dedupe_list(groups) - - relation.data[self._charm.app]["alert_rules"] = json.dumps( - {"groups": groups if self._forward_alert_rules else []} - ) - - if not _type_convert_stored(self._stored.alert_rules) == groups: # pyright: ignore - self._stored.alert_rules = groups - - def _get_alert_rules(self, relation) -> dict: - """Fetch alert rules for a relation. - - Each unit of the related scrape target may have its own - associated alert rules. Alert rules for all units are returned - indexed by unit name. - - Args: - relation: an `ops.model.Relation` object for which alert - rules are required. - - Returns: - a dictionary whose keys are names of the units in the - relation. There values associated with each key is a list - of alert rules. Each rule is in dictionary format. The - structure "rule dictionary" corresponds to single - Prometheus alert rule. 
- """ - rules = {} - for unit in relation.units: - if not (unit_databag := relation.data.get(unit)): - continue - if not (unit_rules := yaml.safe_load(unit_databag.get("groups", ""))): - continue - - rules.update({unit.name: unit_rules}) - - return rules - - def group_name(self, unit_name: str) -> str: - """Construct name for an alert rule group. - - Each unit in a relation may define its own alert rules. All - rules, for all units in a relation are grouped together and - given a single alert rule group name. - - Args: - unit_name: string name of a related application. - - Returns: - a string Prometheus alert rules group name for the unit. - """ - unit_name = re.sub(r"/", "_", unit_name) - return "juju_{}_{}_{}_alert_rules".format(self.model.name, self.model.uuid[:7], unit_name) - - def _label_alert_rules(self, unit_rules, app_name: str) -> list: - """Apply juju topology labels to alert rules. - - Args: - unit_rules: a list of alert rules, where each rule is in - dictionary format. - app_name: a string name of the application to which the - alert rules belong. - - Returns: - a list of alert rules with Juju topology labels. - """ - labeled_rules = [] - for unit_name, rules in unit_rules.items(): - for rule in rules: - # the new JujuTopology removed this, so build it up by hand - matchers = { - "juju_{}".format(k): v - for k, v in JujuTopology(self.model.name, self.model.uuid, app_name, unit_name) - .as_dict(excluded_keys=["charm_name"]) - .items() - } - rule["labels"].update(matchers.items()) - labeled_rules.append(rule) - - return labeled_rules - - class CosTool: """Uses cos-tool to inject label matchers into alert rule expressions and validate rules.""" @@ -2384,7 +1724,7 @@ def validate_alert_rules(self, rules: dict) -> Tuple[bool, str]: self._exec(args) return True, "" except subprocess.CalledProcessError as e: - logger.debug("Validating the rules failed: %s", e.output) + logger.debug("Validating the rules failed: %s", e.output.decode("utf8")) return False, ", ".join( [ line diff --git a/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py b/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py index f58ee72e0f..254c681f8b 100644 --- a/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py +++ b/lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py @@ -266,7 +266,7 @@ def _remove_stale_otel_sdk_packages(): if name.startswith("opentelemetry_"): otel_distributions[name].append(distribution) - otel_logger.debug(f"Found {len(otel_distributions)} opentelemetry distributions") + otel_logger.debug("Found %d opentelemetry distributions", len(otel_distributions)) # If we have multiple distributions with the same name, remove any that have 0 associated files for name, distributions_ in otel_distributions.items(): @@ -274,12 +274,12 @@ def _remove_stale_otel_sdk_packages(): continue otel_logger.debug( - f"Package {name} has multiple ({len(distributions_)}) distributions." + "Package %s has multiple (%d) distributions.", name, len(distributions_) ) for distribution in distributions_: if not distribution.files: # Not None or empty list path = distribution._path # type: ignore - otel_logger.info(f"Removing empty distribution of {name} at {path}.") + otel_logger.info("Removing empty distribution of %s at %s.", name, path) shutil.rmtree(path) otel_logger.debug("Successfully applied _remove_stale_otel_sdk_packages patch. 
") @@ -350,7 +350,7 @@ def _remove_stale_otel_sdk_packages(): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 10 +LIBPATCH = 11 PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"] @@ -430,7 +430,8 @@ def _prune(self, queue: Sequence[bytes]) -> Sequence[bytes]: if overflow > 0: n_dropped_spans += overflow logger.warning( - f"charm tracing buffer exceeds max history length ({self._max_event_history_length} events)" + "charm tracing buffer exceeds max history length (%d events)", + self._max_event_history_length, ) new_spans = deque(queue[-self._max_event_history_length :]) @@ -446,19 +447,21 @@ def _prune(self, queue: Sequence[bytes]) -> Sequence[bytes]: # only do this once if not logged_drop: logger.warning( - f"charm tracing buffer exceeds size limit ({self._max_buffer_size_mib}MiB)." + "charm tracing buffer exceeds size limit (%dMiB).", + self._max_buffer_size_mib, ) logged_drop = True if n_dropped_spans > 0: dev_logger.debug( - f"charm tracing buffer overflow: dropped {n_dropped_spans} older spans. " - f"Please increase the buffer limits, or ensure the spans can be flushed." + "charm tracing buffer overflow: dropped %d older spans. " + "Please increase the buffer limits, or ensure the spans can be flushed.", + n_dropped_spans, ) return new_spans def _save(self, spans: Sequence[ReadableSpan], replace: bool = False): - dev_logger.debug(f"saving {len(spans)} new spans to buffer") + dev_logger.debug("saving %d new spans to buffer", len(spans)) old = [] if replace else self.load() queue = old + [self._serialize(spans)] new_buffer = self._prune(queue) @@ -480,7 +483,7 @@ def _write(self, spans: Sequence[bytes]): # ensure the destination folder exists db_file_dir = self._db_file.parent if not db_file_dir.exists(): - dev_logger.info(f"creating buffer dir: {db_file_dir}") + dev_logger.info("creating buffer dir: %s", db_file_dir) db_file_dir.mkdir(parents=True) self._db_file.write_bytes(self._SPANSEP.join(spans)) @@ -496,7 +499,7 @@ def load(self) -> List[bytes]: try: spans = self._db_file.read_bytes().split(self._SPANSEP) except Exception: - logger.exception(f"error parsing {self._db_file}") + logger.exception("error parsing %s", self._db_file) return [] return spans @@ -504,7 +507,7 @@ def drop(self, n_spans: Optional[int] = None): """Drop some currently buffered spans from the cache file.""" current = self.load() if n_spans: - dev_logger.debug(f"dropping {n_spans} spans from buffer") + dev_logger.debug("dropping %d spans from buffer", n_spans) new = current[n_spans:] else: dev_logger.debug("emptying buffer") @@ -693,7 +696,7 @@ def _get_tracing_endpoint( ) dev_logger.debug( - f"Setting up span exporter to endpoint: {tracing_endpoint}/v1/traces" + "Setting up span exporter to endpoint: %s/v1/traces", tracing_endpoint ) return f"{tracing_endpoint}/v1/traces" @@ -711,13 +714,17 @@ def _get_server_cert( if server_cert is None: logger.warning( - f"{charm_type}.{server_cert_attr} is None; sending traces over INSECURE connection." + "%s.%s is None; sending traces over INSECURE connection.", + charm_type, + server_cert_attr, ) return if not isinstance(server_cert, (str, Path)): logger.warning( - f"{charm_type}.{server_cert_attr} has unexpected type {type(server_cert)}; " - f"sending traces over INSECURE connection." 
+ "%s.%s has unexpected type %s; sending traces over INSECURE connection.", + charm_type, + server_cert_attr, + type(server_cert), ) return path = Path(server_cert) @@ -862,13 +869,13 @@ def wrap_init(self: CharmBase, framework: Framework, *args, **kwargs): # log a trace id, so we can pick it up from the logs (and jhack) to look it up in tempo. root_trace_id = hex(span.get_span_context().trace_id)[2:] # strip 0x prefix - logger.debug(f"Starting root trace with id={root_trace_id!r}.") + logger.debug("Starting root trace with id=%r.", root_trace_id) span_token = opentelemetry.context.attach(ctx) # type: ignore @contextmanager def wrap_event_context(event_name: str): - dev_logger.debug(f"entering event context: {event_name}") + dev_logger.debug("entering event context: %s", event_name) # when the framework enters an event context, we create a span. with _span("event: " + event_name) as event_context_span: if event_context_span: @@ -1059,7 +1066,7 @@ def _autoinstrument( Minimum 10MiB. :param buffer_path: path to buffer file to use for saving buffered spans. """ - dev_logger.debug(f"instrumenting {charm_type}") + dev_logger.debug("instrumenting %s", charm_type) _setup_root_span_initializer( charm_type, tracing_endpoint_attr, @@ -1083,12 +1090,12 @@ def trace_type(cls: _T) -> _T: It assumes that this class is only instantiated after a charm type decorated with `@trace_charm` has been instantiated. """ - dev_logger.debug(f"instrumenting {cls}") + dev_logger.debug("instrumenting %s", cls) for name, method in inspect.getmembers(cls, predicate=inspect.isfunction): - dev_logger.debug(f"discovered {method}") + dev_logger.debug("discovered %s", method) if method.__name__.startswith("__"): - dev_logger.debug(f"skipping {method} (dunder)") + dev_logger.debug("skipping %s (dunder)", method) continue # the span title in the general case should be: @@ -1134,7 +1141,7 @@ def trace_function(function: _F, name: Optional[str] = None) -> _F: def _trace_callable(callable: _F, qualifier: str, name: Optional[str] = None) -> _F: - dev_logger.debug(f"instrumenting {callable}") + dev_logger.debug("instrumenting %s", callable) # sig = inspect.signature(callable) @functools.wraps(callable) diff --git a/lib/charms/tempo_coordinator_k8s/v0/tracing.py b/lib/charms/tempo_coordinator_k8s/v0/tracing.py index 9bf7a89b2a..387ac6a477 100644 --- a/lib/charms/tempo_coordinator_k8s/v0/tracing.py +++ b/lib/charms/tempo_coordinator_k8s/v0/tracing.py @@ -111,7 +111,7 @@ def __init__(self, *args): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 10 +LIBPATCH = 11 PYDEPS = ["pydantic"] @@ -657,7 +657,7 @@ def _get_requested_protocols(relation: Relation): try: databag = TracingRequirerAppData.load(relation.data[app]) except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): - logger.info(f"relation {relation} is not ready to talk tracing") + logger.info("relation %s is not ready to talk tracing", relation) raise NotReadyError() return databag.receivers @@ -705,8 +705,9 @@ def publish_receivers(self, receivers: Sequence[RawReceiver]): b"ERROR cannot read relation application settings: permission denied" ): logger.error( - f"encountered error {e} while attempting to update_relation_data." - f"The relation must be gone." + "encountered error %s while attempting to update_relation_data." 
+ "The relation must be gone.", + e, ) continue raise @@ -800,8 +801,9 @@ def __init__( self.request_protocols(protocols) except ModelError as e: logger.error( - f"encountered error {e} while attempting to request_protocols." - f"The relation must be gone." + "encountered error %s while attempting to request_protocols." + "The relation must be gone.", + e, ) pass @@ -849,20 +851,20 @@ def is_ready(self, relation: Optional[Relation] = None): """Is this endpoint ready?""" relation = relation or self._relation if not relation: - logger.debug(f"no relation on {self._relation_name!r}: tracing not ready") + logger.debug("no relation on %r: tracing not ready", self._relation_name) return False if relation.data is None: - logger.error(f"relation data is None for {relation}") + logger.error("relation data is None for %s", relation) return False if not relation.app: - logger.error(f"{relation} event received but there is no relation.app") + logger.error("%s event received but there is no relation.app", relation) return False try: databag = dict(relation.data[relation.app]) TracingProviderAppData.load(databag) except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): - logger.info(f"failed validating relation data for {relation}") + logger.info("failed validating relation data for %s", relation) return False return True @@ -902,12 +904,14 @@ def _get_endpoint( if not receivers: # it can happen if the charm requests tracing protocols, but the relay (such as grafana-agent) isn't yet # connected to the tracing backend. In this case, it's not an error the charm author can do anything about - logger.warning(f"no receiver found with protocol={protocol!r}.") + logger.warning("no receiver found with protocol=%r.", protocol) return if len(receivers) > 1: # if we have more than 1 receiver that matches, it shouldn't matter which receiver we'll be using. logger.warning( - f"too many receivers with protocol={protocol!r}; using first one. Found: {receivers}" + "too many receivers with protocol=%r; using first one. 
Found: %s", + protocol, + receivers, ) receiver = receivers[0] diff --git a/src/backups.py b/src/backups.py index cd310b5050..affd8a701d 100644 --- a/src/backups.py +++ b/src/backups.py @@ -10,6 +10,7 @@ import tempfile import time from datetime import datetime, timezone +from functools import cached_property from io import BytesIO from boto3.session import Session @@ -77,7 +78,7 @@ def __init__(self, charm, relation_name: str): self.framework.observe(self.charm.on.list_backups_action, self._on_list_backups_action) self.framework.observe(self.charm.on.restore_action, self._on_restore_action) - @property + @cached_property def stanza_name(self) -> str: """Stanza name, composed by model and cluster name.""" return f"{self.model.name}.{self.charm.cluster_name}" diff --git a/src/charm.py b/src/charm.py index e89ae10aa8..38c330cc6f 100755 --- a/src/charm.py +++ b/src/charm.py @@ -13,6 +13,7 @@ import sys import time from datetime import datetime +from functools import cached_property from hashlib import shake_128 from pathlib import Path from typing import Literal, get_args @@ -436,7 +437,7 @@ def is_unit_stopped(self) -> bool: """Returns whether the unit is stopped.""" return "stopped" in self.unit_peer_data - @property + @cached_property def postgresql(self) -> PostgreSQL: """Returns an instance of the object used to interact with the database.""" return PostgreSQL( @@ -448,17 +449,17 @@ def postgresql(self) -> PostgreSQL: system_users=SYSTEM_USERS, ) - @property + @cached_property def endpoint(self) -> str: """Returns the endpoint of this instance's pod.""" return f"{self._unit.replace('/', '-')}.{self._build_service_name('endpoints')}" - @property + @cached_property def primary_endpoint(self) -> str: """Returns the endpoint of the primary instance's service.""" return self._build_service_name("primary") - @property + @cached_property def replicas_endpoint(self) -> str: """Returns the endpoint of the replicas instances' service.""" return self._build_service_name("replicas") @@ -637,23 +638,6 @@ def _on_peer_relation_changed(self, event: HookEvent) -> None: # noqa: C901 event.defer() return - # Restart the workload if it's stuck on the starting state after a timeline divergence - # due to a backup that was restored. - if ( - not self.is_primary - and not self.is_standby_leader - and ( - self._patroni.member_replication_lag == "unknown" - or int(self._patroni.member_replication_lag) > 1000 - ) - ): - logger.warning("Workload failure detected. Reinitialising unit.") - self.unit.status = MaintenanceStatus("reinitialising replica") - self._patroni.reinitialize_postgresql() - logger.debug("Deferring on_peer_relation_changed: reinitialising replica") - event.defer() - return - try: self.postgresql_client_relation.update_read_only_endpoint() except ModelError as e: @@ -1549,9 +1533,6 @@ def _on_update_status(self, _) -> None: ) and not self._was_restore_successful(container, services[0]): return - if self._handle_processes_failures(): - return - # Update the sync-standby endpoint in the async replication data. self.async_replication.update_async_replication_data() @@ -1620,51 +1601,6 @@ def _was_restore_successful(self, container: Container, service: ServiceInfo) -> return True - def _handle_processes_failures(self) -> bool: - """Handle Patroni and PostgreSQL OS processes failures. - - Returns: - a bool indicating whether the charm performed any action. 
- """ - container = self.unit.get_container("postgresql") - - # Restart the Patroni process if it was killed (in that case, the PostgreSQL - # process is still running). This is needed until - # https://github.com/canonical/pebble/issues/149 is resolved. - if not self._patroni.member_started and self._patroni.is_database_running: - try: - container.restart(self.postgresql_service) - logger.info("restarted Patroni because it was not running") - except ChangeError: - logger.error("failed to restart Patroni after checking that it was not running") - return False - return True - - try: - is_primary = self.is_primary - is_standby_leader = self.is_standby_leader - except RetryError: - return False - - if ( - not is_primary - and not is_standby_leader - and self._patroni.member_started - and not self._patroni.member_streaming - ): - try: - logger.warning("Degraded member detected: reinitialising unit") - self.unit.status = MaintenanceStatus("reinitialising replica") - self._patroni.reinitialize_postgresql() - except RetryError: - logger.error( - "failed to reinitialise replica after checking that it was not streaming from primary" - ) - return False - return True - - return False - @property def _patroni(self): """Returns an instance of the Patroni object.""" @@ -2042,12 +1978,13 @@ def _is_workload_running(self) -> bool: @property def _can_connect_to_postgresql(self) -> bool: try: - for attempt in Retrying(stop=stop_after_delay(30), wait=wait_fixed(3)): + for attempt in Retrying(stop=stop_after_delay(10), wait=wait_fixed(3)): with attempt: if not self.postgresql.get_postgresql_timezones(): + logger.debug("Cannot connect to database (CannotConnectError)") raise CannotConnectError except RetryError: - logger.debug("Cannot connect to database") + logger.debug("Cannot connect to database (RetryError)") return False return True @@ -2117,11 +2054,16 @@ def update_config(self, is_creating_backup: bool = False) -> bool: "wal_keep_size": self.config.durability_wal_keep_size, }) - self._handle_postgresql_restart_need() + self._handle_postgresql_restart_need( + self.unit_peer_data.get("config_hash") != self.generate_config_hash + ) self._restart_metrics_service() self._restart_ldap_sync_service() - self.unit_peer_data.update({"user_hash": self.generate_user_hash}) + self.unit_peer_data.update({ + "user_hash": self.generate_user_hash, + "config_hash": self.generate_config_hash, + }) if self.unit.is_leader(): self.app_peer_data.update({"user_hash": self.generate_user_hash}) return True @@ -2163,22 +2105,28 @@ def _validate_config_options(self) -> None: f"Value for {parameter} not one of the locales available in the system" ) - def _handle_postgresql_restart_need(self): + def _handle_postgresql_restart_need(self, config_changed: bool): """Handle PostgreSQL restart need based on the TLS configuration and configuration changes.""" restart_postgresql = self.is_tls_enabled != self.postgresql.is_tls_enabled() - self._patroni.reload_patroni_configuration() - # Wait for some more time than the Patroni's loop_wait default value (10 seconds), - # which tells how much time Patroni will wait before checking the configuration - # file again to reload it. try: - for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): - with attempt: - restart_postgresql = restart_postgresql or self.postgresql.is_restart_pending() - if not restart_postgresql: - raise Exception - except RetryError: - # Ignore the error, as it happens only to indicate that the configuration has not changed. 
- pass + self._patroni.reload_patroni_configuration() + except Exception as e: + logger.error(f"Reload patroni call failed! error: {e!s}") + if config_changed and not restart_postgresql: + # Wait for some more time than the Patroni's loop_wait default value (10 seconds), + # which tells how much time Patroni will wait before checking the configuration + # file again to reload it. + try: + for attempt in Retrying(stop=stop_after_attempt(5), wait=wait_fixed(3)): + with attempt: + restart_postgresql = ( + restart_postgresql or self.postgresql.is_restart_pending() + ) + if not restart_postgresql: + raise Exception + except RetryError: + # Ignore the error, as it happens only to indicate that the configuration has not changed. + pass self.unit_peer_data.update({"tls": "enabled" if self.is_tls_enabled else ""}) self.postgresql_client_relation.update_tls_flag("True" if self.is_tls_enabled else "False") @@ -2305,20 +2253,10 @@ def client_relations(self) -> list[Relation]: @property def relations_user_databases_map(self) -> dict: """Returns a user->databases map for all relations.""" - user_database_map = {} # Copy relations users directly instead of waiting for them to be created - for relation in self.model.relations[self.postgresql_client_relation.relation_name]: - user = f"relation_id_{relation.id}" - if database := self.postgresql_client_relation.database_provides.fetch_relation_field( - relation.id, "database" - ): - user_database_map[user] = database - if ( - not self.is_cluster_initialised - or not self._patroni.member_started - or self.postgresql.list_access_groups(current_host=self.is_connectivity_enabled) - != set(ACCESS_GROUPS) - ): + user_database_map = self._collect_user_relations() + + if not self.is_cluster_initialised or not self._patroni.member_started: user_database_map.update({ USER: "all", REPLICATION_USER: "all", @@ -2337,8 +2275,10 @@ def relations_user_databases_map(self) -> dict: ): continue user_database_map[user] = ",".join( - self.postgresql.list_accessible_databases_for_user( - user, current_host=self.is_connectivity_enabled + sorted( + self.postgresql.list_accessible_databases_for_user( + user, current_host=self.is_connectivity_enabled + ) ) ) if self.postgresql.list_access_groups( @@ -2354,17 +2294,32 @@ def relations_user_databases_map(self) -> dict: logger.debug("relations_user_databases_map: Unable to get users") return {USER: "all", REPLICATION_USER: "all", REWIND_USER: "all"} - @property + def _collect_user_relations(self) -> dict[str, str]: + user_db_pairs = {} + for relation in self.client_relations: + user = f"relation_id_{relation.id}" + if relation.name == "database": + if ( + database + := self.postgresql_client_relation.database_provides.fetch_relation_field( + relation.id, "database" + ) + ): + user_db_pairs[user] = database + else: + if database := relation.data.get(self.unit, {}).get("database"): + user_db_pairs[user] = database + return user_db_pairs + + @cached_property def generate_user_hash(self) -> str: """Generate expected user and database hash.""" - user_db_pairs = {} - for relation in self.model.relations[self.postgresql_client_relation.relation_name]: - if database := self.postgresql_client_relation.database_provides.fetch_relation_field( - relation.id, "database" - ): - user = f"relation_id_{relation.id}" - user_db_pairs[user] = database - return shake_128(str(user_db_pairs).encode()).hexdigest(16) + return shake_128(str(self._collect_user_relations()).encode()).hexdigest(16) + + @cached_property + def generate_config_hash(self) -> str: + 
"""Generate current configuration hash.""" + return shake_128(str(self.config.dict()).encode()).hexdigest(16) def override_patroni_on_failure_condition( self, new_condition: str, repeat_cause: str | None diff --git a/src/patroni.py b/src/patroni.py index 5e8b1937fd..e1882da7ad 100644 --- a/src/patroni.py +++ b/src/patroni.py @@ -7,6 +7,7 @@ import logging import os import pwd +from functools import cached_property from typing import Any import requests @@ -95,11 +96,11 @@ def __init__( # TLS is enabled, otherwise True is set because it's the default value. self._verify = f"{self._storage_path}/{TLS_CA_FILE}" if tls_enabled else True - @property + @cached_property def _patroni_auth(self) -> requests.auth.HTTPBasicAuth: return requests.auth.HTTPBasicAuth("patroni", self._patroni_password) - @property + @cached_property def _patroni_url(self) -> str: """Patroni REST API URL.""" return f"{'https' if self._tls_enabled else 'http'}://{self._endpoint}:8008" @@ -145,7 +146,7 @@ def _get_alternative_patroni_url( url = self._patroni_url return url - @property + @cached_property def _synchronous_node_count(self) -> int: planned_units = self._charm.app.planned_units() if self._charm.config.synchronous_node_count == "all": @@ -252,7 +253,7 @@ def get_sync_standby_names(self) -> list[str]: sync_standbys.append("/".join(member["name"].rsplit("-", 1))) return sync_standbys - @property + @cached_property def cluster_members(self) -> set: """Get the current cluster members.""" # Request info from cluster endpoint (which returns all members of the cluster). @@ -429,19 +430,6 @@ def member_streaming(self) -> bool: return r.json().get("replication_state") == "streaming" - @property - def is_database_running(self) -> bool: - """Returns whether the PostgreSQL database process is running (and isn't frozen).""" - container = self._charm.unit.get_container("postgresql") - output = container.exec(["ps", "aux"]).wait_output() - postgresql_processes = [ - process - for process in output[0].split("/n") - if "/usr/lib/postgresql/14/bin/postgres" in process - ] - # Check whether the PostgreSQL process has a state equal to T (frozen). - return any(process for process in postgresql_processes if process.split()[7] != "T") - @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10)) def bulk_update_parameters_controller_by_patroni(self, parameters: dict[str, Any]) -> None: """Update the value of a parameter controller by Patroni. 
diff --git a/src/relations/db.py b/src/relations/db.py index 0a124bf52f..184fe82bac 100644 --- a/src/relations/db.py +++ b/src/relations/db.py @@ -116,6 +116,11 @@ def _on_relation_changed(self, event: RelationChangedEvent) -> None: event.defer() return + if not self.charm.postgresql.is_user_in_hba(f"relation-{event.relation.id}"): + logger.debug("Deferring on_relation_changed: User not in pg_hba yet.") + event.defer() + return + def _check_exist_current_relation(self) -> bool: return any(r in ALL_LEGACY_RELATIONS for r in self.charm.client_relations) diff --git a/tests/unit/test_charm.py b/tests/unit/test_charm.py index 84759e94b3..5286d943ae 100644 --- a/tests/unit/test_charm.py +++ b/tests/unit/test_charm.py @@ -3,7 +3,6 @@ import itertools import json import logging -from datetime import datetime from unittest import TestCase from unittest.mock import MagicMock, Mock, PropertyMock, patch, sentinel @@ -19,7 +18,7 @@ RelationDataTypeError, WaitingStatus, ) -from ops.pebble import Change, ChangeError, ChangeID, ServiceStatus +from ops.pebble import ChangeError, ServiceStatus from ops.testing import Harness from requests import ConnectionError as RequestsConnectionError from tenacity import RetryError, wait_fixed @@ -477,13 +476,10 @@ def test_fail_to_get_primary(harness): def test_on_update_status(harness): with ( patch("charm.logger") as _logger, - patch( - "charm.PostgresqlOperatorCharm._handle_processes_failures" - ) as _handle_processes_failures, patch( "charm.PostgresqlOperatorCharm.enable_disable_extensions" ) as _enable_disable_extensions, - patch("charm.Patroni.member_started") as _member_started, + patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, patch("charm.Patroni.get_primary") as _get_primary, patch("ops.model.Container.pebble") as _pebble, patch("ops.model.Container.restart") as _restart, @@ -522,24 +518,18 @@ def test_on_update_status(harness): _enable_disable_extensions.assert_called_once() _pebble.get_services.assert_called_once() - # Test when a failure need to be handled. - _pebble.get_services.return_value = [MagicMock(current=ServiceStatus.ACTIVE)] - _handle_processes_failures.return_value = True - harness.charm.on.update_status.emit() - _get_primary.assert_not_called() - # Test when a failure need to be handled. _pebble.get_services.return_value = [MagicMock(current=ServiceStatus.INACTIVE)] harness.charm.on.update_status.emit() - _get_primary.assert_not_called() _restart.assert_called_once_with("postgresql") _restart.reset_mock() + _get_primary.assert_called_once_with(unit_name_pattern=True) + _get_primary.reset_mock() # Test restart failed _pebble.get_services.return_value = [MagicMock(current=ServiceStatus.INACTIVE)] _restart.side_effect = ChangeError(err=None, change=None) harness.charm.on.update_status.emit() - _get_primary.assert_not_called() _restart.assert_called_once_with("postgresql") _logger.exception.assert_called_once_with("Failed to restart patroni") _restart.reset_mock() @@ -548,13 +538,11 @@ def test_on_update_status(harness): # Check primary message not being set (current unit is not the primary). _pebble.get_services.return_value = [MagicMock(current=ServiceStatus.ACTIVE)] - _handle_processes_failures.return_value = False _get_primary.side_effect = [ "postgresql-k8s/1", harness.charm.unit.name, ] harness.charm.on.update_status.emit() - _get_primary.assert_called_once() assert harness.model.unit.status != ActiveStatus("Primary") # Test again and check primary message being set (current unit is the primary). 
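Note: the test hunk above switches the `member_started` patch to `new_callable=PropertyMock`. A short, standalone illustration of why that matters when the patched attribute is a property (toy class, not the charm's real `Patroni`):

```python
# Toy example: patching a property with a plain MagicMock replaces the
# descriptor, so attribute access yields the mock object itself (always
# truthy) instead of the configured return value. PropertyMock implements
# the descriptor protocol, so return_value is what the attribute reports.
from unittest.mock import PropertyMock, patch


class Patroni:
    @property
    def member_started(self) -> bool:
        raise RuntimeError("would talk to the Patroni REST API")


with patch.object(Patroni, "member_started", new_callable=PropertyMock) as started:
    started.return_value = False
    assert Patroni().member_started is False  # property now reports False

with patch.object(Patroni, "member_started") as started:
    started.return_value = False
    # Without PropertyMock the attribute is the MagicMock itself, not False.
    assert Patroni().member_started is not False
```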
@@ -576,9 +564,6 @@ def test_on_update_status_no_connection(harness): def test_on_update_status_with_error_on_get_primary(harness): with ( - patch( - "charm.PostgresqlOperatorCharm._handle_processes_failures", return_value=False - ) as _handle_processes_failures, patch("charm.Patroni.member_started") as _member_started, patch("charm.Patroni.get_primary") as _get_primary, patch("ops.model.Container.pebble") as _pebble, @@ -748,9 +733,6 @@ def test_on_pgdata_storage_detaching(harness): def test_on_update_status_after_restore_operation(harness): with ( patch("charm.PostgresqlOperatorCharm._set_active_status") as _set_active_status, - patch( - "charm.PostgresqlOperatorCharm._handle_processes_failures" - ) as _handle_processes_failures, patch("charm.PostgreSQLBackups.can_use_s3_repository") as _can_use_s3_repository, patch( "charms.postgresql_k8s.v0.postgresql.PostgreSQL.get_current_timeline" @@ -776,7 +758,6 @@ def test_on_update_status_after_restore_operation(harness): harness.set_can_connect(POSTGRESQL_CONTAINER, True) harness.charm.on.update_status.emit() _update_config.assert_not_called() - _handle_processes_failures.assert_not_called() _set_active_status.assert_not_called() assert isinstance(harness.charm.unit.status, BlockedStatus) @@ -786,7 +767,6 @@ def test_on_update_status_after_restore_operation(harness): _member_started.return_value = False harness.charm.on.update_status.emit() _update_config.assert_not_called() - _handle_processes_failures.assert_not_called() _set_active_status.assert_not_called() tc.assertIsInstance(harness.charm.unit.status, ActiveStatus) @@ -799,10 +779,8 @@ def test_on_update_status_after_restore_operation(harness): # Test when the restore operation finished successfully. _member_started.return_value = True _can_use_s3_repository.return_value = (True, None) - _handle_processes_failures.return_value = False harness.charm.on.update_status.emit() _update_config.assert_called_once() - _handle_processes_failures.assert_called_once() _set_active_status.assert_called_once() assert isinstance(harness.charm.unit.status, ActiveStatus) @@ -1544,99 +1522,6 @@ def test_on_peer_relation_changed(harness): _set_active_status.assert_not_called() -def test_handle_processes_failures(harness): - with ( - patch("charm.Patroni.reinitialize_postgresql") as _reinitialize_postgresql, - patch("charm.Patroni.member_streaming", new_callable=PropertyMock) as _member_streaming, - patch( - "charm.PostgresqlOperatorCharm.is_standby_leader", new_callable=PropertyMock - ) as _is_standby_leader, - patch( - "charm.PostgresqlOperatorCharm.is_primary", new_callable=PropertyMock - ) as _is_primary, - patch( - "charm.Patroni.is_database_running", new_callable=PropertyMock - ) as _is_database_running, - patch("charm.Patroni.member_started", new_callable=PropertyMock) as _member_started, - patch("ops.model.Container.restart") as _restart, - ): - # Test when there are no processes failures to handle. - harness.set_can_connect(POSTGRESQL_CONTAINER, True) - for values in itertools.product( - [True, False], [True, False], [True, False], [True, False], [True, False] - ): - # Skip conditions that lead to handling a process failure. 
- if (not values[0] and values[2]) or (not values[3] and values[1] and not values[4]): - continue - - _member_started.side_effect = [values[0], values[1]] - _is_database_running.return_value = values[2] - _is_primary.return_value = values[3] - _member_streaming.return_value = values[4] - assert not harness.charm._handle_processes_failures() - _restart.assert_not_called() - _reinitialize_postgresql.assert_not_called() - - # Test when the Patroni process is not running. - _is_database_running.return_value = True - for values in itertools.product( - [ - None, - ChangeError( - err="fake error", - change=Change( - ChangeID("1"), - "fake kind", - "fake summary", - "fake status", - [], - True, - "fake error", - datetime.now(), - datetime.now(), - ), - ), - ], - [True, False], - [True, False], - [True, False], - ): - _restart.reset_mock() - _restart.side_effect = values[0] - _is_primary.return_value = values[1] - _member_started.side_effect = [False, values[2]] - _member_streaming.return_value = values[3] - harness.charm.unit.status = ActiveStatus() - result = harness.charm._handle_processes_failures() - assert result == (values[0] is None) - assert isinstance(harness.charm.unit.status, ActiveStatus) - _restart.assert_called_once_with("postgresql") - _reinitialize_postgresql.assert_not_called() - - # Test when the unit is a replica and it's not streaming from primary. - _restart.reset_mock() - _is_primary.return_value = False - _is_standby_leader.return_value = False - _member_streaming.return_value = False - for values in itertools.product( - [None, RetryError(last_attempt=1)], [True, False], [True, False] - ): - # Skip the condition that lead to handling other process failure. - if not values[1] and values[2]: - continue - - _reinitialize_postgresql.reset_mock() - _reinitialize_postgresql.side_effect = values[0] - _member_started.side_effect = [values[1], True] - _is_database_running.return_value = values[2] - harness.charm.unit.status = ActiveStatus() - result = harness.charm._handle_processes_failures() - assert result == (values[0] is None) - assert isinstance(harness.charm.unit.status, MaintenanceStatus) - _restart.assert_not_called() - _reinitialize_postgresql.assert_called_once() - - def test_push_ca_file_into_workload(harness): with ( patch("charm.PostgresqlOperatorCharm.update_config") as _update_config, @@ -1810,7 +1695,7 @@ def test_handle_postgresql_restart_need(harness): postgresql_mock.is_tls_enabled = PropertyMock(return_value=values[1]) postgresql_mock.is_restart_pending = PropertyMock(return_value=values[2]) - harness.charm._handle_postgresql_restart_need() + harness.charm._handle_postgresql_restart_need(True) _reload_patroni_configuration.assert_called_once() if values[0]: assert "tls" in harness.get_relation_data(rel_id, harness.charm.unit)
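Note: a recurring change across the charm_tracing and tracing library hunks is swapping f-string log messages for lazy %-style arguments. A minimal standalone sketch of the difference (logger name and payload are illustrative only):

```python
# With %-style arguments, interpolation is deferred until a handler actually
# emits the record, so disabled DEBUG logs cost almost nothing; an f-string
# is always rendered before the call.
import logging

logger = logging.getLogger("charm_tracing_example")
logging.basicConfig(level=logging.INFO)  # DEBUG records are discarded

big_payload = list(range(100_000))

# Eager: the f-string renders the whole list even though the record is dropped.
logger.debug(f"buffered spans: {big_payload}")

# Lazy: the argument is only formatted if the record is emitted, matching the
# pattern the hunks above adopt (e.g. dev_logger.debug("saving %d new spans
# to buffer", len(spans))).
logger.debug("buffered spans: %s", big_payload)
```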