Skip to content

Commit

Permalink
Support db.statement, server and url attributes (#155)
Browse files Browse the repository at this point in the history
  • Loading branch information
pquentin committed Mar 25, 2024
1 parent 4c15f79 commit 989099a
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 20 deletions.
9 changes: 7 additions & 2 deletions elastic_transport/_async_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from ._models import DEFAULT, DefaultType, HttpHeaders, NodeConfig, SniffOptions
from ._node import AiohttpHttpNode, BaseAsyncNode
from ._node_pool import NodePool, NodeSelector
from ._otel import OpenTelemetrySpan
from ._serializer import Serializer
from ._transport import (
DEFAULT_CLIENT_META_SERVICE,
Expand Down Expand Up @@ -220,7 +221,7 @@ async def perform_request( # type: ignore[override]
method,
endpoint_id=resolve_default(endpoint_id, None),
path_parts=resolve_default(path_parts, {}),
) as span:
) as otel_span:
response = await self._perform_request(
method,
target,
Expand All @@ -231,8 +232,9 @@ async def perform_request( # type: ignore[override]
retry_on_timeout=retry_on_timeout,
request_timeout=request_timeout,
client_meta=client_meta,
otel_span=otel_span,
)
span.set_elastic_cloud_metadata(response.meta.headers)
otel_span.set_elastic_cloud_metadata(response.meta.headers)
return response

async def _perform_request( # type: ignore[override,return]
Expand All @@ -247,6 +249,7 @@ async def _perform_request( # type: ignore[override,return]
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
otel_span: OpenTelemetrySpan,
) -> TransportApiResponse:
await self._async_call()

Expand Down Expand Up @@ -275,6 +278,7 @@ async def _perform_request( # type: ignore[override,return]
request_body = self.serializers.dumps(
body, mimetype=request_headers["content-type"]
)
otel_span.set_db_statement(request_body)
else:
request_body = None

Expand All @@ -293,6 +297,7 @@ async def _perform_request( # type: ignore[override,return]
node: BaseAsyncNode = self.node_pool.get() # type: ignore[assignment]
start_time = self._loop.time()
try:
otel_span.set_node_metadata(node.host, node.port, node.base_url, target)
resp = await node.perform_request(
method,
target,
Expand Down
82 changes: 74 additions & 8 deletions elastic_transport/_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,94 @@
_tracer = None


# Valid values for the enabled config are 'true' and 'false'. Default is 'false'.
ENABLED_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_ENABLED"
# Describes how to handle search queries in the request body when assigned to
# a span attribute.
# Valid values are 'omit' and 'raw'.
# Default is 'omit' as 'raw' has security implications.
BODY_STRATEGY_ENV_VAR = "OTEL_PYTHON_INSTRUMENTATION_ELASTICSEARCH_CAPTURE_SEARCH_QUERY"
DEFAULT_BODY_STRATEGY = "omit"

# A list of the Elasticsearch endpoints that qualify as "search" endpoints. The search query in
# the request body may be captured for these endpoints, depending on the body capture strategy.
SEARCH_ENDPOINTS = (
"search",
"async_search.submit",
"msearch",
"eql.search",
"esql.query",
"terms_enum",
"search_template",
"msearch_template",
"render_search_template",
)


class OpenTelemetrySpan:
def __init__(self, otel_span: Optional[Span]):
def __init__(
self,
otel_span: Optional[Span],
endpoint_id: Optional[str] = None,
body_strategy: Optional[str] = None,
):
self.otel_span = otel_span
self.body_strategy = body_strategy
self.endpoint_id = endpoint_id

def set_node_metadata(
self, host: str, port: int, base_url: str, target: str
) -> None:
if self.otel_span is None:
return

def set_attribute(self, key: str, value: str) -> None:
if self.otel_span is not None:
self.otel_span.set_attribute(key, value)
# url.full does not contain auth info which is passed as headers
self.otel_span.set_attribute("url.full", base_url + target)
self.otel_span.set_attribute("server.address", host)
self.otel_span.set_attribute("server.port", port)

def set_elastic_cloud_metadata(self, headers: Mapping[str, str]) -> None:
if self.otel_span is None:
return

cluster_name = headers.get("X-Found-Handling-Cluster")
if cluster_name is not None:
self.set_attribute("db.elasticsearch.cluster.name", cluster_name)
self.otel_span.set_attribute("db.elasticsearch.cluster.name", cluster_name)
node_name = headers.get("X-Found-Handling-Instance")
if node_name is not None:
self.set_attribute("db.elasticsearch.node.name", node_name)
self.otel_span.set_attribute("db.elasticsearch.node.name", node_name)

def set_db_statement(self, serialized_body: bytes) -> None:
    """Record the serialized request body as the ``db.statement`` attribute.

    The body is only recorded when a span is attached, the body strategy is
    ``"raw"`` and the endpoint id is listed in ``SEARCH_ENDPOINTS``. With the
    ``"omit"`` strategy, or any other strategy value, nothing is recorded.

    :param serialized_body: Request body as bytes, decoded as UTF-8 before
        being stored on the span.
    """
    if self.otel_span is None:
        return

    if self.body_strategy != "raw" or self.endpoint_id not in SEARCH_ENDPOINTS:
        return

    # Tracing must not make the request itself fail: bytes that are not
    # valid UTF-8 are replaced with U+FFFD instead of raising
    # UnicodeDecodeError out of the transport.
    self.otel_span.set_attribute(
        "db.statement", serialized_body.decode("utf-8", errors="replace")
    )


class OpenTelemetry:
def __init__(self, enabled: bool | None = None, tracer: trace.Tracer | None = None):
def __init__(
self,
enabled: bool | None = None,
tracer: trace.Tracer | None = None,
body_strategy: str | None = None,
):
if enabled is None:
enabled = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
self.tracer = tracer or _tracer
self.enabled = enabled and self.tracer is not None

if body_strategy is not None:
self.body_strategy = body_strategy
else:
self.body_strategy = os.environ.get(
BODY_STRATEGY_ENV_VAR, DEFAULT_BODY_STRATEGY
)

@contextlib.contextmanager
def span(
self,
Expand All @@ -77,4 +138,9 @@ def span(
otel_span.set_attribute("db.operation", endpoint_id)
for key, value in path_parts.items():
otel_span.set_attribute(f"db.elasticsearch.path_parts.{key}", value)
yield OpenTelemetrySpan(otel_span)

yield OpenTelemetrySpan(
otel_span,
endpoint_id=endpoint_id,
body_strategy=self.body_strategy,
)
14 changes: 9 additions & 5 deletions elastic_transport/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
Urllib3HttpNode,
)
from ._node_pool import NodePool, NodeSelector
from ._otel import OpenTelemetry
from ._otel import OpenTelemetry, OpenTelemetrySpan
from ._serializer import DEFAULT_SERIALIZERS, Serializer, SerializerCollection
from ._version import __version__
from .client_utils import client_meta_version, resolve_default
Expand Down Expand Up @@ -303,8 +303,8 @@ def perform_request(
method,
endpoint_id=resolve_default(endpoint_id, None),
path_parts=resolve_default(path_parts, {}),
) as span:
api_response = self._perform_request(
) as otel_span:
response = self._perform_request(
method,
target,
body=body,
Expand All @@ -314,9 +314,10 @@ def perform_request(
retry_on_timeout=retry_on_timeout,
request_timeout=request_timeout,
client_meta=client_meta,
otel_span=otel_span,
)
span.set_elastic_cloud_metadata(api_response.meta.headers)
return api_response
otel_span.set_elastic_cloud_metadata(response.meta.headers)
return response

def _perform_request( # type: ignore[return]
self,
Expand All @@ -330,6 +331,7 @@ def _perform_request( # type: ignore[return]
retry_on_timeout: Union[bool, DefaultType] = DEFAULT,
request_timeout: Union[Optional[float], DefaultType] = DEFAULT,
client_meta: Union[Tuple[Tuple[str, str], ...], DefaultType] = DEFAULT,
otel_span: OpenTelemetrySpan,
) -> TransportApiResponse:
if headers is DEFAULT:
request_headers = HttpHeaders()
Expand All @@ -356,6 +358,7 @@ def _perform_request( # type: ignore[return]
request_body = self.serializers.dumps(
body, mimetype=request_headers["content-type"]
)
otel_span.set_db_statement(request_body)
else:
request_body = None

Expand All @@ -374,6 +377,7 @@ def _perform_request( # type: ignore[return]
node = self.node_pool.get()
start_time = time.time()
try:
otel_span.set_node_metadata(node.host, node.port, node.base_url, target)
resp = node.perform_request(
method,
target,
Expand Down
68 changes: 63 additions & 5 deletions tests/test_otel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
# specific language governing permissions and limitations
# under the License.

import os

from opentelemetry.sdk.trace import TracerProvider, export
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter

from elastic_transport._otel import OpenTelemetry
from elastic_transport import JsonSerializer
from elastic_transport._otel import ENABLED_ENV_VAR, OpenTelemetry


def setup_tracing():
Expand All @@ -32,6 +34,34 @@ def setup_tracing():
return tracer, memory_exporter


def test_no_span():
    """Span helpers should not raise when telemetry is disabled."""
    cloud_headers = {
        "X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f",
        "X-Found-Handling-Instance": "instance-0000000001",
    }
    disabled_otel = OpenTelemetry(enabled=False)

    with disabled_otel.span(
        "GET",
        endpoint_id="ml.open_job",
        path_parts={"job_id": "my-job"},
    ) as noop_span:
        serialized = JsonSerializer().dumps({"timeout": "1m"})
        noop_span.set_db_statement(serialized)
        noop_span.set_node_metadata(
            "localhost",
            9200,
            "http://localhost:9200/",
            "_ml/anomaly_detectors/my-job/_open",
        )
        noop_span.set_elastic_cloud_metadata(cloud_headers)


def test_enabled():
    """With no explicit ``enabled`` argument, the env var decides the flag."""
    # Anything other than the literal "false" counts as enabled; an unset
    # variable falls back to "false".
    expected = os.environ.get(ENABLED_ENV_VAR, "false") != "false"
    assert OpenTelemetry().enabled == expected


def test_minimal_span():
tracer, memory_exporter = setup_tracing()

Expand All @@ -52,8 +82,17 @@ def test_detailed_span():
tracer, memory_exporter = setup_tracing()
otel = OpenTelemetry(enabled=True, tracer=tracer)
with otel.span(
"GET", endpoint_id="ml.close_job", path_parts={"job_id": "my-job", "foo": "bar"}
"GET",
endpoint_id="ml.open_job",
path_parts={"job_id": "my-job"},
) as span:
span.set_db_statement(JsonSerializer().dumps({"timeout": "1m"}))
span.set_node_metadata(
"localhost",
9200,
"http://localhost:9200/",
"_ml/anomaly_detectors/my-job/_open",
)
span.set_elastic_cloud_metadata(
{
"X-Found-Handling-Cluster": "e9106fc68e3044f0b1475b04bf4ffd5f",
Expand All @@ -63,13 +102,32 @@ def test_detailed_span():

spans = memory_exporter.get_finished_spans()
assert len(spans) == 1
assert spans[0].name == "ml.close_job"
assert spans[0].name == "ml.open_job"
assert spans[0].attributes == {
"http.request.method": "GET",
"url.full": "http://localhost:9200/_ml/anomaly_detectors/my-job/_open",
"server.address": "localhost",
"server.port": 9200,
"db.system": "elasticsearch",
"db.operation": "ml.close_job",
"db.operation": "ml.open_job",
"db.elasticsearch.path_parts.job_id": "my-job",
"db.elasticsearch.path_parts.foo": "bar",
"db.elasticsearch.cluster.name": "e9106fc68e3044f0b1475b04bf4ffd5f",
"db.elasticsearch.node.name": "instance-0000000001",
}


def test_db_statement():
    """The "raw" body strategy stores a search body under ``db.statement``."""
    tracer, memory_exporter = setup_tracing()
    raw_otel = OpenTelemetry(enabled=True, tracer=tracer, body_strategy="raw")
    query_body = JsonSerializer().dumps({"query": {"match_all": {}}})

    with raw_otel.span("GET", endpoint_id="search", path_parts={}) as search_span:
        search_span.set_db_statement(query_body)

    finished = memory_exporter.get_finished_spans()
    assert len(finished) == 1

    recorded = finished[0]
    assert recorded.name == "search"
    assert recorded.attributes == {
        "http.request.method": "GET",
        "db.system": "elasticsearch",
        "db.operation": "search",
        "db.statement": '{"query":{"match_all":{}}}',
    }

0 comments on commit 989099a

Please sign in to comment.