diff --git a/lib/charms/tempo_k8s/v2/tracing.py b/lib/charms/tempo_k8s/v2/tracing.py index 9e5defc8..b4e341c3 100644 --- a/lib/charms/tempo_k8s/v2/tracing.py +++ b/lib/charms/tempo_k8s/v2/tracing.py @@ -69,6 +69,7 @@ def __init__(self, *args): """ # noqa: W505 +import enum import json import logging from typing import ( @@ -94,7 +95,7 @@ def __init__(self, *args): ) from ops.framework import EventSource, Object from ops.model import ModelError, Relation -from pydantic import BaseModel +from pydantic import BaseModel, ConfigDict, Field # The unique Charmhub library identifier, never change it LIBID = "12977e9aa0b34367903d8afeb8c3d85d" @@ -104,7 +105,7 @@ def __init__(self, *args): # Increment this PATCH version before using `charmcraft publish-lib` or reset # to 0 if you are raising the major API version -LIBPATCH = 5 +LIBPATCH = 6 PYDEPS = ["pydantic"] @@ -121,16 +122,36 @@ def __init__(self, *args): "tempo_grpc", "otlp_grpc", "otlp_http", - # "jaeger_grpc", - "jaeger_thrift_compact", - "jaeger_thrift_http", - "jaeger_thrift_binary", ] -RawReceiver = Tuple[ReceiverProtocol, int] +RawReceiver = Tuple[ReceiverProtocol, str] +"""Helper type. A raw receiver is defined as a tuple consisting of the protocol name, and the (external, if available), +(secured, if available) resolvable server url. +""" + BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"} +class TransportProtocolType(str, enum.Enum): + """Receiver Type.""" + + http = "http" + grpc = "grpc" + + +receiver_protocol_to_transport_protocol = { + "zipkin": TransportProtocolType.http, + "kafka": TransportProtocolType.http, + "opencensus": TransportProtocolType.http, + "tempo_http": TransportProtocolType.http, + "tempo_grpc": TransportProtocolType.grpc, + "otlp_grpc": TransportProtocolType.grpc, + "otlp_http": TransportProtocolType.http, +} +"""A mapping between telemetry protocols and their corresponding transport protocol. +""" + + class TracingError(Exception): """Base class for custom errors raised by this library.""" @@ -289,27 +310,81 @@ def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True): # todo use models from charm-relation-interfaces -class Receiver(BaseModel): # noqa: D101 - """Receiver data structure.""" +if int(pydantic.version.VERSION.split(".")[0]) < 2: + + class ProtocolType(BaseModel): # type: ignore + """Protocol Type.""" - protocol: ReceiverProtocol - port: int + class Config: + """Pydantic config.""" + use_enum_values = True + """Allow serializing enum values.""" -class TracingProviderAppData(DatabagModel): # noqa: D101 - """Application databag model for the tracing provider.""" + name: str = Field( + ..., + description="Receiver protocol name. What protocols are supported (and what they are called) " + "may differ per provider.", + examples=["otlp_grpc", "otlp_http", "tempo_http"], + ) + + type: TransportProtocolType = Field( + ..., + description="The transport protocol used by this receiver.", + examples=["http", "grpc"], + ) + +else: + + class ProtocolType(BaseModel): + """Protocol Type.""" + + model_config = ConfigDict( + # Allow serializing enum values. + use_enum_values=True + ) + """Pydantic config.""" + + name: str = Field( + ..., + description="Receiver protocol name. What protocols are supported (and what they are called) " + "may differ per provider.", + examples=["otlp_grpc", "otlp_http", "tempo_http"], + ) + + type: TransportProtocolType = Field( + ..., + description="The transport protocol used by this receiver.", + examples=["http", "grpc"], + ) - host: str - """Server hostname (local fqdn).""" - receivers: List[Receiver] - """Enabled receivers and ports at which they are listening.""" +class Receiver(BaseModel): + """Specification of an active receiver.""" - external_url: Optional[str] = None - """Server url. If an ingress is present, it will be the ingress address.""" + protocol: ProtocolType = Field(..., description="Receiver protocol name and type.") + url: str = Field( + ..., + description="""URL at which the receiver is reachable. If there's an ingress, it would be the external URL. + Otherwise, it would be the service's fqdn or internal IP. + If the protocol type is grpc, the url will not contain a scheme.""", + examples=[ + "http://traefik_address:2331", + "https://traefik_address:2331", + "http://tempo_public_ip:2331", + "https://tempo_public_ip:2331", + "tempo_public_ip:2331", + ], + ) + + +class TracingProviderAppData(DatabagModel): # noqa: D101 + """Application databag model for the tracing provider.""" - internal_scheme: Optional[str] = None - """Scheme for internal communication. If it is present, it will be protocol accepted by the provider.""" + receivers: List[Receiver] = Field( + ..., + description="List of all receivers enabled on the tracing provider.", + ) class TracingRequirerAppData(DatabagModel): # noqa: D101 @@ -481,10 +556,15 @@ def requested_receivers(self) -> List[ReceiverProtocol]: return TracingRequirerAppData.load(relation.data[app]).receivers +class BrokenEvent(RelationBrokenEvent): + """Event emitted when a relation on tracing is broken.""" + + class TracingEndpointProviderEvents(CharmEvents): """TracingEndpointProvider events.""" request = EventSource(RequestEvent) + broken = EventSource(BrokenEvent) class TracingEndpointProvider(Object): @@ -495,21 +575,17 @@ class TracingEndpointProvider(Object): def __init__( self, charm: CharmBase, - host: str, external_url: Optional[str] = None, relation_name: str = DEFAULT_RELATION_NAME, - internal_scheme: Optional[Literal["http", "https"]] = "http", ): """Initialize. Args: charm: a `CharmBase` instance that manages this instance of the Tempo service. - host: address of the node hosting the tempo server. external_url: external address of the node hosting the tempo server, if an ingress is present. relation_name: an optional string name of the relation between `charm` and the Tempo charmed service. The default is "tracing". - internal_scheme: scheme to use with internal urls. Raises: RelationNotFoundError: If there is no relation in the charm's metadata.yaml @@ -525,12 +601,10 @@ def __init__( charm, relation_name, RELATION_INTERFACE_NAME, RelationRole.provides ) - super().__init__(charm, relation_name + "tracing-provider-v2") + super().__init__(charm, relation_name + "tracing-provider") self._charm = charm - self._host = host self._external_url = external_url self._relation_name = relation_name - self._internal_scheme = internal_scheme self.framework.observe( self._charm.on[relation_name].relation_joined, self._on_relation_event ) @@ -540,18 +614,21 @@ def __init__( self.framework.observe( self._charm.on[relation_name].relation_changed, self._on_relation_event ) + self.framework.observe( + self._charm.on[relation_name].relation_broken, self._on_relation_broken_event + ) + + def _on_relation_broken_event(self, e: RelationBrokenEvent): + """Handle relation broken events.""" + self.on.broken.emit(e.relation) def _on_relation_event(self, e: RelationEvent): """Handle relation created/joined/changed events.""" - if self.is_v2(e.relation): + if self.is_requirer_ready(e.relation): self.on.request.emit(e.relation) - def is_v2(self, relation: Relation): - """Attempt to determine if this relation is a tracing v2 relation. - - Assumes that the V2 requirer will, as soon as possible (relation-created), - publish the list of requested ingestion receivers (can be empty too). - """ + def is_requirer_ready(self, relation: Relation): + """Attempt to determine if requirer has already populated app data.""" try: self._get_requested_protocols(relation) except NotReadyError: @@ -567,7 +644,7 @@ def _get_requested_protocols(relation: Relation): try: databag = TracingRequirerAppData.load(relation.data[app]) except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): - logger.info(f"relation {relation} is not ready to talk tracing v2") + logger.info(f"relation {relation} is not ready to talk tracing") raise NotReadyError() return databag.receivers @@ -584,8 +661,8 @@ def requested_protocols(self): @property def relations(self) -> List[Relation]: - """All v2 relations active on this endpoint.""" - return [r for r in self._charm.model.relations[self._relation_name] if self.is_v2(r)] + """All relations active on this endpoint.""" + return self._charm.model.relations[self._relation_name] def publish_receivers(self, receivers: Sequence[RawReceiver]): """Let all requirers know that these receivers are active and listening.""" @@ -595,12 +672,16 @@ def publish_receivers(self, receivers: Sequence[RawReceiver]): for relation in self.relations: try: TracingProviderAppData( - host=self._host, - external_url=self._external_url or None, receivers=[ - Receiver(port=port, protocol=protocol) for protocol, port in receivers + Receiver( + url=url, + protocol=ProtocolType( + name=protocol, + type=receiver_protocol_to_transport_protocol[protocol], + ), + ) + for protocol, url in receivers ], - internal_scheme=self._internal_scheme, ).dump(relation.data[self._charm.app]) except ModelError as e: @@ -625,11 +706,9 @@ class EndpointRemovedEvent(RelationBrokenEvent): class EndpointChangedEvent(_AutoSnapshotEvent): """Event representing a change in one of the receiver endpoints.""" - __args__ = ("host", "external_url", "_receivers") + __args__ = ("_receivers",) if TYPE_CHECKING: - host = "" # type: str - external_url = "" # type: str _receivers = [] # type: List[dict] @property @@ -769,12 +848,6 @@ def is_ready(self, relation: Optional[Relation] = None): return False try: databag = dict(relation.data[relation.app]) - # "ingesters" Might be populated if the provider sees a v1 relation before a v2 requirer has had time to - # publish the 'receivers' list. This will make Tempo incorrectly assume that this is a v1 - # relation, and act accordingly. Later, when the requirer publishes the requested receivers, - # tempo will be able to course-correct. - if "ingesters" in databag: - del databag["ingesters"] TracingProviderAppData.load(databag) except (json.JSONDecodeError, pydantic.ValidationError, DataValidationError): @@ -790,9 +863,7 @@ def _on_tracing_relation_changed(self, event): return data = TracingProviderAppData.load(relation.data[relation.app]) - self.on.endpoint_changed.emit( # type: ignore - relation, data.host, data.external_url, [i.dict() for i in data.receivers] - ) + self.on.endpoint_changed.emit(relation, [i.dict() for i in data.receivers]) # type: ignore def _on_tracing_relation_broken(self, event: RelationBrokenEvent): """Notify the providers that the endpoint is broken.""" @@ -815,7 +886,7 @@ def _get_endpoint( if not app_data: return None receivers: List[Receiver] = list( - filter(lambda i: i.protocol == protocol, app_data.receivers) + filter(lambda i: i.protocol.name == protocol, app_data.receivers) ) if not receivers: logger.error(f"no receiver found with protocol={protocol!r}") @@ -827,18 +898,7 @@ def _get_endpoint( return receiver = receivers[0] - # if there's an external_url argument (v2.5+), use that. Otherwise, we use the tempo local fqdn - if app_data.external_url: - url = f"{app_data.external_url}:{receiver.port}" - else: - # if we didn't receive a scheme (old provider), we assume HTTP is used - url = f"{app_data.internal_scheme or 'http'}://{app_data.host}:{receiver.port}" - - if receiver.protocol.endswith("grpc"): - # TCP protocols don't want an http/https scheme prefix - url = url.split("://")[1] - - return url + return receiver.url def get_endpoint( self, protocol: ReceiverProtocol, relation: Optional[Relation] = None @@ -861,20 +921,3 @@ def get_endpoint( return None return endpoint - - # for backwards compatibility with earlier revisions: - def otlp_grpc_endpoint(self): - """Use TracingEndpointRequirer.get_endpoint('otlp_grpc') instead.""" - logger.warning( - "`TracingEndpointRequirer.otlp_grpc_endpoint` is deprecated. " - "Use `TracingEndpointRequirer.get_endpoint('otlp_grpc') instead.`" - ) - return self.get_endpoint("otlp_grpc") - - def otlp_http_endpoint(self): - """Use TracingEndpointRequirer.get_endpoint('otlp_http') instead.""" - logger.warning( - "`TracingEndpointRequirer.otlp_http_endpoint` is deprecated. " - "Use `TracingEndpointRequirer.get_endpoint('otlp_http') instead.`" - ) - return self.get_endpoint("otlp_http")