From 6fe229a369845c8ef7465f407c1ee3ff5434554c Mon Sep 17 00:00:00 2001 From: Manicben Date: Mon, 9 Mar 2020 17:01:23 +0000 Subject: [PATCH] partitioner: Add FNV-1a partitioner (#2724) Adds a new partitioner using the FNV-1a hashing algorithm, with some tweaks to match Sarama's default hashing partitioner behaviour. Main use case is for users switching from Sarama to librdkafka (or confluent-kafka-go) and wanting to maintain ordering guarantees. --- CONFIGURATION.md | 2 +- LICENSE.fnv1a | 18 +++++++ LICENSES.txt | 22 ++++++++ src/CMakeLists.txt | 1 + src/Makefile | 2 +- src/rdfnv1a.c | 112 +++++++++++++++++++++++++++++++++++++++ src/rdfnv1a.h | 35 ++++++++++++ src/rdkafka.h | 43 +++++++++++++++ src/rdkafka_conf.c | 10 +++- src/rdkafka_msg.c | 25 +++++++++ src/rdkafka_topic.c | 4 ++ src/rdunittest.c | 12 +++-- tests/0048-partitioner.c | 15 ++++++ win32/librdkafka.vcxproj | 1 + 14 files changed, 293 insertions(+), 9 deletions(-) create mode 100644 LICENSE.fnv1a create mode 100644 src/rdfnv1a.c create mode 100644 src/rdfnv1a.h diff --git a/CONFIGURATION.md b/CONFIGURATION.md index 8fe84075fe..3029e48abb 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -145,7 +145,7 @@ message.timeout.ms | P | 0 .. 2147483647 | 300000 delivery.timeout.ms | P | 0 .. 2147483647 | 300000 | high | Alias for `message.timeout.ms`: Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time librdkafka may use to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. The message timeout is automatically adjusted to `transaction.timeout.ms` if `transactional.id` is configured.
*Type: integer* queuing.strategy | P | fifo, lifo | fifo | low | **EXPERIMENTAL**: subject to change or removal. **DEPRECATED** Producer queuing strategy. FIFO preserves produce ordering, while LIFO prioritizes new messages.
*Type: enum value* produce.offset.report | P | true, false | false | low | **DEPRECATED** No longer used.
*Type: boolean* -partitioner | P | | consistent_random | high | Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).
*Type: string* +partitioner | P | | consistent_random | high | Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.), `fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), `fnv1a_random` - FNV-1a hash of key (NULL keys are randomly partitioned).
*Type: string* partitioner_cb | P | | | low | Custom partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb())
*Type: see dedicated API* msg_order_cmp | P | | | low | **EXPERIMENTAL**: subject to change or removal. **DEPRECATED** Message queue ordering comparator (set with rd_kafka_topic_conf_set_msg_order_cmp()). Also see `queuing.strategy`.
*Type: see dedicated API* opaque | * | | | low | Application opaque (set with rd_kafka_topic_conf_set_opaque())
*Type: see dedicated API* diff --git a/LICENSE.fnv1a b/LICENSE.fnv1a new file mode 100644 index 0000000000..a8c4f87515 --- /dev/null +++ b/LICENSE.fnv1a @@ -0,0 +1,18 @@ +parts of src/rdfnv1a.c: http://www.isthe.com/chongo/src/fnv/hash_32a.c + + +Please do not copyright this code. This code is in the public domain. + +LANDON CURT NOLL DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, +INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO +EVENT SHALL LANDON CURT NOLL BE LIABLE FOR ANY SPECIAL, INDIRECT OR +CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF +USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +PERFORMANCE OF THIS SOFTWARE. + +By: + chongo /\oo/\ + http://www.isthe.com/chongo/ + +Share and Enjoy! :-) diff --git a/LICENSES.txt b/LICENSES.txt index fff8e96b7b..6af55f25d1 100644 --- a/LICENSES.txt +++ b/LICENSES.txt @@ -59,6 +59,28 @@ LICENSE.crc32c */ +LICENSE.fnv1a +-------------------------------------------------------------- +parts of src/rdfnv1a.c: http://www.isthe.com/chongo/src/fnv/hash_32a.c + + +Please do not copyright this code. This code is in the public domain. + +LANDON CURT NOLL DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, +INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO +EVENT SHALL LANDON CURT NOLL BE LIABLE FOR ANY SPECIAL, INDIRECT OR +CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF +USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR +OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +PERFORMANCE OF THIS SOFTWARE. + +By: + chongo /\oo/\ + http://www.isthe.com/chongo/ + +Share and Enjoy! :-) + + LICENSE.hdrhistogram -------------------------------------------------------------- This license covers src/rdhdrhistogram.c which is a C port of diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0dc7fd438a..4b6f8e2244 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -7,6 +7,7 @@ set( rdavl.c rdbuf.c rdcrc32.c + rdfnv1a.c rdkafka.c rdkafka_assignor.c rdkafka_broker.c diff --git a/src/Makefile b/src/Makefile index 793cccefdc..d76079b6a6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -39,7 +39,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \ rdkafka_partition.c rdkafka_subscription.c \ rdkafka_assignor.c rdkafka_range_assignor.c \ rdkafka_roundrobin_assignor.c rdkafka_feature.c \ - rdcrc32.c crc32c.c rdmurmur2.c rdaddr.c rdrand.c rdlist.c \ + rdcrc32.c crc32c.c rdmurmur2.c rdfnv1a.c rdaddr.c rdrand.c rdlist.c \ tinycthread.c tinycthread_extra.c \ rdlog.c rdstring.c rdkafka_event.c rdkafka_metadata.c \ rdregex.c rdports.c rdkafka_metadata_cache.c rdavl.c \ diff --git a/src/rdfnv1a.c b/src/rdfnv1a.c new file mode 100644 index 0000000000..34feffae88 --- /dev/null +++ b/src/rdfnv1a.c @@ -0,0 +1,112 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2012-2020, Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#include "rd.h" +#include "rdunittest.h" +#include "rdfnv1a.h" + + +/* FNV-1a by Glenn Fowler, Landon Curt Noll, and Kiem-Phong Vo + * + * Based on http://www.isthe.com/chongo/src/fnv/hash_32a.c + * with librdkafka modifications to match the Sarama default Producer implementation, + * as seen here: https://github.com/Shopify/sarama/blob/master/partitioner.go#L203 + * Note that this implementation is only compatible with Sarama's default + * NewHashPartitioner and not NewReferenceHashPartitioner. + */ +uint32_t rd_fnv1a (const void *key, size_t len) { + const uint32_t prime = 0x01000193; // 16777619 + const uint32_t offset = 0x811C9DC5; // 2166136261 + size_t i; + int32_t h = offset; + + const unsigned char *data = (const unsigned char *)key; + + for (i = 0; i < len; i++) { + h ^= data[i]; + h *= prime; + } + + /* Take absolute value to match the Sarama NewHashPartitioner implementation */ + if (h < 0) { + h = -h; + } + + return (uint32_t)h; +} + + +/** + * @brief Unittest for rd_fnv1a() + */ +int unittest_fnv1a (void) { + const char *short_unaligned = "1234"; + const char *unaligned = "PreAmbleWillBeRemoved,ThePrePartThatIs"; + const char *keysToTest[] = { + "kafka", + "giberish123456789", + short_unaligned, + short_unaligned+1, + short_unaligned+2, + short_unaligned+3, + unaligned, + unaligned+1, + unaligned+2, + unaligned+3, + "", + NULL, + }; + + // Acquired via https://play.golang.org/p/vWIhw3zJINA + const int32_t golang_hashfnv_results[] = { + 0xd33c4e1, // kafka + 0x77a58295, // giberish123456789 + 0x23bdd03, // short_unaligned + 0x2dea3cd2, // short_unaligned+1 + 0x740fa83e, // short_unaligned+2 + 0x310ca263, // short_unaligned+3 + 0x65cbd69c, // unaligned + 0x6e49c79a, // unaligned+1 + 0x69eed356, // unaligned+2 + 0x6abcc023, // unaligned+3 + 0x7ee3623b, // "" + 0x7ee3623b, // NULL + }; + + size_t i; + for (i = 0; i < RD_ARRAYSIZE(keysToTest); i++) { + uint32_t h = rd_fnv1a(keysToTest[i], + keysToTest[i] ? + strlen(keysToTest[i]) : 0); + RD_UT_ASSERT((int32_t)h == golang_hashfnv_results[i], + "Calculated FNV-1a hash 0x%x for \"%s\", " + "expected 0x%x", + h, keysToTest[i], golang_hashfnv_results[i]); + } + RD_UT_PASS(); +} diff --git a/src/rdfnv1a.h b/src/rdfnv1a.h new file mode 100644 index 0000000000..bd6e06ddc2 --- /dev/null +++ b/src/rdfnv1a.h @@ -0,0 +1,35 @@ +/* + * librdkafka - Apache Kafka C library + * + * Copyright (c) 2020 Magnus Edenhill + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef __RDFNV1A___H__ +#define __RDFNV1A___H__ + +uint32_t rd_fnv1a (const void *key, size_t len); +int unittest_fnv1a (void); + +#endif // __RDFNV1A___H__ diff --git a/src/rdkafka.h b/src/rdkafka.h index 1144e75912..250c231578 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -2528,6 +2528,49 @@ int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt, void *msg_opaque); +/** + * @brief FNV-1a partitioner. + * + * Uses consistent hashing to map identical keys onto identical partitions + * using FNV-1a hashing. + * + * The \p rkt_opaque argument is the opaque set by + * rd_kafka_topic_conf_set_opaque(). + * The \p msg_opaque argument is the per-message opaque + * passed to produce(). + * + * @returns a partition between 0 and \p partition_cnt - 1. + */ +RD_EXPORT +int32_t rd_kafka_msg_partitioner_fnv1a (const rd_kafka_topic_t *rkt, + const void *key, size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, + void *msg_opaque); + + +/** + * @brief Consistent-Random FNV-1a partitioner. + * + * Uses consistent hashing to map identical keys onto identical partitions + * using FNV-1a hashing. + * Messages without keys will be assigned via the random partitioner. + * + * The \p rkt_opaque argument is the opaque set by + * rd_kafka_topic_conf_set_opaque(). + * The \p msg_opaque argument is the per-message opaque + * passed to produce(). + * + * @returns a partition between 0 and \p partition_cnt - 1. + */ +RD_EXPORT +int32_t rd_kafka_msg_partitioner_fnv1a_random (const rd_kafka_topic_t *rkt, + const void *key, size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, + void *msg_opaque); + + /**@}*/ diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c index aa335be8a4..a6cb1246f9 100644 --- a/src/rdkafka_conf.c +++ b/src/rdkafka_conf.c @@ -214,7 +214,9 @@ rd_kafka_conf_validate_partitioner (const struct rd_kafka_property *prop, !strcmp(val, "consistent") || !strcmp(val, "consistent_random") || !strcmp(val, "murmur2") || - !strcmp(val, "murmur2_random"); + !strcmp(val, "murmur2_random") || + !strcmp(val, "fnv1a") || + !strcmp(val, "fnv1a_random"); } @@ -1288,7 +1290,11 @@ static const struct rd_kafka_property rd_kafka_properties[] = { "`consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), " "`consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), " "`murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), " - "`murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).", + "`murmur2_random` - Java Producer compatible Murmur2 hash of key " + "(NULL keys are randomly partitioned. This is functionally equivalent " + "to the default partitioner in the Java Producer.), " + "`fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), " + "`fnv1a_random` - FNV-1a hash of key (NULL keys are randomly partitioned).", .sdef = "consistent_random", .validate = rd_kafka_conf_validate_partitioner }, { _RK_TOPIC|_RK_PRODUCER, "partitioner_cb", _RK_C_PTR, diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c index 832267efbb..48a62ac059 100644 --- a/src/rdkafka_msg.c +++ b/src/rdkafka_msg.c @@ -36,6 +36,7 @@ #include "rdkafka_idempotence.h" #include "rdkafka_txnmgr.h" #include "rdcrc32.h" +#include "rdfnv1a.h" #include "rdmurmur2.h" #include "rdrand.h" #include "rdtime.h" @@ -883,6 +884,30 @@ int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt, return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt; } +int32_t rd_kafka_msg_partitioner_fnv1a (const rd_kafka_topic_t *rkt, + const void *key, size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, + void *msg_opaque) { + return rd_fnv1a(key, keylen) % partition_cnt; +} + +int32_t rd_kafka_msg_partitioner_fnv1a_random (const rd_kafka_topic_t *rkt, + const void *key, size_t keylen, + int32_t partition_cnt, + void *rkt_opaque, + void *msg_opaque) { + if (!key) + return rd_kafka_msg_partitioner_random(rkt, + key, + keylen, + partition_cnt, + rkt_opaque, + msg_opaque); + else + return rd_fnv1a(key, keylen) % partition_cnt; +} + /** * @brief Assigns a message to a topic partition using a partitioner. diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index 51ceaeb7f1..40c7cc1b39 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -301,6 +301,10 @@ shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk, (void *)rd_kafka_msg_partitioner_murmur2 }, { "murmur2_random", (void *)rd_kafka_msg_partitioner_murmur2_random }, + { "fnv1a", + (void *)rd_kafka_msg_partitioner_fnv1a }, + { "fnv1a_random", + (void *)rd_kafka_msg_partitioner_fnv1a_random }, { NULL } }; int i; diff --git a/src/rdunittest.c b/src/rdunittest.c index 3f8eb609ca..44fcb00a45 100644 --- a/src/rdunittest.c +++ b/src/rdunittest.c @@ -37,6 +37,7 @@ #include "rdbuf.h" #include "crc32c.h" #include "rdmurmur2.h" +#include "rdfnv1a.h" #if WITH_HDRHISTOGRAM #include "rdhdrhistogram.h" #endif @@ -444,12 +445,13 @@ int rd_unittest (void) { const char *name; int (*call) (void); } unittests[] = { - { "sysqueue", unittest_sysqueue }, - { "rdbuf", unittest_rdbuf }, - { "rdvarint", unittest_rdvarint }, - { "crc32c", unittest_crc32c }, - { "msg", unittest_msg }, + { "sysqueue", unittest_sysqueue }, + { "rdbuf", unittest_rdbuf }, + { "rdvarint", unittest_rdvarint }, + { "crc32c", unittest_crc32c }, + { "msg", unittest_msg }, { "murmurhash", unittest_murmur2 }, + { "fnv1a", unittest_fnv1a }, #if WITH_HDRHISTOGRAM { "rdhdrhistogram", unittest_rdhdrhistogram }, #endif diff --git a/tests/0048-partitioner.c b/tests/0048-partitioner.c index 2f05cb1b04..fcc61cd9c6 100644 --- a/tests/0048-partitioner.c +++ b/tests/0048-partitioner.c @@ -268,6 +268,21 @@ static void do_test_partitioners (void) { 0x4f7703da % part_cnt, 0x5ec19395 % part_cnt } }, + { "fnv1a", { + /* .. using https://play.golang.org/p/hRkA4xtYyJ6 */ + 0x7ee3623b % part_cnt, + 0x7ee3623b % part_cnt, + 0x27e6f469 % part_cnt, + 0x155e3e5f % part_cnt, + 0x17b1e27a % part_cnt + } }, + { "fnv1a_random", { + -1, + 0x7ee3623b % part_cnt, + 0x27e6f469 % part_cnt, + 0x155e3e5f % part_cnt, + 0x17b1e27a % part_cnt + } }, { NULL } }; int pi; diff --git a/win32/librdkafka.vcxproj b/win32/librdkafka.vcxproj index 1add307a6c..e1ce5a73b3 100644 --- a/win32/librdkafka.vcxproj +++ b/win32/librdkafka.vcxproj @@ -168,6 +168,7 @@ +