Skip to content

Commit

Permalink
chore: update charm libraries (#256)
Browse files Browse the repository at this point in the history
  • Loading branch information
observability-noctua-bot committed May 31, 2024
1 parent 3f051f1 commit 0700cac
Showing 1 changed file with 127 additions and 84 deletions.
211 changes: 127 additions & 84 deletions lib/charms/tempo_k8s/v2/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ def __init__(self, *args):
""" # noqa: W505
import enum
import json
import logging
from typing import (
Expand All @@ -94,7 +95,7 @@ def __init__(self, *args):
)
from ops.framework import EventSource, Object
from ops.model import ModelError, Relation
from pydantic import BaseModel
from pydantic import BaseModel, ConfigDict, Field

# The unique Charmhub library identifier, never change it
LIBID = "12977e9aa0b34367903d8afeb8c3d85d"
Expand All @@ -104,7 +105,7 @@ def __init__(self, *args):

# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 5
LIBPATCH = 6

PYDEPS = ["pydantic"]

Expand All @@ -121,16 +122,36 @@ def __init__(self, *args):
"tempo_grpc",
"otlp_grpc",
"otlp_http",
# "jaeger_grpc",
"jaeger_thrift_compact",
"jaeger_thrift_http",
"jaeger_thrift_binary",
]

RawReceiver = Tuple[ReceiverProtocol, int]
RawReceiver = Tuple[ReceiverProtocol, str]
"""Helper type. A raw receiver is defined as a tuple consisting of the protocol name, and the (external, if available),
(secured, if available) resolvable server url.
"""

BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"}


class TransportProtocolType(str, enum.Enum):
"""Receiver Type."""

http = "http"
grpc = "grpc"


receiver_protocol_to_transport_protocol = {
"zipkin": TransportProtocolType.http,
"kafka": TransportProtocolType.http,
"opencensus": TransportProtocolType.http,
"tempo_http": TransportProtocolType.http,
"tempo_grpc": TransportProtocolType.grpc,
"otlp_grpc": TransportProtocolType.grpc,
"otlp_http": TransportProtocolType.http,
}
"""A mapping between telemetry protocols and their corresponding transport protocol.
"""


class TracingError(Exception):
"""Base class for custom errors raised by this library."""

Expand Down Expand Up @@ -289,27 +310,81 @@ def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True):


# todo use models from charm-relation-interfaces
class Receiver(BaseModel): # noqa: D101
"""Receiver data structure."""
if int(pydantic.version.VERSION.split(".")[0]) < 2:

class ProtocolType(BaseModel): # type: ignore
"""Protocol Type."""

protocol: ReceiverProtocol
port: int
class Config:
"""Pydantic config."""

use_enum_values = True
"""Allow serializing enum values."""

class TracingProviderAppData(DatabagModel): # noqa: D101
"""Application databag model for the tracing provider."""
name: str = Field(
...,
description="Receiver protocol name. What protocols are supported (and what they are called) "
"may differ per provider.",
examples=["otlp_grpc", "otlp_http", "tempo_http"],
)

type: TransportProtocolType = Field(
...,
description="The transport protocol used by this receiver.",
examples=["http", "grpc"],
)

else:

class ProtocolType(BaseModel):
"""Protocol Type."""

model_config = ConfigDict(
# Allow serializing enum values.
use_enum_values=True
)
"""Pydantic config."""

name: str = Field(
...,
description="Receiver protocol name. What protocols are supported (and what they are called) "
"may differ per provider.",
examples=["otlp_grpc", "otlp_http", "tempo_http"],
)

type: TransportProtocolType = Field(
...,
description="The transport protocol used by this receiver.",
examples=["http", "grpc"],
)

host: str
"""Server hostname (local fqdn)."""

receivers: List[Receiver]
"""Enabled receivers and ports at which they are listening."""
class Receiver(BaseModel):
"""Specification of an active receiver."""

external_url: Optional[str] = None
"""Server url. If an ingress is present, it will be the ingress address."""
protocol: ProtocolType = Field(..., description="Receiver protocol name and type.")
url: str = Field(
...,
description="""URL at which the receiver is reachable. If there's an ingress, it would be the external URL.
Otherwise, it would be the service's fqdn or internal IP.
If the protocol type is grpc, the url will not contain a scheme.""",
examples=[
"http://traefik_address:2331",
"https://traefik_address:2331",
"http://tempo_public_ip:2331",
"https://tempo_public_ip:2331",
"tempo_public_ip:2331",
],
)


class TracingProviderAppData(DatabagModel): # noqa: D101
"""Application databag model for the tracing provider."""

internal_scheme: Optional[str] = None
"""Scheme for internal communication. If it is present, it will be protocol accepted by the provider."""
receivers: List[Receiver] = Field(
...,
description="List of all receivers enabled on the tracing provider.",
)


class TracingRequirerAppData(DatabagModel): # noqa: D101
Expand Down Expand Up @@ -481,10 +556,15 @@ def requested_receivers(self) -> List[ReceiverProtocol]:
return TracingRequirerAppData.load(relation.data[app]).receivers


class BrokenEvent(RelationBrokenEvent):
"""Event emitted when a relation on tracing is broken."""


class TracingEndpointProviderEvents(CharmEvents):
"""TracingEndpointProvider events."""

request = EventSource(RequestEvent)
broken = EventSource(BrokenEvent)


class TracingEndpointProvider(Object):
Expand All @@ -495,21 +575,17 @@ class TracingEndpointProvider(Object):
def __init__(
self,
charm: CharmBase,
host: str,
external_url: Optional[str] = None,
relation_name: str = DEFAULT_RELATION_NAME,
internal_scheme: Optional[Literal["http", "https"]] = "http",
):
"""Initialize.
Args:
charm: a `CharmBase` instance that manages this instance of the Tempo service.
host: address of the node hosting the tempo server.
external_url: external address of the node hosting the tempo server,
if an ingress is present.
relation_name: an optional string name of the relation between `charm`
and the Tempo charmed service. The default is "tracing".
internal_scheme: scheme to use with internal urls.
Raises:
RelationNotFoundError: If there is no relation in the charm's metadata.yaml
Expand All @@ -525,12 +601,10 @@ def __init__(
charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides
)

super().__init__(charm, relation_name + "tracing-provider-v2")
super().__init__(charm, relation_name + "tracing-provider")
self._charm = charm
self._host = host
self._external_url = external_url
self._relation_name = relation_name
self._internal_scheme = internal_scheme
self.framework.observe(
self._charm.on[relation_name].relation_joined, self._on_relation_event
)
Expand All @@ -540,18 +614,21 @@ def __init__(
self.framework.observe(
self._charm.on[relation_name].relation_changed, self._on_relation_event
)
self.framework.observe(
self._charm.on[relation_name].relation_broken, self._on_relation_broken_event
)

def _on_relation_broken_event(self, e: RelationBrokenEvent):
"""Handle relation broken events."""
self.on.broken.emit(e.relation)

def _on_relation_event(self, e: RelationEvent):
"""Handle relation created/joined/changed events."""
if self.is_v2(e.relation):
if self.is_requirer_ready(e.relation):
self.on.request.emit(e.relation)

def is_v2(self, relation: Relation):
"""Attempt to determine if this relation is a tracing v2 relation.
Assumes that the V2 requirer will, as soon as possible (relation-created),
publish the list of requested ingestion receivers (can be empty too).
"""
def is_requirer_ready(self, relation: Relation):
"""Attempt to determine if requirer has already populated app data."""
try:
self._get_requested_protocols(relation)
except NotReadyError:
Expand All @@ -567,7 +644,7 @@ def _get_requested_protocols(relation: Relation):
try:
databag = TracingRequirerAppData.load(relation.data[app])
except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError):
logger.info(f"relation {relation} is not ready to talk tracing v2")
logger.info(f"relation {relation} is not ready to talk tracing")
raise NotReadyError()
return databag.receivers

Expand All @@ -584,8 +661,8 @@ def requested_protocols(self):

@property
def relations(self) -> List[Relation]:
"""All v2 relations active on this endpoint."""
return [r for r in self._charm.model.relations[self._relation_name] if self.is_v2(r)]
"""All relations active on this endpoint."""
return self._charm.model.relations[self._relation_name]

def publish_receivers(self, receivers: Sequence[RawReceiver]):
"""Let all requirers know that these receivers are active and listening."""
Expand All @@ -595,12 +672,16 @@ def publish_receivers(self, receivers: Sequence[RawReceiver]):
for relation in self.relations:
try:
TracingProviderAppData(
host=self._host,
external_url=self._external_url or None,
receivers=[
Receiver(port=port, protocol=protocol) for protocol, port in receivers
Receiver(
url=url,
protocol=ProtocolType(
name=protocol,
type=receiver_protocol_to_transport_protocol[protocol],
),
)
for protocol, url in receivers
],
internal_scheme=self._internal_scheme,
).dump(relation.data[self._charm.app])

except ModelError as e:
Expand All @@ -625,11 +706,9 @@ class EndpointRemovedEvent(RelationBrokenEvent):
class EndpointChangedEvent(_AutoSnapshotEvent):
"""Event representing a change in one of the receiver endpoints."""

__args__ = ("host", "external_url", "_receivers")
__args__ = ("_receivers",)

if TYPE_CHECKING:
host = "" # type: str
external_url = "" # type: str
_receivers = [] # type: List[dict]

@property
Expand Down Expand Up @@ -769,12 +848,6 @@ def is_ready(self, relation: Optional[Relation] = None):
return False
try:
databag = dict(relation.data[relation.app])
# "ingesters" Might be populated if the provider sees a v1 relation before a v2 requirer has had time to
# publish the 'receivers' list. This will make Tempo incorrectly assume that this is a v1
# relation, and act accordingly. Later, when the requirer publishes the requested receivers,
# tempo will be able to course-correct.
if "ingesters" in databag:
del databag["ingesters"]
TracingProviderAppData.load(databag)

except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError):
Expand All @@ -790,9 +863,7 @@ def _on_tracing_relation_changed(self, event):
return

