Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Search] Fix sanitization for bulk queries #1990

Closed
wants to merge 12 commits into from

Conversation

GilTeixeira
Copy link

@GilTeixeira GilTeixeira commented Oct 6, 2023

Description

Fix sanitization for bulk queries for newer versions (>7.x). Continues the work of #1870

Fixes #1868

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

from opentelemetry.instrumentation.elasticsearch import ElasticsearchInstrumentor
from elasticsearch import Elasticsearch, helpers
 
ElasticsearchInstrumentor().instrument()
 
es = Elasticsearch(["http://localhost:9200"])
 
 
def gendata():
    mywords = ["foo", "bar", "baz"]
    for word in mywords:
        yield {
            "_index": "mywords",
            "word": word,
        }
 
 
helpers.bulk(es, gendata())

Currently it fails with this traceback:

Traceback (most recent call last):
  File "/Users/gil.teixeira/dev/elastic-telemetry/a.py", line 18, in <module>
    helpers.bulk(es, gendata())
  File "/Users/gil.teixeira/dev/elastic-telemetry/venv/lib/python3.11/site-packages/elasticsearch/helpers/actions.py", line 521, in bulk
    for ok, item in streaming_bulk(
  File "/Users/gil.teixeira/dev/elastic-telemetry/venv/lib/python3.11/site-packages/elasticsearch/helpers/actions.py", line 436, in streaming_bulk
    for data, (ok, info) in zip(
  File "/Users/gil.teixeira/dev/elastic-telemetry/venv/lib/python3.11/site-packages/elasticsearch/helpers/actions.py", line 339, in _process_bulk_chunk
    resp = client.bulk(*args, operations=bulk_actions, **kwargs)  # type: ignore[arg-type]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/gil.teixeira/dev/elastic-telemetry/venv/lib/python3.11/site-packages/elasticsearch/_sync/client/utils.py", line 414, in wrapped
    return api(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^
  File "/Users/gil.teixeira/dev/elastic-telemetry/venv/lib/python3.11/site-packages/elasticsearch/_sync/client/__init__.py", line 708, in bulk
    return self.perform_request(  # type: ignore[return-value]
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/gil.teixeira/dev/elastic-telemetry/venv/lib/python3.11/site-packages/elasticsearch/_sync/client/_base.py", line 285, in perform_request
    meta, resp_body = self.transport.perform_request(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/gil.teixeira/dev/elastic-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/__init__.py", line 242, in wrapper
    attributes[SpanAttributes.DB_STATEMENT] = sanitize_body(
                                              ^^^^^^^^^^^^^^
  File "/Users/gil.teixeira/dev/elastic-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py", line 58, in sanitize_body
    flatten_body = _flatten_dict(body)
                   ^^^^^^^^^^^^^^^^^^^
  File "/Users/gil.teixeira/dev/elastic-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-elasticsearch/src/opentelemetry/instrumentation/elasticsearch/utils.py", line 31, in _flatten_dict
    for k, v in d.items():
                ^^^^^^^
AttributeError: 'list' object has no attribute 'items'

Does This PR Require a Core Repo Change?

  • Yes. - Link to PR:
  • No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

@GilTeixeira GilTeixeira requested a review from a team October 6, 2023 17:10
@linux-foundation-easycla
Copy link

linux-foundation-easycla bot commented Oct 6, 2023

CLA Signed

The committers listed above are authorized under a signed CLA.

@ocelotl
Copy link
Contributor

ocelotl commented Nov 22, 2023

@GilTeixeira please add at least one test case for this changes :V:

@anpr
Copy link

anpr commented Feb 2, 2024

This is blocking our upgrade to opentelemetry, could somebody please review and merge this?

@GilTeixeira
Copy link
Author

@ocelotl I added a test. And this works with elasticsearch==8.x, since the body is a List[bytes].
But this will still throw an exception with elasticsearch<=7.x, since the body that is passed is formed like this: resp = client.bulk(*args, body="\n".join(bulk_actions) + "\n", **kwargs)
I am not sure what to do in these cases, do I wrap the whole sanitize_body in a try except?

Copy link
Member

@srikanthccv srikanthccv left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But this will still throw an exception with elasticsearch<=7.x .... I am not sure what to do in these cases

Inspect the version of the elasticsearch (import VERSION from elasticsearch) and use appropriate logic for sanitization. Something like

CHANGELOG.md Outdated Show resolved Hide resolved
@GilTeixeira
Copy link
Author

@srikanthccv I think you forgot to add the suggestion 😛
I splitted the string by newline and sanitize each line, lmk if you have a better idea

@anpr
Copy link

anpr commented Feb 28, 2024

Would be great to have the final steps to be taken soon, thanks 🙏

@oliverholworthy
Copy link

Looks like this has been addressed by #2355 and this PR can be now closed

@srikanthccv
Copy link
Member

Thanks @oliverholworthy. Please check with the latest release and close if it fixes your issue @GilTeixeira

@GilTeixeira
Copy link
Author

That's it, good work 🚀

@GilTeixeira GilTeixeira closed this Apr 3, 2024
@anpr
Copy link

anpr commented Apr 5, 2024

Thanks a lot 🙏 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Elasticsearch sanitization does not work for bulk queries
5 participants