From 30514d0645d388d8eb014c26248dad2092e03a53 Mon Sep 17 00:00:00 2001 From: Michael Ernst Date: Fri, 15 Jun 2018 16:58:26 +0200 Subject: [PATCH 1/2] add customizable partitioner which lets you create a partitioner with most permutations of configurable elements --- partitioner.go | 39 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/partitioner.go b/partitioner.go index a9a183236..0d732fddd 100644 --- a/partitioner.go +++ b/partitioner.go @@ -42,6 +42,31 @@ type PartitionerConstructor func(topic string) Partitioner type manualPartitioner struct{} +// HashPartitionOption lets you modify default values of the partitioner +type HashPartitionerOption func(*hashPartitioner) + +// WithAbsFirst means that the partitioner handles absolute values +// in the same way as the reference Java implementation +func WithAbsFirst() HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.referenceAbs = true + } +} + +// WithCustomHashFunction lets you specify what hash function to use for the partitioning +func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.hasher = hasher() + } +} + +// WithCustomRandomPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty +func WithCustomRandomPartitioner(randomHP *hashPartitioner) HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.random = hp + } +} + // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided // ProducerMessage's Partition field as the partition to produce to. func NewManualPartitioner(topic string) Partitioner { @@ -116,6 +141,20 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor } } +// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options +func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor { + return func(topic string) Partitioner { + p := new(hashPartitioner) + p.random = NewRandomPartitioner(topic) + p.hasher = fnv.New32a() + p.referenceAbs = false + for _, option := range options { + option(p) + } + return p + } +} + // NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a // random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, // modulus the number of partitions. This ensures that messages with the same key always end up on the From a74fb772b5cfe121956313c4f6596e372e83cd27 Mon Sep 17 00:00:00 2001 From: Michael Ernst Date: Fri, 15 Jun 2018 17:53:20 +0200 Subject: [PATCH 2/2] update name of partitioner option so it makes more sense --- partitioner.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/partitioner.go b/partitioner.go index 0d732fddd..6a708e729 100644 --- a/partitioner.go +++ b/partitioner.go @@ -60,8 +60,8 @@ func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption { } } -// WithCustomRandomPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty -func WithCustomRandomPartitioner(randomHP *hashPartitioner) HashPartitionerOption { +// WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty +func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption { return func(hp *hashPartitioner) { hp.random = hp }