123 changes: 82 additions & 41 deletions lib/charms/tempo_coordinator_k8s/v0/charm_tracing.py
@@ -19,15 +19,19 @@

```python
# import the necessary charm libs
from charms.tempo_coordinator_k8s.v0.tracing import TracingEndpointRequirer, charm_tracing_config
from charms.tempo_coordinator_k8s.v0.tracing import (
TracingEndpointRequirer,
charm_tracing_config,
)
from charms.tempo_coordinator_k8s.v0.charm_tracing import charm_tracing


# decorate your charm class with charm_tracing:
@charm_tracing(
# forward-declare the instance attributes that the instrumentor will look up to obtain the
# tempo endpoint and server certificate
tracing_endpoint="tracing_endpoint",
server_cert="server_cert"
server_cert="server_cert",
)
class MyCharm(CharmBase):
_path_to_cert = "/path/to/cert.crt"
@@ -37,10 +41,12 @@ class MyCharm(CharmBase):
# If you do support TLS, you'll need to make sure that the server cert is copied to this location
# and kept up to date so the instrumentor can use it.

def __init__(self, ...):
...
self.tracing = TracingEndpointRequirer(self, ...)
self.tracing_endpoint, self.server_cert = charm_tracing_config(self.tracing, self._path_to_cert)
def __init__(self, framework):
# ...
self.tracing = TracingEndpointRequirer(self)
self.tracing_endpoint, self.server_cert = charm_tracing_config(
self.tracing, self._path_to_cert
)
```

# Detailed usage
@@ -226,12 +232,6 @@ def my_tracing_endpoint(self) -> Optional[str]:
3) If you were passing a certificate (str) using `server_cert`, you need to change it to
provide an *absolute* path to the certificate file instead.
"""
import typing

from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import (
encode_spans,
)
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter


def _remove_stale_otel_sdk_packages():
@@ -286,12 +286,15 @@ def _remove_stale_otel_sdk_packages():
# apply hacky patch to remove stale opentelemetry sdk packages on upgrade-charm.
# it could be trouble if someone ever decides to implement their own tracer parallel to
# ours and before the charm has inited. We assume they won't.
# !!IMPORTANT!! keep all otlp imports UNDER this call.
_remove_stale_otel_sdk_packages()

import functools
import inspect
import logging
import os
import typing
from collections import deque
from contextlib import contextmanager
from contextvars import Context, ContextVar, copy_context
from pathlib import Path
@@ -310,6 +313,9 @@ def _remove_stale_otel_sdk_packages():

import opentelemetry
import ops
from opentelemetry.exporter.otlp.proto.common._internal.trace_encoder import (
encode_spans,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import ReadableSpan, Span, TracerProvider
@@ -318,6 +324,7 @@ def _remove_stale_otel_sdk_packages():
SpanExporter,
SpanExportResult,
)
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace import INVALID_SPAN, Tracer
from opentelemetry.trace import get_current_span as otlp_get_current_span
from opentelemetry.trace import (
@@ -338,7 +345,7 @@ def _remove_stale_otel_sdk_packages():
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version

LIBPATCH = 5
LIBPATCH = 6

PYDEPS = ["opentelemetry-exporter-otlp-proto-http==1.21.0"]

@@ -366,7 +373,9 @@ def _remove_stale_otel_sdk_packages():
BUFFER_DEFAULT_MAX_EVENT_HISTORY_LENGTH = 100
_MiB_TO_B = 2**20 # megabyte to byte conversion rate
_OTLP_SPAN_EXPORTER_TIMEOUT = 1
"""Timeout in seconds that the OTLP span exporter has to push traces to the backend."""


# Timeout in seconds that the OTLP span exporter has to push traces to the backend.


class _Buffer:
Expand Down Expand Up @@ -398,45 +407,75 @@ def save(self, spans: typing.Sequence[ReadableSpan]):
if self._max_event_history_length < 1:
dev_logger.debug("buffer disabled: max history length < 1")
return

current_history_length = len(self.load())
new_history_length = current_history_length + len(spans)
if (diff := self._max_event_history_length - new_history_length) < 0:
self.drop(diff)
self._save(spans)

def _serialize(self, spans: Sequence[ReadableSpan]) -> bytes:
# encode because otherwise we can't json-dump them
return encode_spans(spans).SerializeToString()

def _prune(self, queue: Sequence[bytes]) -> Sequence[bytes]:
"""Prune the queue until it fits in our constraints."""
n_dropped_spans = 0
# drop older events if we are past the max history length
overflow = len(queue) - self._max_event_history_length
if overflow > 0:
n_dropped_spans += overflow
logger.warning(
f"charm tracing buffer exceeds max history length ({self._max_event_history_length} events)"
)

new_spans = deque(queue[-self._max_event_history_length :])

# drop older events if the buffer is too big; all units are bytes
logged_drop = False
target_size = self._max_buffer_size_mib * _MiB_TO_B
current_size = sum(len(span) for span in new_spans)
while current_size > target_size:
current_size -= len(new_spans.popleft())
n_dropped_spans += 1

# only do this once
if not logged_drop:
logger.warning(
f"charm tracing buffer exceeds size limit ({self._max_buffer_size_mib}MiB)."
)
logged_drop = True

if n_dropped_spans > 0:
dev_logger.debug(
f"charm tracing buffer overflow: dropped {n_dropped_spans} older spans. "
f"Please increase the buffer limits, or ensure the spans can be flushed."
)
return new_spans

def _save(self, spans: Sequence[ReadableSpan], replace: bool = False):
dev_logger.debug(f"saving {len(spans)} new spans to buffer")
old = [] if replace else self.load()
new = self._serialize(spans)
queue = old + [self._serialize(spans)]
new_buffer = self._prune(queue)

try:
# if the buffer exceeds the size limit, we start dropping old spans until it does

while len((new + self._SPANSEP.join(old))) > (self._max_buffer_size_mib * _MiB_TO_B):
if not old:
# if we've already dropped all spans and still we can't get under the
# size limit, we can't save this span
logger.error(
f"span exceeds total buffer size limit ({self._max_buffer_size_mib}MiB); "
f"buffering FAILED"
)
return

old = old[1:]
logger.warning(
f"buffer size exceeds {self._max_buffer_size_mib}MiB; dropping older spans... "
f"Please increase the buffer size, disable buffering, or ensure the spans can be flushed."
)
if queue and not new_buffer:
# this means that, given our constraints, we are pruning so much that there are no events left.
logger.error(
"No charm events could be buffered into charm traces buffer. Please increase the memory or history size limits."
)
return

self._db_file.write_bytes(new + self._SPANSEP.join(old))
try:
self._write(new_buffer)
except Exception:
logger.exception("error buffering spans")

def _write(self, spans: Sequence[bytes]):
"""Write the spans to the db file."""
# ensure the destination folder exists
db_file_dir = self._db_file.parent
if not db_file_dir.exists():
dev_logger.info(f"creating buffer dir: {db_file_dir}")
db_file_dir.mkdir(parents=True)

self._db_file.write_bytes(self._SPANSEP.join(spans))

def load(self) -> List[bytes]:
"""Load currently buffered spans from the cache file.

@@ -461,8 +500,10 @@ def drop(self, n_spans: Optional[int] = None):
else:
dev_logger.debug("emptying buffer")
new = []

self._db_file.write_bytes(self._SPANSEP.join(new))
try:
self._write(new)
except Exception:
logger.exception("error writing charm traces buffer")

def flush(self) -> Optional[bool]:
"""Export all buffered spans to the given exporter, then clear the buffer.
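The buffering rewrite above replaces the old write-time trimming loop in `_save` with a dedicated `_prune` pass: the queue of serialized span batches is first capped by event count, then by total byte size, evicting from the oldest end of a deque, and the result is written in one place (`_write`). A minimal standalone sketch of that strategy, not the library's API; the names and limits here are illustrative:

```python
from collections import deque
from typing import List, Sequence


def prune(queue: Sequence[bytes], max_events: int, max_bytes: int) -> List[bytes]:
    """Drop the oldest serialized span batches until both limits are met."""
    # cap the number of buffered batches first
    pruned = deque(queue[-max_events:]) if max_events > 0 else deque()
    # then cap the total size in bytes, evicting oldest-first
    size = sum(len(batch) for batch in pruned)
    while pruned and size > max_bytes:
        size -= len(pruned.popleft())
    return list(pruned)


# Three fake batches with a 5-byte budget: only the newest survives.
assert prune([b"aaa", b"bb", b"cccc"], max_events=10, max_bytes=5) == [b"cccc"]
# A single batch larger than the budget is dropped entirely; this is the
# "No charm events could be buffered" error path in the diff above.
assert prune([b"toobigbatch"], max_events=10, max_bytes=4) == []
```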
5 changes: 4 additions & 1 deletion src/backups.py
@@ -212,7 +212,10 @@ def can_use_s3_repository(self) -> tuple[bool, str | None]:
for line in system_identifier_from_instance.splitlines()
if "Database system identifier" in line
).split(" ")[-1]
system_identifier_from_stanza = str(stanza.get("db")[0]["system-id"])
stanza_dbs = stanza.get("db")
system_identifier_from_stanza = (
str(stanza_dbs[0]["system-id"]) if len(stanza_dbs) else None
)
if system_identifier_from_instance != system_identifier_from_stanza:
logger.debug(
f"can_use_s3_repository: incompatible system identifier s3={system_identifier_from_stanza}, local={system_identifier_from_instance}"
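For context on this hunk: `pgbackrest info` can report a stanza whose `db` list is empty (the failed-to-initialize case exercised by the new unit test below), and the old `stanza.get("db")[0]` would raise an `IndexError`. A hedged sketch of the guarded lookup; the stanza shape comes from the diff, while the helper name is made up for illustration:

```python
from typing import Optional


def stanza_system_id(stanza: dict) -> Optional[str]:
    """Return the system identifier of the stanza's first db, or None if absent."""
    dbs = stanza.get("db") or []
    return str(dbs[0]["system-id"]) if dbs else None


# An empty db list yields None instead of raising, so the comparison with the
# local system identifier fails cleanly and the S3 repository is not reused.
assert stanza_system_id({"db": []}) is None
assert stanza_system_id({"db": [{"system-id": 7399296837845333859}]}) == "7399296837845333859"
```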
13 changes: 13 additions & 0 deletions tests/unit/test_backups.py
@@ -297,6 +297,19 @@ def test_can_use_s3_repository(harness):
]
assert harness.charm.backup.can_use_s3_repository() == (True, None)

# Empty db
_execute_command.side_effect = None
_execute_command.return_value = (1, "", "")
pgbackrest_info_other_cluster_name_backup_output = (
0,
f'[{{"db": [], "name": "another-model.{harness.charm.cluster_name}"}}]',
"",
)
assert harness.charm.backup.can_use_s3_repository() == (
False,
FAILED_TO_INITIALIZE_STANZA_ERROR_MESSAGE,
)


def test_construct_endpoint(harness):
# Test with an AWS endpoint without region.
3 changes: 2 additions & 1 deletion tests/unit/test_charm.py
@@ -333,7 +333,8 @@ def test_on_config_changed(harness):
harness.charm.on.config_changed.emit()
_enable_disable_extensions.assert_called_once()
_set_up_relation.assert_called_once()
harness.remove_relation(db_relation_id)
with harness.hooks_disabled():
harness.remove_relation(db_relation_id)

_enable_disable_extensions.reset_mock()
_set_up_relation.reset_mock()
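The `test_charm.py` fix wraps relation teardown in `hooks_disabled()` so that removing the relation does not emit relation-departed/broken hooks against the mocked handlers mid-test. A minimal sketch of the pattern with ops' testing `Harness`; the charm class and metadata are placeholders:

```python
import ops
from ops.testing import Harness


class PlaceholderCharm(ops.CharmBase):
    pass


harness = Harness(
    PlaceholderCharm,
    meta="name: placeholder\nrequires:\n  db:\n    interface: pgsql",
)
harness.begin()
rel_id = harness.add_relation("db", "remote-app")

# Tear the relation down without firing relation-departed/broken events.
with harness.hooks_disabled():
    harness.remove_relation(rel_id)
```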