Support partitioner callback in producer #16

Open
0x1997 opened this issue Nov 22, 2016 · 18 comments

@0x1997

0x1997 commented Nov 22, 2016

No description provided.

@edenhill
Contributor

Out of curiosity, can you share your custom partitioner?
It might be generic enough to be integrated.

@0x1997
Author

0x1997 commented Dec 1, 2016

@edenhill It's application specific. Basically custom_hash(static_cast<State*>(msg_opaque)) % partition_cnt in C++.

@edenhill
Contributor

Okay, what you'll need to do in the meantime is get the partition count for your topic(s) with GetMetadata(), run your partitioner yourself, and set the partition on the Message (its TopicPartition.Partition field) before calling Produce().
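
A minimal sketch of that workaround in Go, using only the standard library so it runs on its own. The function name `choosePartition`, the CRC32 stand-in hash, and the partition count of 6 are all assumptions for illustration; an application would substitute its own hash, read the real partition count from the topic metadata, and store the result in the message's partition field before calling Produce().

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// choosePartition maps a key to a partition in [0, partitionCnt).
// CRC32 is only a stand-in for an application-specific hash; it is not
// claimed to match any partitioner built into librdkafka.
func choosePartition(key []byte, partitionCnt int32) int32 {
	if partitionCnt <= 0 {
		return -1 // no partitions known; the caller decides what to do
	}
	return int32(crc32.ChecksumIEEE(key) % uint32(partitionCnt))
}

func main() {
	// Assumed value for illustration. In a real producer this would be
	// read from the topic metadata before producing.
	const partitionCnt = 6

	for _, key := range []string{"alice", "bob", "alice"} {
		p := choosePartition([]byte(key), partitionCnt)
		// In a real producer, p would be assigned to the message's
		// partition field here, instead of being printed.
		fmt.Printf("%s -> partition %d\n", key, p)
	}
}
```

The same key always yields the same partition as long as the partition count does not change, so the count should be refreshed if partitions are added to the topic.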

@edenhill
Contributor

edenhill commented Dec 11, 2016

The problem with implementing partitioner_cb support in high-level language bindings is that the partitioner callback may be called from an internal librdkafka thread, and this isn't trivial to handle in cgo, CPython, et al.
This should be fixed in librdkafka rather than in the bindings, but that isn't trivial either, which is why this functionality is currently missing from our bindings.

But here's a dumb idea:
what if you, as a Go app developer, implemented the partitioner in C (cgo) and the Go client provided an API to set the C partitioner_cb.
You wouldn't be allowed to call any Go methods from this callback, but since partitioners are pretty minimal by design this might be okay.

It would look something like this:

/*
#include <librdkafka/rdkafka.h>

static int32_t my_partitioner (rd_kafka_topic_t *rkt, ..) {
    ..some custom hasher goes here..
    return hash % partition_cnt;
}
*/
import "C"

...

conf := ConfigMap{..., "default.topic.config": &ConfigMap{"partitioner_cb": C.my_partitioner}}
p, err := NewProducer(conf)
...

I know it is ugly, but would it be a reasonable workaround for you until proper Go partitioners are supported?

@tchap

tchap commented Aug 16, 2017

I wouldn't even need an entirely custom partitioner, I would just need the same key to always go to the same partition. Perhaps it would be possible to bake in something like that, for example by adding another constant besides PartitionAny... ?

@edenhill
Contributor

@tchap the default partitioner is Consistent-Random, which maps the same key to the same partition, so you should be fine.
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka.h#L1606

@edenhill
Contributor

edenhill commented Jan 3, 2018

The next version of librdkafka (the underlying Kafka client) will expose the builtin partitioners as configuration properties, allowing you to change to an alternative builtin partitioner, such as the Java compatible murmur2_random partitioner.

Search for 'partitioner' here: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

Custom partitioners are not yet exposed in the Go client though.
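
For readers who want to see what a Java-compatible murmur2 partitioner computes, here is a hedged sketch in plain Go. It is written from memory of the 32-bit MurmurHash2 variant used by the Java client's default partitioner (seed 0x9747b28c, result masked to a non-negative value, then taken modulo the partition count). The names `murmur2` and `javaCompatPartition` are hypothetical, and the code should be checked against the Java client's published test vectors before being relied on.

```go
package main

import "fmt"

// murmur2 is a sketch of the 32-bit MurmurHash2 variant believed to be
// used by the Java client (assumption: seed 0x9747b28c, little-endian
// 4-byte blocks).
func murmur2(data []byte) uint32 {
	const (
		seed uint32 = 0x9747b28c
		m    uint32 = 0x5bd1e995
		r           = 24
	)
	h := seed ^ uint32(len(data))

	// Mix in the input four bytes at a time.
	for i := 0; i+4 <= len(data); i += 4 {
		k := uint32(data[i]) | uint32(data[i+1])<<8 |
			uint32(data[i+2])<<16 | uint32(data[i+3])<<24
		k *= m
		k ^= k >> r
		k *= m
		h *= m
		h ^= k
	}

	// Mix in the remaining 0 to 3 bytes.
	tail := data[len(data)&^3:]
	switch len(tail) {
	case 3:
		h ^= uint32(tail[2]) << 16
		fallthrough
	case 2:
		h ^= uint32(tail[1]) << 8
		fallthrough
	case 1:
		h ^= uint32(tail[0])
		h *= m
	}

	// Final avalanche.
	h ^= h >> 13
	h *= m
	h ^= h >> 15
	return h
}

// javaCompatPartition clears the sign bit and reduces modulo the
// partition count. partitionCnt must be greater than zero.
func javaCompatPartition(key []byte, partitionCnt int32) int32 {
	return int32(murmur2(key)&0x7fffffff) % partitionCnt
}

func main() {
	for _, key := range []string{"order-1", "order-2", "order-1"} {
		fmt.Printf("%s -> partition %d\n", key, javaCompatPartition([]byte(key), 8))
	}
}
```

When the builtin partitioner is available through configuration, selecting it there is preferable to reimplementing the hash in application code.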

@billygout
Contributor

billygout commented Mar 12, 2018

@edenhill

Custom partitioners are not yet exposed in the Go client though.

I have a use case that really needs this. I have been trying to find any kind of workaround, and it does not seem to be easy.
First, I tried the workaround mentioned here (#16 (comment)), but I get the error:

Failed to create producer: Invalid value type unsafe.Pointer for key partitioner_cb (expected string,bool,int,ConfigMap)

Then I cast the pointer to a uintptr and then to an int, which led to a runtime error saying:

Failed to create producer: Property "partitioner_cb" must be set through dedicated ..set..() function

At this point I stopped trying. It looks like it wants me to call rd_kafka_topic_conf_set_partitioner_cb() through some more cgo magic, but I gave up because I didn't know what to pass as the first parameter to that function, rd_kafka_topic_conf_t *topic_conf, and I don't know how to obtain the rd_kafka_topic_conf_t in the context of my Go program.

Do you have any tips for completing this workaround?
Thanks!

@edenhill
Contributor

Due to the generic way configuration is passed from Go to C it is a bit tricky to add a special case for set_partitioner_cb(), so for the sake of proof-of-concepting I suggest you insert a call to ..set_partitioner_cb() with a hardcoded C-function callback here, right before rd_kafka_conf_set_default_topic_conf():
https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/config.go#L149

Do note that this callback may be called from internal librdkafka threads and it is not clear to me how to safely trigger a Go call from such a thread.

@billygout
Contributor

billygout commented Mar 12, 2018

Thank you, @edenhill! What I actually need is not a generic custom partitioner. Rather, I'm looking for a way to hash on something other than the Kafka key, since I'm using the key for other purposes. It looks like hashing on the msg_opaque would work for me, given the partitioner callback parameters:

const rd_kafka_topic_t *rkt,
const void *keydata,
size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque

as long as I can set the msg_opaque at the Go level, and it looks like I can (https://github.com/confluentinc/confluent-kafka-go/blob/master/kafka/message.go#L79).

Would you accept a PR to add a new builtin partitioner to librdkafka, like this:
consistent_opaque - CRC32 hash of msg_opaque (Empty and NULL msg_opaque are mapped to single partition)? That said, I'm not sure whether the CRC32 should be applied to the pointer address msg_opaque or to the whole data behind the pointer. Since there is no corresponding length argument, msg_opaque would have to point to a C string so the CRC32 knows how far to read...

@edenhill
Contributor

I strongly advise you to stick to the existing semantics of keys in Kafka, they are used for partitioning and compaction.
If you need additional data with your message you can either create a richer message payload (using for example avro and schema-registry), or use message headers to "tag along" arbitrary data to your liking.

We will not accept a PR that does partitioning on something other than the key, sorry.

Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.

@billygout
Contributor

billygout commented Mar 12, 2018

Also, the msg_opaque is used by the Go client internally to map C messages back to Go messages.

Ah, that would certainly kill the idea.

@billygout
Contributor

billygout commented Mar 12, 2018

I strongly advise you to stick to the existing semantics of keys in Kafka, they are used for partitioning and compaction.

That ship sailed a while ago at my company. In the future, when we upgrade to Kafka 0.11, we'll probably put this metadata in the 0.11+ headers. Additionally, we have no use for compaction and have it turned off for our use case.

@billygout
Contributor

billygout commented Mar 12, 2018

or use message headers to "tag along" arbitrary data to your liking.

This is a possibility too. I will revisit that. It was my first option, but I ran into social problems :)

@Manicben

We would love to have this available in the producer, as we're moving from Sarama to confluent-kafka-go, but we still need to support Sarama's default partitioner, which uses the 32-bit FNV-1a hashing algorithm (part of hash/fnv in Go).

For now we will follow @edenhill's advice (i.e. get topic metadata, run a custom Go partitioner prior to Produce and set Message.Partition), but it would still be nice to have custom partitioners supported in some form in the Producer API (either Go or C). Although, I may just try my hand at adding the FNV-1a algorithm to librdkafka...
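
A minimal sketch of such a Go-side partitioner, assuming Sarama's default behaviour is what I remember it to be: hash the key with 32-bit FNV-1a, convert to a signed 32-bit value, take the remainder by the partition count, and negate a negative result. The function name `fnv1aPartition` is hypothetical, and the details should be compared against the Sarama source before relying on them for compatibility.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// fnv1aPartition maps a key to a partition using 32-bit FNV-1a.
// Assumption: this mirrors Sarama's default hash partitioner for
// non-nil keys. numPartitions must be greater than zero.
func fnv1aPartition(key []byte, numPartitions int32) int32 {
	h := fnv.New32a()
	h.Write(key) // Write on a hash.Hash never returns an error

	// The signed conversion can produce a negative remainder, so flip it.
	p := int32(h.Sum32()) % numPartitions
	if p < 0 {
		p = -p
	}
	return p
}

func main() {
	for _, key := range []string{"customer-1", "customer-2", "customer-1"} {
		fmt.Printf("%s -> partition %d\n", key, fnv1aPartition([]byte(key), 12))
	}
}
```

The returned value would then be set as the message's partition before producing, which is the workaround described above.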

@edenhill
Contributor

Yeah, the simplest alternative is to add it as a builtin partitioner to librdkafka.
Looks like the fnv1a code is very simple, so it should be fairly straightforward.

Find all occurrences of murmur2_random in the librdkafka/src code and add fnv1a_random counterparts.
https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_msg.c#L870

@maeglindeveloper

Agree with @Manicben :) It would be great to have that included in the producer, rather than having to run a custom Go partitioner prior to Produce.
Is there a chance we will see that feature in the future?
Btw @edenhill thanks for all the good work 👍

@eiSouthBoy

eiSouthBoy commented Nov 13, 2023

@edenhill Hello. My idea is to send messages with key="a" to partition 0, messages with key="b" to partition 1, and so on for the other keys.
When I debugged it, it did not work. Here is my partitioner callback code and the produce call:
`static int32_t partitioner_cb(const rd_kafka_topic_t *rkt, const void *keydata,
                              size_t keylen, int32_t partition_cnt,
                              void *rkt_opaque, void *msg_opaque)
{
    /* this is a simple example */
    int32_t partition = 0;
    if (keylen <= 0)
        return partition;
    if (strncmp(keydata, "a", 1) == 0)
        partition = 0;
    else if (strncmp(keydata, "b", 1) == 0)
        partition = 1;
    else if (strncmp(keydata, "c", 1) == 0)
        partition = 2;
    else if (strncmp(keydata, "d", 1) == 0)
        partition = 3;

    return partition;
}

int err = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                           (void *)buf, len, (void *)key, strlen(key), NULL);
`
could you help me? thanks
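
For comparison, the same first-byte mapping can be computed before producing, following the earlier advice in this thread to pick the partition yourself instead of relying on a callback. The sketch below is in Go with a hypothetical function name `partitionForKey`. It also adds a bounds check against the partition count, which the C snippet above does not have; that check is an addition of this sketch, not a diagnosis of the reported problem.

```go
package main

import "fmt"

// partitionForKey maps keys starting with 'a'..'d' to partitions 0..3 and
// everything else to partition 0. It never returns a partition number that
// is outside [0, partitionCnt).
func partitionForKey(key []byte, partitionCnt int32) int32 {
	if len(key) == 0 || partitionCnt <= 0 {
		return 0
	}
	p := int32(0)
	if key[0] >= 'a' && key[0] <= 'd' {
		p = int32(key[0] - 'a')
	}
	if p >= partitionCnt {
		p = 0 // fall back when the topic has fewer partitions than expected
	}
	return p
}

func main() {
	// A partition count of 4 is assumed for illustration.
	for _, key := range []string{"a", "b", "c", "d", "z"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionForKey([]byte(key), 4))
	}
}
```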

rayokota added a commit that referenced this issue Aug 27, 2024
* Add CSFLE tests with pre-canned data

* Minor cleanup

* Fix tests