Skip to content

Commit

Permalink
bug: implement unsigned modulus for partitioning with crc32 hashing
Browse files Browse the repository at this point in the history
Signed-off-by: csm8118 <csm8118@yahoo.com>
  • Loading branch information
csm8118 committed Aug 4, 2023
1 parent d8d9e73 commit f4fdb32
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 0 deletions.
31 changes: 31 additions & 0 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"hash"
"hash/crc32"
"hash/fnv"
"math/rand"
"time"
Expand Down Expand Up @@ -53,6 +54,15 @@ func WithAbsFirst() HashPartitionerOption {
}
}

// WithHashUnsigned means the partitioner treats the hashed value as unsigned when
// partitioning. This is intended to be combined with the crc32 hash algorithm to
// be compatible with librdkafka's implementation
func WithHashUnsigned() HashPartitionerOption {
return func(hp *hashPartitioner) {
hp.hashUnsigned = true
}
}

// WithCustomHashFunction lets you specify what hash function to use for the partitioning
func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
return func(hp *hashPartitioner) {
Expand Down Expand Up @@ -126,6 +136,7 @@ type hashPartitioner struct {
random Partitioner
hasher hash.Hash32
referenceAbs bool
hashUnsigned bool
}

// NewCustomHashPartitioner is a wrapper around NewHashPartitioner, allowing the use of custom hasher.
Expand All @@ -137,6 +148,7 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor
p.random = NewRandomPartitioner(topic)
p.hasher = hasher()
p.referenceAbs = false
p.hashUnsigned = false
return p
}
}
Expand All @@ -148,6 +160,7 @@ func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstruct
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = false
p.hashUnsigned = false
for _, option := range options {
option(p)
}
Expand All @@ -164,6 +177,7 @@ func NewHashPartitioner(topic string) Partitioner {
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = false
p.hashUnsigned = false
return p
}

Expand All @@ -176,6 +190,19 @@ func NewReferenceHashPartitioner(topic string) Partitioner {
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = true
p.hashUnsigned = false
return p
}

// NewConsistentCRCHashPartitioner is like NewHashPartitioner execpt that it uses the *unsigned* crc32 hash
// of the encoded bytes of the message key modulus the number of partitions. This is compatible with
// librdkafka's `consistent_random` partitioner
func NewConsistentCRCHashPartitioner(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = crc32.NewIEEE()
p.referenceAbs = false
p.hashUnsigned = true
return p
}

Expand All @@ -199,6 +226,10 @@ func (p *hashPartitioner) Partition(message *ProducerMessage, numPartitions int3
// but not past Sarama versions
if p.referenceAbs {
partition = (int32(p.hasher.Sum32()) & 0x7fffffff) % numPartitions
} else if p.hashUnsigned {
// librdkafka treats the hashed value as unsigned. If `hashUnsigned` is set we are compatible
// with librdkafka's `consistent` partitioning but not past Sarama versions
partition = int32(p.hasher.Sum32() % uint32(numPartitions))
} else {
partition = int32(p.hasher.Sum32()) % numPartitions
if partition < 0 {
Expand Down
79 changes: 79 additions & 0 deletions partitioner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"crypto/rand"
"hash/crc32"
"hash/fnv"
"log"
"testing"
Expand All @@ -26,6 +27,28 @@ func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, message
}
}

type partitionerTestCase struct {
key string
expectedPartition int32
}

func partitionAndAssert(t *testing.T, partitioner Partitioner, numPartitions int32, testCase partitionerTestCase) {
t.Run("partitionAndAssert "+testCase.key, func(t *testing.T) {
msg := &ProducerMessage{
Key: StringEncoder(testCase.key),
}

partition, err := partitioner.Partition(msg, numPartitions)

if err != nil {
t.Error(partitioner, err)
}
if partition != testCase.expectedPartition {
t.Error(partitioner, "partitioning", testCase.key, "returned partition", partition, "but expected", testCase.expectedPartition, ".")
}
})
}

func TestRandomPartitioner(t *testing.T) {
partitioner := NewRandomPartitioner("mytopic")

Expand Down Expand Up @@ -185,6 +208,62 @@ func TestHashPartitionerMinInt32(t *testing.T) {
}
}

func TestConsistentCRCHashPartitioner(t *testing.T) {
numPartitions := int32(100)
partitioner := NewConsistentCRCHashPartitioner("mytopic")

testCases := []partitionerTestCase{
{
key: "abc123def456",
expectedPartition: 57,
},
{
// `SheetJS` has a crc32 hash value of 2647669026 (which is -1647298270 as a signed int32)
// Modding the signed value will give a partition of 70. Modding the unsigned value will give 26
key: "SheetJS",
expectedPartition: 26,
},
{
key: "9e8c7f4cf45857cfff7645d6",
expectedPartition: 24,
},
{
key: "3900446192ff85a5f67da10c",
expectedPartition: 75,
},
{
key: "0f4407b7a67d6d27de372198",
expectedPartition: 50,
},
}

for _, tc := range testCases {
partitionAndAssert(t, partitioner, numPartitions, tc)
}
}

func TestCustomPartitionerWithConsistentHashing(t *testing.T) {
// Setting both `hashUnsigned` and the hash function to `crc32.NewIEEE` is equivalent to using `NewConsistentCRCHashPartitioner`
partitioner := NewCustomPartitioner(
WithHashUnsigned(),
WithCustomHashFunction(crc32.NewIEEE),
)("mytopic")

// See above re: why `SheetJS`
msg := ProducerMessage{
Key: StringEncoder("SheetJS"),
}

choice, err := partitioner.Partition(&msg, 100)
if err != nil {
t.Error(partitioner, err)
}
expectedPartition := int32(26)
if choice != expectedPartition {
t.Error(partitioner, "returned partition", choice, "but expected", expectedPartition, ".")
}
}

func TestManualPartitioner(t *testing.T) {
partitioner := NewManualPartitioner("mytopic")

Expand Down

0 comments on commit f4fdb32

Please sign in to comment.