Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add customizable partitioner #1118

Merged
merged 2 commits into from
Jun 15, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions partitioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,31 @@ type PartitionerConstructor func(topic string) Partitioner

type manualPartitioner struct{}

// HashPartitionOption lets you modify default values of the partitioner
type HashPartitionerOption func(*hashPartitioner)

// WithAbsFirst means that the partitioner handles absolute values
// in the same way as the reference Java implementation
func WithAbsFirst() HashPartitionerOption {
return func(hp *hashPartitioner) {
hp.referenceAbs = true
}
}

// WithCustomHashFunction lets you specify what hash function to use for the partitioning
func WithCustomHashFunction(hasher func() hash.Hash32) HashPartitionerOption {
return func(hp *hashPartitioner) {
hp.hasher = hasher()
}
}

// WithCustomFallbackPartitioner lets you specify what HashPartitioner should be used in case a Distribution Key is empty
func WithCustomFallbackPartitioner(randomHP *hashPartitioner) HashPartitionerOption {
return func(hp *hashPartitioner) {
hp.random = hp
}
}

// NewManualPartitioner returns a Partitioner which uses the partition manually set in the provided
// ProducerMessage's Partition field as the partition to produce to.
func NewManualPartitioner(topic string) Partitioner {
Expand Down Expand Up @@ -116,6 +141,20 @@ func NewCustomHashPartitioner(hasher func() hash.Hash32) PartitionerConstructor
}
}

// NewCustomPartitioner creates a default Partitioner but lets you specify the behavior of each component via options
func NewCustomPartitioner(options ...HashPartitionerOption) PartitionerConstructor {
return func(topic string) Partitioner {
p := new(hashPartitioner)
p.random = NewRandomPartitioner(topic)
p.hasher = fnv.New32a()
p.referenceAbs = false
for _, option := range options {
option(p)
}
return p
}
}

// NewHashPartitioner returns a Partitioner which behaves as follows. If the message's key is nil then a
// random partition is chosen. Otherwise the FNV-1a hash of the encoded bytes of the message key is used,
// modulus the number of partitions. This ensures that messages with the same key always end up on the
Expand Down