diff --git a/elastic_transport/_async_transport.py b/elastic_transport/_async_transport.py index 0e70202..a4a5988 100644 --- a/elastic_transport/_async_transport.py +++ b/elastic_transport/_async_transport.py @@ -40,6 +40,7 @@ from ._models import DEFAULT, DefaultType, HttpHeaders, NodeConfig, SniffOptions from ._node import AiohttpHttpNode, BaseAsyncNode from ._node_pool import NodePool, NodeSelector +from ._otel import OpenTelemetrySpan from ._serializer import Serializer from ._transport import ( DEFAULT_CLIENT_META_SERVICE, @@ -220,7 +221,7 @@ async def perform_request( # type: ignore[override] method, endpoint_id=resolve_default(endpoint_id, None), path_parts=resolve_default(path_parts, {}), - ) as span: + ) as otel_span: response = await self._perform_request( method, target, @@ -231,8 +232,9 @@ async def perform_request( # type: ignore[override] retry_on_timeout=retry_on_timeout, request_timeout=request_timeout, client_meta=client_meta, + otel_span=otel_span, ) - span.set_elastic_cloud_metadata(response.meta.headers) + otel_span.set_elastic_cloud_metadata(response.meta.headers) return response async def _perform_request( # type: ignore[override,return] @@ -247,6 +249,7 @@ async def _perform_request( # type: ignore[override,return] retry_on_timeout: Union[bool, DefaultType] = DEFAULT, request_timeout: Union[Optional[float], DefaultType] = DEFAULT, client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, + otel_span: OpenTelemetrySpan, ) -> TransportApiResponse: await self._async_call() @@ -275,6 +278,7 @@ async def _perform_request( # type: ignore[override,return] request_body = self.serializers.dumps( body, mimetype=request_headers["content-type"] ) + otel_span.set_db_statement(request_body) else: request_body = None @@ -293,6 +297,7 @@ async def _perform_request( # type: ignore[override,return] node: BaseAsyncNode = self.node_pool.get() # type: ignore[assignment] start_time = self._loop.time() try: + otel_span.set_node_metadata(node.host, node.port, node.base_url, target) resp = await node.perform_request( method, target, diff --git a/elastic_transport/_otel.py b/elastic_transport/_otel.py index 7e3e98b..74543c3 100644 --- a/elastic_transport/_otel.py +++ b/elastic_transport/_otel.py @@ -30,33 +30,94 @@ _tracer = None +# Valid values for the enabled config are 'true' and 'false'. Default is 'true'. ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED" +# Describes how to handle search queries in the request body when assigned to +# a span attribute. +# Valid values are 'omit' and 'raw'. +# Default is 'omit' as 'raw' has security implications. +BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY" +DEFAULT_BODY_STRATEGY = "omit" + +# A list of the Elasticsearch endpoints that qualify as "search" endpoints. The search query in +# the request body may be captured for these endpoints, depending on the body capture strategy. +SEARCH_ENDPOINTS = ( + "search", + "async_search.submit", + "msearch", + "eql.search", + "esql.query", + "terms_enum", + "search_template", + "msearch_template", + "render_search_template", +) class OpenTelemetrySpan: - def __init__(self, otel_span: Optional[Span]): + def __init__( + self, + otel_span: Optional[Span], + endpoint_id: Optional[str] = None, + body_strategy: Optional[str] = None, + ): self.otel_span = otel_span + self.body_strategy = body_strategy + self.endpoint_id = endpoint_id + + def set_node_metadata( + self, host: str, port: int, base_url: str, target: str + ) -> None: + if self.otel_span is None: + return - def set_attribute(self, key: str, value: str) -> None: - if self.otel_span is not None: - self.otel_span.set_attribute(key, value) + # url.full does not contain auth info which is passed as headers + self.otel_span.set_attribute("url.full", base_url + target) + self.otel_span.set_attribute("server.address", host) + self.otel_span.set_attribute("server.port", port) def set_elastic_cloud_metadata(self, headers: Mapping[str, str]) -> None: + if self.otel_span is None: + return + cluster_name = headers.get("X-Found-Handling-Cluster") if cluster_name is not None: - self.set_attribute("db.elasticsearch.cluster.name", cluster_name) + self.otel_span.set_attribute("db.elasticsearch.cluster.name", cluster_name) node_name = headers.get("X-Found-Handling-Instance") if node_name is not None: - self.set_attribute("db.elasticsearch.node.name", node_name) + self.otel_span.set_attribute("db.elasticsearch.node.name", node_name) + + def set_db_statement(self, serialized_body: bytes) -> None: + if self.otel_span is None: + return + + if self.body_strategy == "omit": + return + elif self.body_strategy == "raw" and self.endpoint_id in SEARCH_ENDPOINTS: + self.otel_span.set_attribute( + "db.statement", serialized_body.decode("utf-8") + ) class OpenTelemetry: - def __init__(self, enabled: bool | None = None, tracer: trace.Tracer | None = None): + def __init__( + self, + enabled: bool | None = None, + tracer: trace.Tracer | None = None, + body_strategy: str | None = None, + ): if enabled is None: enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false" self.tracer = tracer or _tracer self.enabled = enabled and self.tracer is not None + if body_strategy is not None: + self.body_strategy = body_strategy + else: + self.body_strategy = os.environ.get( + BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY + ) + @contextlib.contextmanager def span( self, @@ -77,4 +138,9 @@ def span( otel_span.set_attribute("db.operation", endpoint_id) for key, value in path_parts.items(): otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value) - yield OpenTelemetrySpan(otel_span) + + yield OpenTelemetrySpan( + otel_span, + endpoint_id=endpoint_id, + body_strategy=self.body_strategy, + ) diff --git a/elastic_transport/_transport.py b/elastic_transport/_transport.py index ed013f2..e44922e 100644 --- a/elastic_transport/_transport.py +++ b/elastic_transport/_transport.py @@ -60,7 +60,7 @@ Urllib3HttpNode, ) from ._node_pool import NodePool, NodeSelector -from ._otel import OpenTelemetry +from ._otel import OpenTelemetry, OpenTelemetrySpan from ._serializer import DEFAULT_SERIALIZERS, Serializer, SerializerCollection from ._version import __version__ from .client_utils import client_meta_version, resolve_default @@ -303,8 +303,8 @@ def perform_request( method, endpoint_id=resolve_default(endpoint_id, None), path_parts=resolve_default(path_parts, {}), - ) as span: - api_response = self._perform_request( + ) as otel_span: + response = self._perform_request( method, target, body=body, @@ -314,9 +314,10 @@ def perform_request( retry_on_timeout=retry_on_timeout, request_timeout=request_timeout, client_meta=client_meta, + otel_span=otel_span, ) - span.set_elastic_cloud_metadata(api_response.meta.headers) - return api_response + otel_span.set_elastic_cloud_metadata(response.meta.headers) + return response def _perform_request( # type: ignore[return] self, @@ -330,6 +331,7 @@ def _perform_request( # type: ignore[return] retry_on_timeout: Union[bool, DefaultType] = DEFAULT, request_timeout: Union[Optional[float], DefaultType] = DEFAULT, client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT, + otel_span: OpenTelemetrySpan, ) -> TransportApiResponse: if headers is DEFAULT: request_headers = HttpHeaders() @@ -356,6 +358,7 @@ def _perform_request( # type: ignore[return] request_body = self.serializers.dumps( body, mimetype=request_headers["content-type"] ) + otel_span.set_db_statement(request_body) else: request_body = None @@ -374,6 +377,7 @@ def _perform_request( # type: ignore[return] node = self.node_pool.get() start_time = time.time() try: + otel_span.set_node_metadata(node.host, node.port, node.base_url, target) resp = node.perform_request( method, target, diff --git a/tests/test_otel.py b/tests/test_otel.py index e6cb36a..f5a7e1d 100644 --- a/tests/test_otel.py +++ b/tests/test_otel.py @@ -15,11 +15,13 @@ # specific language governing permissions and limitations # under the License. +import os from opentelemetry.sdk.trace import TracerProvider, export from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter -from elastic_transport._otel import OpenTelemetry +from elastic_transport import JsonSerializer +from elastic_transport._otel import ENABLED_ENV_VAR, OpenTelemetry def setup_tracing(): @@ -32,6 +34,34 @@ def setup_tracing(): return tracer, memory_exporter +def test_no_span(): + # With telemetry disabled, those calls should not raise + otel = OpenTelemetry(enabled=False) + with otel.span( + "GET", + endpoint_id="ml.open_job", + path_parts={"job_id": "my-job"}, + ) as span: + span.set_db_statement(JsonSerializer().dumps({"timeout": "1m"})) + span.set_node_metadata( + "localhost", + 9200, + "http://localhost:9200/", + "_ml/anomaly_detectors/my-job/_open", + ) + span.set_elastic_cloud_metadata( + { + "X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f", + "X-Found-Handling-Instance": "instance-0000000001", + } + ) + + +def test_enabled(): + otel = OpenTelemetry() + assert otel.enabled == (os.environ.get(ENABLED_ENV_VAR, "false") != "false") + + def test_minimal_span(): tracer, memory_exporter = setup_tracing() @@ -52,8 +82,17 @@ def test_detailed_span(): tracer, memory_exporter = setup_tracing() otel = OpenTelemetry(enabled=True, tracer=tracer) with otel.span( - "GET", endpoint_id="ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"} + "GET", + endpoint_id="ml.open_job", + path_parts={"job_id": "my-job"}, ) as span: + span.set_db_statement(JsonSerializer().dumps({"timeout": "1m"})) + span.set_node_metadata( + "localhost", + 9200, + "http://localhost:9200/", + "_ml/anomaly_detectors/my-job/_open", + ) span.set_elastic_cloud_metadata( { "X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f", @@ -63,13 +102,32 @@ def test_detailed_span(): spans = memory_exporter.get_finished_spans() assert len(spans) == 1 - assert spans[0].name == "ml.close_job" + assert spans[0].name == "ml.open_job" assert spans[0].attributes == { "http.request.method": "GET", + "url.full": "http://localhost:9200/_ml/anomaly_detectors/my-job/_open", + "server.address": "localhost", + "server.port": 9200, "db.system": "elasticsearch", - "db.operation": "ml.close_job", + "db.operation": "ml.open_job", "db.elasticsearch.path_parts.job_id": "my-job", - "db.elasticsearch.path_parts.foo": "bar", "db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f", "db.elasticsearch.node.name": "instance-0000000001", } + + +def test_db_statement(): + tracer, memory_exporter = setup_tracing() + otel = OpenTelemetry(enabled=True, tracer=tracer, body_strategy="raw") + with otel.span("GET", endpoint_id="search", path_parts={}) as span: + span.set_db_statement(JsonSerializer().dumps({"query": {"match_all": {}}})) + + spans = memory_exporter.get_finished_spans() + assert len(spans) == 1 + assert spans[0].name == "search" + assert spans[0].attributes == { + "http.request.method": "GET", + "db.system": "elasticsearch", + "db.operation": "search", + "db.statement": '{"query":{"match_all":{}}}', + }