diff --git a/partitioner.go b/partitioner.go index a9a183236..6a708e729 100644 --- a/partitioner.go +++ b/partitioner.go @@ -42,6 +42,31 @@ type PartitionerConstructor func(topic string) Partitioner type manualPartitioner struct{} +// HashPartitionOption lets you modify default values of the partitioner +type HashPartitionerOption func(*hashPartitioner) + +// WithAbsFirst means that the partitioner handles absolute values +// in the same way as the reference Java implementation +func WithAbsFirst() HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.referenceAbs = true + } +} + +// WithCustomHashFunction lets you specify what hash function to use for the partitioning +func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.hasher = hasher() + } +} + +// WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty +func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption { + return func(hp *hashPartitioner) { + hp.random = hp + } +} + // NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided // ProducerMessage's Partition field as the partition to produce to. func NewManualPartitioner(topic string) Partitioner { @@ -116,6 +141,20 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor } } +// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options +func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor { + return func(topic string) Partitioner { + p := new(hashPartitioner) + p.random = NewRandomPartitioner(topic) + p.hasher = fnv.New32a() + p.referenceAbs = false + for _, option := range options { + option(p) + } + return p + } +} + // NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a // random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used, // modulus the number of partitions. This ensures that messages with the same key always end up on the