diff --git a/CHANGELOG.md b/CHANGELOG.md index 3157bca33e..e338bd768f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2132](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2132)) - `opentelemetry-resource-detector-azure` Changed timeout to 4 seconds due to [timeout bug](https://github.com/open-telemetry/opentelemetry-python/issues/3644) ([#2136](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2136)) +- Fix elastic-search instrumentation sanitization to support bulk queries + ([#1990](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1990)) ## Version 1.22.0/0.43b0 (2023-12-14) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py index 97f2bc3b87..8d91d966e0 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py @@ -52,9 +52,19 @@ def _unflatten_dict(d): def sanitize_body(body) -> str: + if isinstance(body, bytes): + body = body.decode("utf8") + if isinstance(body, str): + body_lines = body.strip().split("\n") + if len(body_lines) > 1: + return sanitize_body(body_lines) + body = json.loads(body) + if isinstance(body, list): + return str([sanitize_body(elem) for elem in body]) + flatten_body = _flatten_dict(body) for key in flatten_body: diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index 0c84cf5cd6..c4d4033017 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -486,3 +486,51 @@ def test_body_sanitization(self, _): sanitize_body(json.dumps(sanitization_queries.interval_query)), str(sanitization_queries.interval_query_sanitized), ) + self.assertEqual( + sanitize_body( + [ + json.dumps(sanitization_queries.filter_query).encode("utf-8"), + json.dumps(sanitization_queries.match_query).encode("utf-8"), + json.dumps(sanitization_queries.interval_query).encode("utf-8"), + ] + ), + str( + [ + str(sanitization_queries.filter_query_sanitized), + str(sanitization_queries.match_query_sanitized), + str(sanitization_queries.interval_query_sanitized), + ] + ), + ) + + def test_bulk_search(self, request_mock): + request_mock.return_value = (2, {}, json.dumps({"items": []})) + + data = [ + { + "_index": "words", + "word": "foo", + }, + { + "_index": "words", + "word": "bar", + }, + ] + client = Elasticsearch() + elasticsearch.helpers.bulk(client, data) + + spans = self.get_finished_spans() + span = spans[0] + self.assertEqual(1, len(spans)) + self.assertEqual(span.name, "Elasticsearch/_bulk") + self.assertIsNotNone(span.end_time) + expected_bulk_attributes = { + SpanAttributes.DB_SYSTEM: "elasticsearch", + "elasticsearch.url": "/_bulk", + "elasticsearch.method": "POST", + SpanAttributes.DB_STATEMENT: "[\"{'index': {'_index': 'words'}}\", \"{'word': 'foo'}\", \"{'index': {'_index': 'words'}}\", \"{'word': 'bar'}\"]", + } + self.assertEqual( + span.attributes, + expected_bulk_attributes, + )