From 2b2166d5a2533bae57050b4deddf7be06cad6c21 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 19 Mar 2024 14:31:16 +0100 Subject: [PATCH 1/3] elasticsearch: don't set body as db statement for bulk requests bulk requests can be too big and diverse to make sense as db statement. Other than that the sanitizer currently only handles dicts so it's crashing. Closes #2150 Co-authored-by: Jason Mobarak Co-authored-by: Quentin Pradet --- .../instrumentation/elasticsearch/__init__.py | 8 +++++--- .../tests/test_elasticsearch.py | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index dd72a5235e..8f3842deb2 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -245,9 +245,11 @@ def wrapper(wrapped, _, args, kwargs): if method: attributes["elasticsearch.method"] = method if body: - attributes[SpanAttributes.DB_STATEMENT] = sanitize_body( - body - ) + # Don't set db.statement for bulk requests, as it can be very large + if isinstance(body, dict): + attributes[SpanAttributes.DB_STATEMENT] = sanitize_body( + body + ) if params: attributes["elasticsearch.params"] = str(params) if doc_id: diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index 0c84cf5cd6..8c5e91b922 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -486,3 +486,18 @@ def test_body_sanitization(self, _): sanitize_body(json.dumps(sanitization_queries.interval_query)), str(sanitization_queries.interval_query_sanitized), ) + + def test_bulk(self, request_mock): + request_mock.return_value = (1, {}, "") + + es = Elasticsearch() + es.bulk([dict(_op_type="index", _index="sw", _doc_type="_doc", _id=1, doc={"name": "adam"})] * 2) + + spans_list = self.get_finished_spans() + self.assertEqual(len(spans_list), 1) + span = spans_list[0] + + # Check version and name in span's instrumentation info + self.assertEqualSpanInstrumentationInfo( + span, opentelemetry.instrumentation.elasticsearch + ) From e70b6f9578ce7f6644af4f30c6bd75bf3904f07d Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 19 Mar 2024 14:40:36 +0100 Subject: [PATCH 2/3] Update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8797b9992a..7a10579947 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#2151](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/2298)) - Avoid losing repeated HTTP headers ([#2266](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2266)) +- `opentelemetry-instrumentation-elasticsearch` Don't send bulk request body as db statement + ([#2355](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/2355)) ## Version 1.23.0/0.44b0 (2024-02-23) From 2a8fdfefc23bf2f39fc5a404843d34ac3b4aa507 Mon Sep 17 00:00:00 2001 From: Riccardo Magliocchetti Date: Tue, 19 Mar 2024 15:20:20 +0100 Subject: [PATCH 3/3] Please the linter --- .../instrumentation/elasticsearch/__init__.py | 6 +++--- .../tests/test_elasticsearch.py | 21 ++++++++++++++++++- 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py index 8f3842deb2..0f5056de83 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py @@ -247,9 +247,9 @@ def wrapper(wrapped, _, args, kwargs): if body: # Don't set db.statement for bulk requests, as it can be very large if isinstance(body, dict): - attributes[SpanAttributes.DB_STATEMENT] = sanitize_body( - body - ) + attributes[ + SpanAttributes.DB_STATEMENT + ] = sanitize_body(body) if params: attributes["elasticsearch.params"] = str(params) if doc_id: diff --git a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py index 8c5e91b922..6008108d79 100644 --- a/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py +++ b/instrumentation/opentelemetry-instrumentation-elasticsearch/tests/test_elasticsearch.py @@ -51,6 +51,8 @@ Article = helpers.Article +# pylint: disable=too-many-public-methods + @mock.patch( "elasticsearch.connection.http_urllib3.Urllib3HttpConnection.perform_request" @@ -491,7 +493,24 @@ def test_bulk(self, request_mock): request_mock.return_value = (1, {}, "") es = Elasticsearch() - es.bulk([dict(_op_type="index", _index="sw", _doc_type="_doc", _id=1, doc={"name": "adam"})] * 2) + es.bulk( + [ + { + "_op_type": "index", + "_index": "sw", + "_doc_type": "_doc", + "_id": 1, + "doc": {"name": "adam"}, + }, + { + "_op_type": "index", + "_index": "sw", + "_doc_type": "_doc", + "_id": 1, + "doc": {"name": "adam"}, + }, + ] + ) spans_list = self.get_finished_spans() self.assertEqual(len(spans_list), 1)