Skip to content

Commit

Permalink
partitioner: Add FNV-1a partitioner (confluentinc#2724)
Browse files Browse the repository at this point in the history
Adds a new partitioner using the FNV-1a hashing algorithm, with some
tweaks to match Sarama's default hashing partitioner behaviour.
Main use case is for users switching from Sarama to librdkafka
(or confluent-kafka-go) and wanting to maintain ordering guarantees.
  • Loading branch information
Manicben committed Mar 11, 2020
1 parent 7e2f7d7 commit 6fe229a
Show file tree
Hide file tree
Showing 14 changed files with 293 additions and 9 deletions.
2 changes: 1 addition & 1 deletion CONFIGURATION.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ message.timeout.ms | P | 0 .. 2147483647 | 300000
delivery.timeout.ms | P | 0 .. 2147483647 | 300000 | high | Alias for `message.timeout.ms`: Local message timeout. This value is only enforced locally and limits the time a produced message waits for successful delivery. A time of 0 is infinite. This is the maximum time librdkafka may use to deliver a message (including retries). Delivery error occurs when either the retry count or the message timeout are exceeded. The message timeout is automatically adjusted to `transaction.timeout.ms` if `transactional.id` is configured. <br>*Type: integer*
queuing.strategy | P | fifo, lifo | fifo | low | **EXPERIMENTAL**: subject to change or removal. **DEPRECATED** Producer queuing strategy. FIFO preserves produce ordering, while LIFO prioritizes new messages. <br>*Type: enum value*
produce.offset.report | P | true, false | false | low | **DEPRECATED** No longer used. <br>*Type: boolean*
partitioner | P | | consistent_random | high | Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.). <br>*Type: string*
partitioner | P | | consistent_random | high | Partitioner: `random` - random distribution, `consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), `consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), `murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), `murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.), `fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), `fnv1a_random` - FNV-1a hash of key (NULL keys are randomly partitioned). <br>*Type: string*
partitioner_cb | P | | | low | Custom partitioner callback (set with rd_kafka_topic_conf_set_partitioner_cb()) <br>*Type: see dedicated API*
msg_order_cmp | P | | | low | **EXPERIMENTAL**: subject to change or removal. **DEPRECATED** Message queue ordering comparator (set with rd_kafka_topic_conf_set_msg_order_cmp()). Also see `queuing.strategy`. <br>*Type: see dedicated API*
opaque | * | | | low | Application opaque (set with rd_kafka_topic_conf_set_opaque()) <br>*Type: see dedicated API*
Expand Down
18 changes: 18 additions & 0 deletions LICENSE.fnv1a
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
parts of src/rdfnv1a.c: http://www.isthe.com/chongo/src/fnv/hash_32a.c


Please do not copyright this code. This code is in the public domain.

LANDON CURT NOLL DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO
EVENT SHALL LANDON CURT NOLL BE LIABLE FOR ANY SPECIAL, INDIRECT OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF
USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.

By:
chongo <Landon Curt Noll> /\oo/\
http://www.isthe.com/chongo/

Share and Enjoy! :-)
22 changes: 22 additions & 0 deletions LICENSES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ LICENSE.crc32c
*/


LICENSE.fnv1a
--------------------------------------------------------------
parts of src/rdfnv1a.c: http://www.isthe.com/chongo/src/fnv/hash_32a.c


Please do not copyright this code. This code is in the public domain.

LANDON CURT NOLL DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO
EVENT SHALL LANDON CURT NOLL BE LIABLE FOR ANY SPECIAL, INDIRECT OR
CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF
USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
PERFORMANCE OF THIS SOFTWARE.

By:
chongo <Landon Curt Noll> /\oo/\
http://www.isthe.com/chongo/

Share and Enjoy! :-)


LICENSE.hdrhistogram
--------------------------------------------------------------
This license covers src/rdhdrhistogram.c which is a C port of
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ set(
rdavl.c
rdbuf.c
rdcrc32.c
rdfnv1a.c
rdkafka.c
rdkafka_assignor.c
rdkafka_broker.c
Expand Down
2 changes: 1 addition & 1 deletion src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ SRCS= rdkafka.c rdkafka_broker.c rdkafka_msg.c rdkafka_topic.c \
rdkafka_partition.c rdkafka_subscription.c \
rdkafka_assignor.c rdkafka_range_assignor.c \
rdkafka_roundrobin_assignor.c rdkafka_feature.c \
rdcrc32.c crc32c.c rdmurmur2.c rdaddr.c rdrand.c rdlist.c \
rdcrc32.c crc32c.c rdmurmur2.c rdfnv1a.c rdaddr.c rdrand.c rdlist.c \
tinycthread.c tinycthread_extra.c \
rdlog.c rdstring.c rdkafka_event.c rdkafka_metadata.c \
rdregex.c rdports.c rdkafka_metadata_cache.c rdavl.c \
Expand Down
112 changes: 112 additions & 0 deletions src/rdfnv1a.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2012-2020, Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#include "rd.h"
#include "rdunittest.h"
#include "rdfnv1a.h"


/* FNV-1a by Glenn Fowler, Landon Curt Noll, and Kiem-Phong Vo
*
* Based on http://www.isthe.com/chongo/src/fnv/hash_32a.c
* with librdkafka modifications to match the Sarama default Producer implementation,
* as seen here: https://github.com/Shopify/sarama/blob/master/partitioner.go#L203
* Note that this implementation is only compatible with Sarama's default
* NewHashPartitioner and not NewReferenceHashPartitioner.
*/
uint32_t rd_fnv1a (const void *key, size_t len) {
const uint32_t prime = 0x01000193; // 16777619
const uint32_t offset = 0x811C9DC5; // 2166136261
size_t i;
int32_t h = offset;

const unsigned char *data = (const unsigned char *)key;

for (i = 0; i < len; i++) {
h ^= data[i];
h *= prime;
}

/* Take absolute value to match the Sarama NewHashPartitioner implementation */
if (h < 0) {
h = -h;
}

return (uint32_t)h;
}


/**
* @brief Unittest for rd_fnv1a()
*/
int unittest_fnv1a (void) {
const char *short_unaligned = "1234";
const char *unaligned = "PreAmbleWillBeRemoved,ThePrePartThatIs";
const char *keysToTest[] = {
"kafka",
"giberish123456789",
short_unaligned,
short_unaligned+1,
short_unaligned+2,
short_unaligned+3,
unaligned,
unaligned+1,
unaligned+2,
unaligned+3,
"",
NULL,
};

// Acquired via https://play.golang.org/p/vWIhw3zJINA
const int32_t golang_hashfnv_results[] = {
0xd33c4e1, // kafka
0x77a58295, // giberish123456789
0x23bdd03, // short_unaligned
0x2dea3cd2, // short_unaligned+1
0x740fa83e, // short_unaligned+2
0x310ca263, // short_unaligned+3
0x65cbd69c, // unaligned
0x6e49c79a, // unaligned+1
0x69eed356, // unaligned+2
0x6abcc023, // unaligned+3
0x7ee3623b, // ""
0x7ee3623b, // NULL
};

size_t i;
for (i = 0; i < RD_ARRAYSIZE(keysToTest); i++) {
uint32_t h = rd_fnv1a(keysToTest[i],
keysToTest[i] ?
strlen(keysToTest[i]) : 0);
RD_UT_ASSERT((int32_t)h == golang_hashfnv_results[i],
"Calculated FNV-1a hash 0x%x for \"%s\", "
"expected 0x%x",
h, keysToTest[i], golang_hashfnv_results[i]);
}
RD_UT_PASS();
}
35 changes: 35 additions & 0 deletions src/rdfnv1a.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2020 Magnus Edenhill
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef __RDFNV1A___H__
#define __RDFNV1A___H__

uint32_t rd_fnv1a (const void *key, size_t len);
int unittest_fnv1a (void);

#endif // __RDFNV1A___H__
43 changes: 43 additions & 0 deletions src/rdkafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -2528,6 +2528,49 @@ int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
void *msg_opaque);


/**
* @brief FNV-1a partitioner.
*
* Uses consistent hashing to map identical keys onto identical partitions
* using FNV-1a hashing.
*
* The \p rkt_opaque argument is the opaque set by
* rd_kafka_topic_conf_set_opaque().
* The \p msg_opaque argument is the per-message opaque
* passed to produce().
*
* @returns a partition between 0 and \p partition_cnt - 1.
*/
RD_EXPORT
int32_t rd_kafka_msg_partitioner_fnv1a (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque);


/**
* @brief Consistent-Random FNV-1a partitioner.
*
* Uses consistent hashing to map identical keys onto identical partitions
* using FNV-1a hashing.
* Messages without keys will be assigned via the random partitioner.
*
* The \p rkt_opaque argument is the opaque set by
* rd_kafka_topic_conf_set_opaque().
* The \p msg_opaque argument is the per-message opaque
* passed to produce().
*
* @returns a partition between 0 and \p partition_cnt - 1.
*/
RD_EXPORT
int32_t rd_kafka_msg_partitioner_fnv1a_random (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque);


/**@}*/


Expand Down
10 changes: 8 additions & 2 deletions src/rdkafka_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,9 @@ rd_kafka_conf_validate_partitioner (const struct rd_kafka_property *prop,
!strcmp(val, "consistent") ||
!strcmp(val, "consistent_random") ||
!strcmp(val, "murmur2") ||
!strcmp(val, "murmur2_random");
!strcmp(val, "murmur2_random") ||
!strcmp(val, "fnv1a") ||
!strcmp(val, "fnv1a_random");
}


Expand Down Expand Up @@ -1288,7 +1290,11 @@ static const struct rd_kafka_property rd_kafka_properties[] = {
"`consistent` - CRC32 hash of key (Empty and NULL keys are mapped to single partition), "
"`consistent_random` - CRC32 hash of key (Empty and NULL keys are randomly partitioned), "
"`murmur2` - Java Producer compatible Murmur2 hash of key (NULL keys are mapped to single partition), "
"`murmur2_random` - Java Producer compatible Murmur2 hash of key (NULL keys are randomly partitioned. This is functionally equivalent to the default partitioner in the Java Producer.).",
"`murmur2_random` - Java Producer compatible Murmur2 hash of key "
"(NULL keys are randomly partitioned. This is functionally equivalent "
"to the default partitioner in the Java Producer.), "
"`fnv1a` - FNV-1a hash of key (NULL keys are mapped to single partition), "
"`fnv1a_random` - FNV-1a hash of key (NULL keys are randomly partitioned).",
.sdef = "consistent_random",
.validate = rd_kafka_conf_validate_partitioner },
{ _RK_TOPIC|_RK_PRODUCER, "partitioner_cb", _RK_C_PTR,
Expand Down
25 changes: 25 additions & 0 deletions src/rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "rdkafka_idempotence.h"
#include "rdkafka_txnmgr.h"
#include "rdcrc32.h"
#include "rdfnv1a.h"
#include "rdmurmur2.h"
#include "rdrand.h"
#include "rdtime.h"
Expand Down Expand Up @@ -883,6 +884,30 @@ int32_t rd_kafka_msg_partitioner_murmur2_random (const rd_kafka_topic_t *rkt,
return (rd_murmur2(key, keylen) & 0x7fffffff) % partition_cnt;
}

int32_t rd_kafka_msg_partitioner_fnv1a (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
return rd_fnv1a(key, keylen) % partition_cnt;
}

int32_t rd_kafka_msg_partitioner_fnv1a_random (const rd_kafka_topic_t *rkt,
const void *key, size_t keylen,
int32_t partition_cnt,
void *rkt_opaque,
void *msg_opaque) {
if (!key)
return rd_kafka_msg_partitioner_random(rkt,
key,
keylen,
partition_cnt,
rkt_opaque,
msg_opaque);
else
return rd_fnv1a(key, keylen) % partition_cnt;
}


/**
* @brief Assigns a message to a topic partition using a partitioner.
Expand Down
4 changes: 4 additions & 0 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ shptr_rd_kafka_itopic_t *rd_kafka_topic_new0 (rd_kafka_t *rk,
(void *)rd_kafka_msg_partitioner_murmur2 },
{ "murmur2_random",
(void *)rd_kafka_msg_partitioner_murmur2_random },
{ "fnv1a",
(void *)rd_kafka_msg_partitioner_fnv1a },
{ "fnv1a_random",
(void *)rd_kafka_msg_partitioner_fnv1a_random },
{ NULL }
};
int i;
Expand Down
12 changes: 7 additions & 5 deletions src/rdunittest.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "rdbuf.h"
#include "crc32c.h"
#include "rdmurmur2.h"
#include "rdfnv1a.h"
#if WITH_HDRHISTOGRAM
#include "rdhdrhistogram.h"
#endif
Expand Down Expand Up @@ -444,12 +445,13 @@ int rd_unittest (void) {
const char *name;
int (*call) (void);
} unittests[] = {
{ "sysqueue", unittest_sysqueue },
{ "rdbuf", unittest_rdbuf },
{ "rdvarint", unittest_rdvarint },
{ "crc32c", unittest_crc32c },
{ "msg", unittest_msg },
{ "sysqueue", unittest_sysqueue },
{ "rdbuf", unittest_rdbuf },
{ "rdvarint", unittest_rdvarint },
{ "crc32c", unittest_crc32c },
{ "msg", unittest_msg },
{ "murmurhash", unittest_murmur2 },
{ "fnv1a", unittest_fnv1a },
#if WITH_HDRHISTOGRAM
{ "rdhdrhistogram", unittest_rdhdrhistogram },
#endif
Expand Down
15 changes: 15 additions & 0 deletions tests/0048-partitioner.c
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,21 @@ static void do_test_partitioners (void) {
0x4f7703da % part_cnt,
0x5ec19395 % part_cnt
} },
{ "fnv1a", {
/* .. using https://play.golang.org/p/hRkA4xtYyJ6 */
0x7ee3623b % part_cnt,
0x7ee3623b % part_cnt,
0x27e6f469 % part_cnt,
0x155e3e5f % part_cnt,
0x17b1e27a % part_cnt
} },
{ "fnv1a_random", {
-1,
0x7ee3623b % part_cnt,
0x27e6f469 % part_cnt,
0x155e3e5f % part_cnt,
0x17b1e27a % part_cnt
} },
{ NULL }
};
int pi;
Expand Down
Loading

0 comments on commit 6fe229a

Please sign in to comment.