partitioner_cb documentation? #397

Closed
sfrooster opened this issue Apr 10, 2018 · 11 comments
Labels
stale Stale issues

Comments

@sfrooster

Documentation on this feature is really thin, so much so that it's not clear to me whether it's actually supported. If it is, my attempts to infer the callback signature from the C code are failing, so it would be useful to have even a small blurb or example describing how to use it.

Currently, all our attempts to use "partitioner_cb" have produced the following output:
Unhandled rejection (<{"name":"our_client_id...>, no stack trace)
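For reference, at the librdkafka C level (not node-rdkafka's JavaScript API, whose callback shape is not confirmed here), a partitioner registered with rd_kafka_topic_conf_set_partitioner_cb is a plain function that receives the message key and the topic's partition count and returns the partition to use. The sketch below mirrors that signature only: it forward-declares rd_kafka_topic_t instead of including rdkafka.h so it compiles on its own, and both the FNV-1a hash and the name fnv1a_partitioner are illustrative choices, not something librdkafka ships.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Forward declaration matching rdkafka.h, so this sketch compiles without librdkafka.
typedef struct rd_kafka_topic_s rd_kafka_topic_t;

// Hypothetical partitioner with the parameter list librdkafka expects for a
// topic partitioner callback. extern "C" gives it C linkage for the C API.
extern "C" int32_t fnv1a_partitioner(const rd_kafka_topic_t* rkt,
                                     const void* keydata, size_t keylen,
                                     int32_t partition_cnt,
                                     void* rkt_opaque, void* msg_opaque) {
  (void)rkt;
  (void)rkt_opaque;
  (void)msg_opaque;
  assert(partition_cnt > 0);
  // 32-bit FNV-1a over the key bytes: the same key always hashes the same way.
  uint32_t hash = 2166136261u;
  const unsigned char* bytes = static_cast<const unsigned char*>(keydata);
  for (size_t i = 0; i < keylen; ++i) {
    hash ^= bytes[i];
    hash *= 16777619u;
  }
  return static_cast<int32_t>(hash % static_cast<uint32_t>(partition_cnt));
}

// With librdkafka linked in, registration would look like:
//   rd_kafka_topic_conf_set_partitioner_cb(topic_conf, fnv1a_partitioner);
```

Because the mapping depends only on the key bytes and the partition count, the same key lands on the same partition for as long as the partition count stays unchanged.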

@40x

40x commented Apr 19, 2018

I am new to Kafka so I could be wrong with my implementation.

For me, partitioner_cb is not being triggered at all. So I moved whatever logic I wanted to implement in that function (it did not depend on any of the arguments the callback provides) to the place where I call the .produce method, and I pass the generated partition as the second parameter. But this means I need to know the number of partitions ahead of time, which may not always be possible.
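A minimal sketch of that workaround, written in C++ to match the code quoted later in this thread: the partition is chosen in application code and then passed explicitly to the produce call. The class name RoundRobinPartitioner is hypothetical, and the round-robin choice is only an assumption standing in for logic that, as described above, does not depend on the callback's arguments. The partition count still has to be supplied by the caller, which is exactly the limitation described above.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical application-side chooser: cycles through partitions 0..count-1.
// The caller must know the topic's partition count up front.
class RoundRobinPartitioner {
 public:
  explicit RoundRobinPartitioner(int32_t partition_count)
      : partition_count_(partition_count) {
    assert(partition_count > 0 && "a topic always has at least one partition");
  }

  // Returns the partition to pass explicitly to the produce call.
  int32_t next() {
    // fetch_add is atomic, so concurrent callers each get a distinct counter value.
    uint64_t n = counter_.fetch_add(1, std::memory_order_relaxed);
    return static_cast<int32_t>(n % static_cast<uint64_t>(partition_count_));
  }

 private:
  int32_t partition_count_;
  std::atomic<uint64_t> counter_{0};
};
```

If partitions are later added to the topic, the chooser has to be rebuilt with the new count; that is the "need to know ahead of time" problem in concrete form.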

It would be helpful to know what the issue is and how we can resolve it.

@poolik

poolik commented Sep 27, 2018

Ran into this as well. It's partly documented, but I cannot get partitioner_cb to work. @webmakersteve, could you give any examples or directions on how to use this?

@alexander-alvarez
Contributor

bump @webmakersteve

I just ran into this... I've been trawling through the source of librdkafka, and this is what I've found:
I think the issue is that topics are only ever strings.

I don't think this branch (ref)

  } else {
    // First parameter is a topic OBJECT
    Topic* topic = ObjectWrap::Unwrap<Topic>(info[0].As<v8::Object>());


    // Unwrap it and turn it into an RdKafka::Topic*
    Baton topic_baton = topic->toRDKafkaTopic(producer);

is ever reached, because if (info[0]->IsString()) { will always evaluate to true, since the topic is enforced to be a string!

So the string path never calls (ref):

Baton Topic::toRDKafkaTopic(Connection* handle) {
  if (m_config) {
    return handle->CreateTopic(m_topic_name, m_config);

so it also never reaches (ref):

topic = RdKafka::Topic::create(m_client, topic_name, conf, errstr);

which would have called (ref):

RdKafka::Topic *RdKafka::Topic::create (Handle *base,
					const std::string &topic_str,
					Conf *conf,
					std::string &errstr) {

which ultimately calls
rd_kafka_topic_conf_set_partitioner_cb
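To make the argument concrete, here is a toy C++ model of the call chain as traced above: when the first argument is a plain string, nothing that could carry a partitioner is ever attached, so a custom partitioner cannot fire. TopicObject, TopicArg, PartitionerFn and custom_partition are hypothetical names; this models the reasoning only and is not node-rdkafka's actual code.

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <variant>

// Hypothetical stand-ins; these are NOT node-rdkafka's or librdkafka's real types.
using PartitionerFn = std::function<int(const std::string& key, int partition_count)>;

struct TopicObject {
  std::string name;
  std::optional<PartitionerFn> partitioner;  // would live in the topic's own config
};

// The first argument to produce() is either a plain topic name or a topic object.
using TopicArg = std::variant<std::string, TopicObject>;

// Returns the custom partition if a custom partitioner is reachable on this path,
// or std::nullopt if the default partitioning would apply instead.
std::optional<int> custom_partition(const TopicArg& topic, const std::string& key,
                                    int partition_count) {
  assert(partition_count > 0);
  if (std::holds_alternative<std::string>(topic)) {
    // String branch (the one always taken, per the analysis above): only the name
    // is available, so no per-topic config, and hence no partitioner, is attached.
    return std::nullopt;
  }
  // Object branch: toRDKafkaTopic -> CreateTopic(name, config) -> Topic::create(..., conf, ...),
  // which is where the partitioner callback would have been registered.
  const TopicObject& obj = std::get<TopicObject>(topic);
  if (!obj.partitioner) {
    return std::nullopt;
  }
  return (*obj.partitioner)(key, partition_count);
}
```

In this model, the only way to reach a custom partitioner is the object branch; if every produce call arrives with a string topic, that branch is dead code, which matches the symptom reported throughout this thread.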

@alexander-alvarez
Contributor

@webmakersteve does this make sense to you?

How do you suggest remediating this?

@stale

stale bot commented Jan 13, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

@stale stale bot added the stale Stale issues label Jan 13, 2019
@sfrooster
Author

Is there an answer here?

@stale stale bot removed the stale Stale issues label Jan 14, 2019
@stale

stale bot commented Apr 14, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

@stale stale bot added the stale Stale issues label Apr 14, 2019
@stale stale bot closed this as completed Apr 21, 2019
@mvataleu

Are there any updates? For me, the partitioner_cb is not being triggered at all. ;(

@jondeandres

Any update on this? We are having the same issue trying to use partitioner_cb.

@JForsaken

JForsaken commented Dec 5, 2024

Encountering the same issue here as well: partitioner_cb is never called. CC @GaryWilber @serj026 (tagging you since you look like you're an active maintainer 🥹)

@sfrooster
Author

sfrooster commented Dec 23, 2024

I happened to be looking at something else on GitHub and had my attention pulled to this issue I opened 6.5 years ago. I started reacquainting myself with it and, for those who find themselves here, I believe, but don't guarantee, that the current state of this is as follows:

  • the idea behind partitioner_cb is that a developer could write a custom partitioning function (a partitioning function is a stateless, producer-side function associated with a topic; it takes the key of a given message and returns a valid partition number for that topic, and this key-to-partition mapping must be consistent for a given number of partitions) and pass it as part of the configuration of the producer(s) that publish to that topic. When told to publish a message to the topic, the producer would then call the partitioning function with the message key, receive back the partition to publish to, publish the message to that partition, and finally call partitioner_cb with the indicated partition, which partitioner_cb could then use for any number of things, not least recording it in an audit log
  • however, in this driver/client, while you can configure a custom partitioning function that will be used, partitioner_cb will never be called, so you'll never be told which partition the message was published to
  • in fact, this appears to be largely the case across librdkafka wrapper clients generally
  • I didn't dive too deeply into this, but it seems to be a problem with reliably propagating the indicated partition (computed when the custom function executes down in the C++ internals) back up to the wrapper level; one thing that can hamper this is the wrapper level being unable to tell which of several possible C++ threads the partition is coming from
  • you can follow much of this here, in an open issue on Confluent's own Go driver
  • there are two primary recommended "fixes":
    • fork librdkafka, or submit a PR, that makes your custom partitioning function a standard, built-in option (it appears the difficulty with allowing custom functions is auto-determining the C++ --> wrapper mapping, which isn't an issue if the function is built in; at the very least, the partition can then be computed in the wrapper, set on the message, and passed in the call to partitioner_cb)
    • take librdkafka as it is, write the partitioning function in your own code at the wrapper level, manually set the partition on the message, and then use the indicated/computed partition, ignoring partitioner_cb completely. There's a little more work because you have to track the number of partitions yourself etc., but it's essentially the same work as the first option
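The second option can be sketched as a small layer that owns both the key-to-partition mapping and the per-topic partition counts, so the partition is computed before producing and partitioner_cb is never involved. The names PartitionRouter, set_partition_count, partition_for and crc32_ieee are hypothetical. The mapping uses CRC-32 of the key modulo the partition count, which, as far as I know, is also what librdkafka's built-in "consistent" partitioner does; verify that against the librdkafka source before relying on it to keep partitions aligned with other producers.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

// Standard CRC-32 (IEEE 802.3, reflected polynomial 0xEDB88320), bitwise version.
uint32_t crc32_ieee(const std::string& data) {
  uint32_t crc = 0xFFFFFFFFu;
  for (unsigned char byte : data) {
    crc ^= byte;
    for (int bit = 0; bit < 8; ++bit) {
      // Shift right; if the dropped bit was 1, fold in the polynomial.
      crc = (crc >> 1) ^ (0xEDB88320u & (0u - (crc & 1u)));
    }
  }
  return crc ^ 0xFFFFFFFFu;
}

// Hypothetical wrapper-level router: the application tracks partition counts
// itself and computes the partition before producing.
class PartitionRouter {
 public:
  // Call at startup and again whenever topic metadata shows a new partition count.
  void set_partition_count(const std::string& topic, int32_t count) {
    assert(count > 0);
    counts_[topic] = count;
  }

  // Same key + same partition count -> same partition (stateless mapping).
  int32_t partition_for(const std::string& topic, const std::string& key) const {
    auto it = counts_.find(topic);
    if (it == counts_.end()) {
      throw std::out_of_range("unknown topic: " + topic);
    }
    return static_cast<int32_t>(crc32_ieee(key) % static_cast<uint32_t>(it->second));
  }

 private:
  std::map<std::string, int32_t> counts_;
};
```

The returned value is the partition to pass explicitly when producing. Note that with any modulo-based scheme, changing a topic's partition count changes where existing keys map, so keyed ordering guarantees only hold while the count is stable.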

Bottom line: I don't see this "issue" being resolved for you/us soon, or ever, and suggest you evaluate your particular needs and decide which of the three options above (fork, PR, or an additional layer/abstraction) serves you best.

As I said, I did a cursory dive in arriving at the above so please do correct me where I'm wrong.

cc: @edenhill as you are, to my knowledge, best positioned to correct my mistakes

7 participants