data = TracingProviderAppData.load(relation.data[relation.app])
self.on.endpoint_changed.emit( # type: ignore
relation, data.host, data.external_url, [i.dict() for i in data.receivers]
)
self.on.endpoint_changed.emit(relation, [i.dict() for i in data.receivers]) # type: ignore

def _on_tracing_relation_broken(self, event: RelationBrokenEvent):
"""Notify the providers that the endpoint is broken."""
Expand All @@ -815,7 +886,7 @@ def _get_endpoint(
if not app_data:
return None
receivers: List[Receiver] = list(
filter(lambda i: i.protocol == protocol, app_data.receivers)
filter(lambda i: i.protocol.name == protocol, app_data.receivers)
)
if not receivers:
logger.error(f"no receiver found with protocol={protocol!r}")
Expand All @@ -827,18 +898,7 @@ def _get_endpoint(
return

receiver = receivers[0]
# if there's an external_url argument (v2.5+), use that. Otherwise, we use the tempo local fqdn
if app_data.external_url:
url = f"{app_data.external_url}:{receiver.port}"
else:
# if we didn't receive a scheme (old provider), we assume HTTP is used
url = f"{app_data.internal_scheme or 'http'}://{app_data.host}:{receiver.port}"

if receiver.protocol.endswith("grpc"):
# TCP protocols don't want an http/https scheme prefix
url = url.split("://")[1]

return url
return receiver.url

def get_endpoint(
self, protocol: ReceiverProtocol, relation: Optional[Relation] = None
Expand All @@ -861,20 +921,3 @@ def get_endpoint(

return None
return endpoint

# for backwards compatibility with earlier revisions:
def otlp_grpc_endpoint(self):
"""Use TracingEndpointRequirer.get_endpoint('otlp_grpc') instead."""
logger.warning(
"`TracingEndpointRequirer.otlp_grpc_endpoint` is deprecated. "
"Use `TracingEndpointRequirer.get_endpoint('otlp_grpc') instead.`"
)
return self.get_endpoint("otlp_grpc")

def otlp_http_endpoint(self):
"""Use TracingEndpointRequirer.get_endpoint('otlp_http') instead."""
logger.warning(
"`TracingEndpointRequirer.otlp_http_endpoint` is deprecated. "
"Use `TracingEndpointRequirer.get_endpoint('otlp_http') instead.`"
)
return self.get_endpoint("otlp_http")

0 comments on commit 0700cac

Please sign in to comment.