diff --git a/CONFIGURATION.md b/CONFIGURATION.md
index 8fe84075fe..3029e48abb 100644
--- a/CONFIGURATION.md
+++ b/CONFIGURATION.md
@@ -145,7 +145,7 @@ message.timeout.ms | P | 0 .. 2147483647 | 300000
delivery.timeout.ms | P | 0 .. 2147483647 | 300000 | high | Alias for `message.timeout.ms`: Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time librdkafka may use to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. The message timeout is automatically adjusted to `transaction.timeout.ms` if `transactional.id` is configured.
*Type: integer*
queuing.strategy | P | fifo, lifo | fifo | low | **EXPERIMENTAL**: subject to change or removal. **DEPRECATED** Producer queuing strategy. FIFO preserves produce ordering, while LIFO prioritizes new messages.
*Type: enum value*
produce.offset.report | P | true, false | false | low | **DEPRECATED** No longer used.
*Type: boolean*
-partitioner | P | | consistent_random | high | Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).
*Type: string*
+partitioner | P | | consistent_random | high | Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.), `fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), `fnv1a_random` - FNV-1a hash of key (NULL keys are randomly partitioned).
*Type: string*
partitioner_cb | P | | | low | Custom partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb())
*Type: see dedicated API*
msg_order_cmp | P | | | low | **EXPERIMENTAL**: subject to change or removal. **DEPRECATED** Message queue ordering comparator (set with rd_kafka_topic_conf_set_msg_order_cmp()). Also see `queuing.strategy`.
*Type: see dedicated API*
opaque | * | | | low | Application opaque (set with rd_kafka_topic_conf_set_opaque())
*Type: see dedicated API*
diff --git a/LICENSE.fnv1a b/LICENSE.fnv1a
new file mode 100644
index 0000000000..a8c4f87515
--- /dev/null
+++ b/LICENSE.fnv1a
@@ -0,0 +1,18 @@
+parts of src/rdfnv1a.c: http://www.isthe.com/chongo/src/fnv/hash_32a.c
+
+
+Please do not copyright this code. This code is in the public domain.
+
+LANDON CURT NOLL DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
+INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO
+EVENT SHALL LANDON CURT NOLL BE LIABLE FOR ANY SPECIAL, INDIRECT OR
+CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF
+USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
+OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+PERFORMANCE OF THIS SOFTWARE.
+
+By:
+ chongo /\oo/\
+ http://www.isthe.com/chongo/
+
+Share and Enjoy! :-)
diff --git a/LICENSES.txt b/LICENSES.txt
index fff8e96b7b..6af55f25d1 100644
--- a/LICENSES.txt
+++ b/LICENSES.txt
@@ -59,6 +59,28 @@ LICENSE.crc32c
*/
+LICENSE.fnv1a
+--------------------------------------------------------------
+parts of src/rdfnv1a.c: http://www.isthe.com/chongo/src/fnv/hash_32a.c
+
+
+Please do not copyright this code. This code is in the public domain.
+
+LANDON CURT NOLL DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
+INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO
+EVENT SHALL LANDON CURT NOLL BE LIABLE FOR ANY SPECIAL, INDIRECT OR
+CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF
+USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
+OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+PERFORMANCE OF THIS SOFTWARE.
+
+By:
+ chongo /\oo/\
+ http://www.isthe.com/chongo/
+
+Share and Enjoy! :-)
+
+
LICENSE.hdrhistogram
--------------------------------------------------------------
This license covers src/rdhdrhistogram.c which is a C port of
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 0dc7fd438a..4b6f8e2244 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -7,6 +7,7 @@ set(
rdavl.c
rdbuf.c
rdcrc32.c
+ rdfnv1a.c
rdkafka.c
rdkafka_assignor.c
rdkafka_broker.c
diff --git a/src/Makefile b/src/Makefile
index 793cccefdc..d76079b6a6 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -39,7 +39,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_partition.c rdkafka_subscription.c \
rdkafka_assignor.c rdkafka_range_assignor.c \
rdkafka_roundrobin_assignor.c rdkafka_feature.c \
- rdcrc32.c crc32c.c rdmurmur2.c rdaddr.c rdrand.c rdlist.c \
+ rdcrc32.c crc32c.c rdmurmur2.c rdfnv1a.c rdaddr.c rdrand.c rdlist.c \
tinycthread.c tinycthread_extra.c \
rdlog.c rdstring.c rdkafka_event.c rdkafka_metadata.c \
rdregex.c rdports.c rdkafka_metadata_cache.c rdavl.c \
diff --git a/src/rdfnv1a.c b/src/rdfnv1a.c
new file mode 100644
index 0000000000..34feffae88
--- /dev/null
+++ b/src/rdfnv1a.c
@@ -0,0 +1,112 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2012-2020, Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include "rd.h"
+#include "rdunittest.h"
+#include "rdfnv1a.h"
+
+
+/* FNV-1a by Glenn Fowler, Landon Curt Noll, and Kiem-Phong Vo
+ *
+ * Based on http://www.isthe.com/chongo/src/fnv/hash_32a.c
+ * with librdkafka modifications to match the Sarama default Producer implementation,
+ * as seen here: https://github.com/Shopify/sarama/blob/master/partitioner.go#L203
+ * Note that this implementation is only compatible with Sarama's default
+ * NewHashPartitioner and not NewReferenceHashPartitioner.
+ */
+uint32_t rd_fnv1a (const void *key, size_t len) {
+ const uint32_t prime = 0x01000193; // 16777619
+ const uint32_t offset = 0x811C9DC5; // 2166136261
+ size_t i;
+ int32_t h = offset;
+
+ const unsigned char *data = (const unsigned char *)key;
+
+ for (i = 0; i < len; i++) {
+ h ^= data[i];
+ h *= prime;
+ }
+
+ /* Take absolute value to match the Sarama NewHashPartitioner implementation */
+ if (h < 0) {
+ h = -h;
+ }
+
+ return (uint32_t)h;
+}
+
+
+/**
+ * @brief Unittest for rd_fnv1a()
+ */
+int unittest_fnv1a (void) {
+ const char *short_unaligned = "1234";
+ const char *unaligned = "PreAmbleWillBeRemoved,ThePrePartThatIs";
+ const char *keysToTest[] = {
+ "kafka",
+ "giberish123456789",
+ short_unaligned,
+ short_unaligned+1,
+ short_unaligned+2,
+ short_unaligned+3,
+ unaligned,
+ unaligned+1,
+ unaligned+2,
+ unaligned+3,
+ "",
+ NULL,
+ };
+
+ // Acquired via https://play.golang.org/p/vWIhw3zJINA
+ const int32_t golang_hashfnv_results[] = {
+ 0xd33c4e1, // kafka
+ 0x77a58295, // giberish123456789
+ 0x23bdd03, // short_unaligned
+ 0x2dea3cd2, // short_unaligned+1
+ 0x740fa83e, // short_unaligned+2
+ 0x310ca263, // short_unaligned+3
+ 0x65cbd69c, // unaligned
+ 0x6e49c79a, // unaligned+1
+ 0x69eed356, // unaligned+2
+ 0x6abcc023, // unaligned+3
+ 0x7ee3623b, // ""
+ 0x7ee3623b, // NULL
+ };
+
+ size_t i;
+ for (i = 0; i < RD_ARRAYSIZE(keysToTest); i++) {
+ uint32_t h = rd_fnv1a(keysToTest[i],
+ keysToTest[i] ?
+ strlen(keysToTest[i]) : 0);
+ RD_UT_ASSERT((int32_t)h == golang_hashfnv_results[i],
+ "Calculated FNV-1a hash 0x%x for \"%s\", "
+ "expected 0x%x",
+ h, keysToTest[i], golang_hashfnv_results[i]);
+ }
+ RD_UT_PASS();
+}
diff --git a/src/rdfnv1a.h b/src/rdfnv1a.h
new file mode 100644
index 0000000000..bd6e06ddc2
--- /dev/null
+++ b/src/rdfnv1a.h
@@ -0,0 +1,35 @@
+/*
+ * librdkafka - Apache Kafka C library
+ *
+ * Copyright (c) 2020 Magnus Edenhill
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef __RDFNV1A___H__
+#define __RDFNV1A___H__
+
+uint32_t rd_fnv1a (const void *key, size_t len);
+int unittest_fnv1a (void);
+
+#endif // __RDFNV1A___H__
diff --git a/src/rdkafka.h b/src/rdkafka.h
index 1144e75912..250c231578 100644
--- a/src/rdkafka.h
+++ b/src/rdkafka.h
@@ -2528,6 +2528,49 @@ int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
void *msg_opaque);
+/**
+ * @brief FNV-1a partitioner.
+ *
+ * Uses consistent hashing to map identical keys onto identical partitions
+ * using FNV-1a hashing.
+ *
+ * The \p rkt_opaque argument is the opaque set by
+ * rd_kafka_topic_conf_set_opaque().
+ * The \p msg_opaque argument is the per-message opaque
+ * passed to produce().
+ *
+ * @returns a partition between 0 and \p partition_cnt - 1.
+ */
+RD_EXPORT
+int32_t rd_kafka_msg_partitioner_fnv1a (const rd_kafka_topic_t *rkt,
+ const void *key, size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque);
+
+
+/**
+ * @brief Consistent-Random FNV-1a partitioner.
+ *
+ * Uses consistent hashing to map identical keys onto identical partitions
+ * using FNV-1a hashing.
+ * Messages without keys will be assigned via the random partitioner.
+ *
+ * The \p rkt_opaque argument is the opaque set by
+ * rd_kafka_topic_conf_set_opaque().
+ * The \p msg_opaque argument is the per-message opaque
+ * passed to produce().
+ *
+ * @returns a partition between 0 and \p partition_cnt - 1.
+ */
+RD_EXPORT
+int32_t rd_kafka_msg_partitioner_fnv1a_random (const rd_kafka_topic_t *rkt,
+ const void *key, size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque);
+
+
/**@}*/
diff --git a/src/rdkafka_conf.c b/src/rdkafka_conf.c
index aa335be8a4..a6cb1246f9 100644
--- a/src/rdkafka_conf.c
+++ b/src/rdkafka_conf.c
@@ -214,7 +214,9 @@ rd_kafka_conf_validate_partitioner (const struct rd_kafka_property *prop,
!strcmp(val, "consistent") ||
!strcmp(val, "consistent_random") ||
!strcmp(val, "murmur2") ||
- !strcmp(val, "murmur2_random");
+ !strcmp(val, "murmur2_random") ||
+ !strcmp(val, "fnv1a") ||
+ !strcmp(val, "fnv1a_random");
}
@@ -1288,7 +1290,11 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"`consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), "
"`consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), "
"`murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), "
- "`murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).",
+ "`murmur2_random` - Java Producer compatible Murmur2 hash of key "
+ "(NULL keys are randomly partitioned. This is functionally equivalent "
+ "to the default partitioner in the Java Producer.), "
+ "`fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), "
+ "`fnv1a_random` - FNV-1a hash of key (NULL keys are randomly partitioned).",
.sdef = "consistent_random",
.validate = rd_kafka_conf_validate_partitioner },
{ _RK_TOPIC|_RK_PRODUCER, "partitioner_cb", _RK_C_PTR,
diff --git a/src/rdkafka_msg.c b/src/rdkafka_msg.c
index 832267efbb..48a62ac059 100644
--- a/src/rdkafka_msg.c
+++ b/src/rdkafka_msg.c
@@ -36,6 +36,7 @@
#include "rdkafka_idempotence.h"
#include "rdkafka_txnmgr.h"
#include "rdcrc32.h"
+#include "rdfnv1a.h"
#include "rdmurmur2.h"
#include "rdrand.h"
#include "rdtime.h"
@@ -883,6 +884,30 @@ int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt;
}
+int32_t rd_kafka_msg_partitioner_fnv1a (const rd_kafka_topic_t *rkt,
+ const void *key, size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ return rd_fnv1a(key, keylen) % partition_cnt;
+}
+
+int32_t rd_kafka_msg_partitioner_fnv1a_random (const rd_kafka_topic_t *rkt,
+ const void *key, size_t keylen,
+ int32_t partition_cnt,
+ void *rkt_opaque,
+ void *msg_opaque) {
+ if (!key)
+ return rd_kafka_msg_partitioner_random(rkt,
+ key,
+ keylen,
+ partition_cnt,
+ rkt_opaque,
+ msg_opaque);
+ else
+ return rd_fnv1a(key, keylen) % partition_cnt;
+}
+
/**
* @brief Assigns a message to a topic partition using a partitioner.
diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c
index 51ceaeb7f1..40c7cc1b39 100644
--- a/src/rdkafka_topic.c
+++ b/src/rdkafka_topic.c
@@ -301,6 +301,10 @@ shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
(void *)rd_kafka_msg_partitioner_murmur2 },
{ "murmur2_random",
(void *)rd_kafka_msg_partitioner_murmur2_random },
+ { "fnv1a",
+ (void *)rd_kafka_msg_partitioner_fnv1a },
+ { "fnv1a_random",
+ (void *)rd_kafka_msg_partitioner_fnv1a_random },
{ NULL }
};
int i;
diff --git a/src/rdunittest.c b/src/rdunittest.c
index 3f8eb609ca..44fcb00a45 100644
--- a/src/rdunittest.c
+++ b/src/rdunittest.c
@@ -37,6 +37,7 @@
#include "rdbuf.h"
#include "crc32c.h"
#include "rdmurmur2.h"
+#include "rdfnv1a.h"
#if WITH_HDRHISTOGRAM
#include "rdhdrhistogram.h"
#endif
@@ -444,12 +445,13 @@ int rd_unittest (void) {
const char *name;
int (*call) (void);
} unittests[] = {
- { "sysqueue", unittest_sysqueue },
- { "rdbuf", unittest_rdbuf },
- { "rdvarint", unittest_rdvarint },
- { "crc32c", unittest_crc32c },
- { "msg", unittest_msg },
+ { "sysqueue", unittest_sysqueue },
+ { "rdbuf", unittest_rdbuf },
+ { "rdvarint", unittest_rdvarint },
+ { "crc32c", unittest_crc32c },
+ { "msg", unittest_msg },
{ "murmurhash", unittest_murmur2 },
+ { "fnv1a", unittest_fnv1a },
#if WITH_HDRHISTOGRAM
{ "rdhdrhistogram", unittest_rdhdrhistogram },
#endif
diff --git a/tests/0048-partitioner.c b/tests/0048-partitioner.c
index 2f05cb1b04..fcc61cd9c6 100644
--- a/tests/0048-partitioner.c
+++ b/tests/0048-partitioner.c
@@ -268,6 +268,21 @@ static void do_test_partitioners (void) {
0x4f7703da % part_cnt,
0x5ec19395 % part_cnt
} },
+ { "fnv1a", {
+ /* .. using https://play.golang.org/p/hRkA4xtYyJ6 */
+ 0x7ee3623b % part_cnt,
+ 0x7ee3623b % part_cnt,
+ 0x27e6f469 % part_cnt,
+ 0x155e3e5f % part_cnt,
+ 0x17b1e27a % part_cnt
+ } },
+ { "fnv1a_random", {
+ -1,
+ 0x7ee3623b % part_cnt,
+ 0x27e6f469 % part_cnt,
+ 0x155e3e5f % part_cnt,
+ 0x17b1e27a % part_cnt
+ } },
{ NULL }
};
int pi;
diff --git a/win32/librdkafka.vcxproj b/win32/librdkafka.vcxproj
index 1add307a6c..e1ce5a73b3 100644
--- a/win32/librdkafka.vcxproj
+++ b/win32/librdkafka.vcxproj
@@ -168,6 +168,7 @@
+