Skip to content

Commit

Permalink
Merge pull request #27 from Shopify/fix_negative_partitions
Browse files Browse the repository at this point in the history
Fix HashPartitioner returning negative partitions.
  • Loading branch information
eapache committed Aug 26, 2013
2 parents 907691e + f8f6c2a commit fed6dbf
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
18 changes: 11 additions & 7 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
// decides to which partition to send the message. RandomPartitioner, RoundRobinPartitioner and HashPartitioner are provided
// as simple default implementations.
type Partitioner interface {
Partition(key Encoder, numPartitions int) int
Partition(key Encoder, numPartitions int32) int32
}

// RandomPartitioner implements the Partitioner interface by choosing a random partition each time.
Expand All @@ -25,16 +25,16 @@ func NewRandomPartitioner() *RandomPartitioner {
return p
}

func (p *RandomPartitioner) Partition(key Encoder, numPartitions int) int {
return p.generator.Intn(numPartitions)
func (p *RandomPartitioner) Partition(key Encoder, numPartitions int32) int32 {
return int32(p.generator.Intn(int(numPartitions)))
}

// RoundRobinPartitioner implements the Partitioner interface by walking through the available partitions one at a time.
type RoundRobinPartitioner struct {
partition int
partition int32
}

func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int) int {
func (p *RoundRobinPartitioner) Partition(key Encoder, numPartitions int32) int32 {
if p.partition >= numPartitions {
p.partition = 0
}
Expand All @@ -58,7 +58,7 @@ func NewHashPartitioner() *HashPartitioner {
return p
}

func (p *HashPartitioner) Partition(key Encoder, numPartitions int) int {
func (p *HashPartitioner) Partition(key Encoder, numPartitions int32) int32 {
if key == nil {
return p.random.Partition(key, numPartitions)
}
Expand All @@ -68,5 +68,9 @@ func (p *HashPartitioner) Partition(key Encoder, numPartitions int) int {
}
p.hasher.Reset()
p.hasher.Write(bytes)
return int(p.hasher.Sum32()) % numPartitions
hash := int32(p.hasher.Sum32())
if hash < 0 {
hash = -hash
}
return hash % numPartitions
}
18 changes: 12 additions & 6 deletions partitioner_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package sarama

import "testing"
import (
"crypto/rand"
"testing"
)

func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int) {
func assertPartitioningConsistent(t *testing.T, partitioner Partitioner, key Encoder, numPartitions int32) {
choice := partitioner.Partition(key, numPartitions)
if choice < 0 || choice >= numPartitions {
t.Error(partitioner, "returned partition", choice, "outside of range for", key)
Expand Down Expand Up @@ -39,7 +42,8 @@ func TestRoundRobinPartitioner(t *testing.T) {
t.Error("Returned non-zero partition when only one available.")
}

for i := 1; i < 50; i++ {
var i int32
for i = 1; i < 50; i++ {
choice := partitioner.Partition(nil, 7)
if choice != i%7 {
t.Error("Returned partition", choice, "expecting", i%7)
Expand All @@ -62,7 +66,9 @@ func TestHashPartitioner(t *testing.T) {
}
}

assertPartitioningConsistent(t, partitioner, StringEncoder("ABC"), 50)
assertPartitioningConsistent(t, partitioner, StringEncoder("ABCDEF"), 37)
assertPartitioningConsistent(t, partitioner, StringEncoder("some random thing"), 3)
buf := make([]byte, 256)
for i := 1; i < 50; i++ {
rand.Read(buf)
assertPartitioningConsistent(t, partitioner, ByteEncoder(buf), 50)
}
}
6 changes: 4 additions & 2 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
return -1, err
}

choice := p.config.Partitioner.Partition(key, len(partitions))
numPartitions := int32(len(partitions))

if choice >= len(partitions) {
choice := p.config.Partitioner.Partition(key, numPartitions)

if choice < 0 || choice >= numPartitions {
return -1, InvalidPartition
}

Expand Down

0 comments on commit fed6dbf

Please sign in to comment.