Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Kafka producer headers to be dict or list #1655

Merged
merged 6 commits into from
Mar 10, 2023

Conversation

mrajashree
Copy link
Contributor

@mrajashree mrajashree commented Feb 10, 2023

Description

Fixes #1364

The package confluent-kafka has a class Producer, with a method produce, which accepts headers as one of the parameters, and this headers parameter can be a list or a dict. (Ref: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#confluent_kafka.Producer.produce)

However, passing a dictionary of headers to Producer when using opentelemetry-instrumentation-confluent-kafka doesn't work, because the KafkaContextSetter's set method tries to append the headers assuming the carrier is a list.

This PR checks the carrier type, and appends the (key, value) tuple in case of a list, or adds the key:value to the dict in case the carrier is a dict.

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

  • I was passing a dictionary of headers to the producer, earlier it used to crash since the code was calling append on a dict. With the changes in this PR it appends the header in case of a list, or adds the header to the existing dict as expected.
  • Added unit tests which test context_setter.set, and assert that the new headers get added to the carrier when it is a list and also when it is a dict.

Does This PR Require a Core Repo Change?

  • Yes. - Link to PR:
  • No.

Checklist:

See contributing.md for styleguide, changelog guidelines, and more.

  • Followed the style guidelines of this project
  • Changelogs have been updated
  • Unit tests have been added
  • Documentation has been updated

@shalevr
Copy link
Member

shalevr commented Feb 10, 2023

Thank you for your contribution.
Don't you need to change the KafkaContextGetter too?
The old implementation expect carrier to be a list

@shalevr
Copy link
Member

shalevr commented Feb 10, 2023

Please add a changelog entry

@mrajashree
Copy link
Contributor Author

@shalevr thanks for reviewing!

Don't you need to change the KafkaContextGetter too?
The old implementation expect carrier to be a list

Since the KafkaContextGetter is being used on record.headers() here, and the headers method returns a list of (key, value) tuples, the get method is working on the list as expected, so I don't think we need to change it

CHANGELOG.md Outdated Show resolved Hide resolved
@mrajashree
Copy link
Contributor Author

@srikanthccv I saw that you had approved the PR which then ran the workflows, but I had to make a change based on the lint output. I made that change and force pushed. Can you approve running the workflows on this again?

@mrajashree
Copy link
Contributor Author

@srikanthccv @shalevr thanks for rebasing with the latest main commits. Do you think we can now merge this PR?

@srikanthccv
Copy link
Member

It will be merged after it gets approvals, especially from the codeowner for the fixes.

CHANGELOG.md Outdated Show resolved Hide resolved
@mrajashree
Copy link
Contributor Author

@shalevr thanks for reviewing!

Don't you need to change the KafkaContextGetter too?
The old implementation expect carrier to be a list

Since the KafkaContextGetter is being used on record.headers() here, and the headers method returns a list of (key, value) tuples, the get method is working on the list as expected, so I don't think we need to change it

regarding this: The issue was only with the KafkaContextSetter.set, since it was being called on headers which is passed by the user and could be a dict or list. And KafkaContextGetter.get is only being called on the output of record.headers() which always returns a list of tuples, hence doesn’t need to check for a dict. But for consistency and to allow future use of this method on a dict and list, I’ve updated the get and keys method on KafkaContextGetter as well to handle both, a dict and list

@mrajashree
Copy link
Contributor Author

It will be merged after it gets approvals, especially from the codeowner for the fixes.

@srikanthccv okay thanks, who are the codeowners and can they be added to this PR’s reviewers list too?

@shalevr since you first reviewed I've added a new commit, can you re-review to see if my changes/comments address your previous review comments? thanks

@mrajashree mrajashree requested review from srikanthccv and shalevr and removed request for shalevr and srikanthccv February 20, 2023 18:03
@srikanthccv
Copy link
Member

@srikanthccv
Copy link
Member

@oxeye-dorkolog, @dorkolog another ping

@srikanthccv srikanthccv enabled auto-merge (squash) March 10, 2023 05:58
@srikanthccv srikanthccv merged commit 88783f9 into open-telemetry:main Mar 10, 2023
tsloughter pushed a commit to tsloughter/opentelemetry-python-contrib that referenced this pull request Mar 13, 2023
* Allow Kafka producer headers to be dict or list

* modify kafka context getter helper methods to work on dict and list

---------

Co-authored-by: Shalev Roda <65566801+shalevr@users.noreply.github.com>
Co-authored-by: Srikanth Chekuri <srikanth.chekuri92@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

confluent-kafka-python produce headers could be dict or a list
3 participants