Skip to content
This repository has been archived by the owner on Jan 8, 2020. It is now read-only.

Commit

Permalink
Pluggable partition assignment strategies
Browse files Browse the repository at this point in the history
This incorporates the round robin changes from #216 (a64a8be) and
exposes the ability for users to implement their own partition assignor.

The tests for the two assignors now match the Java consumer's, to
validate that the behaviors match up as expected. Reviewing the
previous tests, I _think_ all of the same scenarios are still
covered, as well.

Backwards compatibility is maintained by falling back to the value in
`PartitionStrategy` if the `PartitionAssignor` is unset.
  • Loading branch information
pd committed Mar 22, 2018
1 parent 685bf39 commit 3a860e6
Show file tree
Hide file tree
Showing 4 changed files with 324 additions and 274 deletions.
231 changes: 149 additions & 82 deletions balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,96 +71,181 @@ func (n *Notification) success(current map[string][]int32) *Notification {

// --------------------------------------------------------------------

type balancer struct {
client sarama.Client
memberIDs []string
topics map[string]*topicInfo
}
// Assignor is a function which returns specific partition assignments
// given the set of topic subscriptions of a given group.
type Assignor func(subs *Subscriptions, topics []*TopicPartitions) Assignments

type topicInfo struct {
// TopicPartitions identifies a topic and its partition IDs.
type TopicPartitions struct {
Name string
Partitions []int32
MemberIDs []string
}

// Subscriptions contains information about all members in a consumer
// group, and which topics they have subscribed to.
type Subscriptions struct {
memberIDs []string
subscribers map[string][]string
}

// NewSubscriptions returns an empty set of subscriptions.
func NewSubscriptions() *Subscriptions {
return &Subscriptions{
memberIDs: []string{},
subscribers: map[string][]string{},
}
}

// Members returns the list of all member IDs in the group.
func (m *Subscriptions) Members() []string {
return m.memberIDs
}

// AddSubscriber registers a member as subscribed to a topic.
// Returns self.
func (m *Subscriptions) AddSubscriber(memberID, topic string) *Subscriptions {
seen := false
for i := range m.memberIDs {
if m.memberIDs[i] == memberID {
seen = true
}
}

if !seen {
m.memberIDs = append(m.memberIDs, memberID)
}

m.subscribers[topic] = append(m.subscribers[topic], memberID)
return m
}

// SubscribedMembers returns the full list of members subscribed
// to a topic.
func (m *Subscriptions) SubscribedMembers(topic string) []string {
return m.subscribers[topic]
}

// IsSubscribed returns true if a member is subscribed to a topic.
func (m *Subscriptions) IsSubscribed(memberID, topic string) bool {
subs, ok := m.subscribers[topic]
if !ok {
return false
}

for i := range subs {
if subs[i] == memberID {
return true
}
}

return false
}

// Assignments is a mapping of member IDs to the topic partitions that they
// have been assigned.
type Assignments map[string]map[string][]int32

// NewAssignments returns an empty set of assignments.
func NewAssignments() Assignments {
return map[string]map[string][]int32{}
}

// Assign adds a partition to the list of a member's assignments.
func (a Assignments) Assign(memberID, topic string, partition int32) {
topics, ok := a[memberID]
if !ok {
topics = map[string][]int32{}
a[memberID] = topics
}

topics[topic] = append(topics[topic], partition)
}

type balancer struct {
client sarama.Client
subs *Subscriptions
topics []*TopicPartitions
}

func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
balancer := &balancer{
client: client,
memberIDs: make([]string, 0, len(members)),
topics: make(map[string]*topicInfo),
client: client,
subs: NewSubscriptions(),
topics: []*TopicPartitions{},
}

for memberID, meta := range members {
balancer.memberIDs = append(balancer.memberIDs, memberID)
for _, topic := range meta.Topics {
if err := balancer.Topic(memberID, topic); err != nil {
balancer.subs.AddSubscriber(memberID, topic)
if err := balancer.AddTopic(topic); err != nil {
return nil, err
}
}
}

sort.Strings(balancer.memberIDs)
return balancer, nil
}

func (r *balancer) Topic(memberID string, name string) error {
info, ok := r.topics[name]
if !ok {
nums, err := r.client.Partitions(name)
if err != nil {
return err
}

r.topics[name] = &topicInfo{
MemberIDs: []string{memberID},
Partitions: nums,
func (r *balancer) AddTopic(name string) error {
for i := range r.topics {
if r.topics[i].Name == name {
return nil
}
}

return nil
nums, err := r.client.Partitions(name)
if err != nil {
return err
}

info.MemberIDs = append(info.MemberIDs, memberID)
r.topics = append(r.topics, &TopicPartitions{
Name: name,
Partitions: nums,
})
return nil
}

func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
switch s {
case StrategyRoundRobin:
return assignRoundRobin(r.memberIDs, r.topics)
default:
return assignRange(r.memberIDs, r.topics)
}
func (r *balancer) Perform(fn Assignor) Assignments {
return fn(r.subs, r.topics)
}

func assignRange(_ []string, topics map[string]*topicInfo) map[string]map[string][]int32 {
tlen := len(topics)
res := make(map[string]map[string][]int32)

for topic, info := range topics {
mlen := len(info.MemberIDs)
plen := len(info.Partitions)

sort.Strings(info.MemberIDs)
for pos, memberID := range info.MemberIDs {
// RangeAssignor assigns partitions to subscribed group members by
// dividing the number of partitions, per-topic, by the number of
// consumers to determine the number of partitions per consumer that
// should be assigned. If the value does not evenly divide, consumers
// lexicographically earlier will be assigned an extra partition.
func RangeAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments {
assignments := NewAssignments()

sort.Slice(topics, func(i, j int) bool { return topics[i].Name < topics[j].Name })
for _, tp := range topics {
members := subs.SubscribedMembers(tp.Name)
mlen := len(members)
plen := len(tp.Partitions)

sort.Strings(members)
for pos, memberID := range members {
n, i := float64(plen)/float64(mlen), float64(pos)
min := int(math.Floor(i*n + 0.5))
max := int(math.Floor((i+1)*n + 0.5))
sub := info.Partitions[min:max]
if len(sub) <= 0 {
continue
sub := tp.Partitions[min:max]
for i := range sub {
assignments.Assign(memberID, tp.Name, sub[i])
}

assigned, ok := res[memberID]
if !ok {
assigned = make(map[string][]int32, tlen)
res[memberID] = assigned
}
assigned[topic] = sub
}
}

return res
return assignments
}

func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[string]map[string][]int32 {
// RoundRobinAssignor assigns partitions by iterating through the
// list of group members and assigning one to each consumer until
// all partitions have been assigned. If a group member is not
// subscribed to a topic, the next subscribed member is assigned
// instead.
func RoundRobinAssignor(subs *Subscriptions, topics []*TopicPartitions) Assignments {
assignments := NewAssignments()
memberIDs := subs.Members()
sort.Strings(memberIDs)

r := ring.New(len(memberIDs))
Expand All @@ -169,40 +254,22 @@ func assignRoundRobin(memberIDs []string, topics map[string]*topicInfo) map[stri
r = r.Next()
}

isSubscribed := func(memberID string, topic string) bool {
info, ok := topics[topic]
if !ok {
return false
}

for _, subscriber := range info.MemberIDs {
if memberID == subscriber {
return true
}
sort.Slice(topics, func(i, j int) bool { return topics[i].Name < topics[j].Name })
for _, tp := range topics {
if len(subs.SubscribedMembers(tp.Name)) == 0 {
continue
}

return false
}

tlen := len(topics)
res := make(map[string]map[string][]int32, r.Len())

for topic, info := range topics {
for i := range info.Partitions {
for ; !isSubscribed(r.Value.(string), topic); r = r.Next() {
partitions := tp.Partitions
for i := range partitions {
for ; !subs.IsSubscribed(r.Value.(string), tp.Name); r = r.Next() {
continue
}

memberID := r.Value.(string)
assigned, ok := res[memberID]
if !ok {
assigned = make(map[string][]int32, tlen)
res[memberID] = assigned
}
assigned[topic] = append(assigned[topic], info.Partitions[i])
assignments.Assign(r.Value.(string), tp.Name, partitions[i])
r = r.Next()
}
}

return res
return assignments
}
Loading

0 comments on commit 3a860e6

Please sign in to comment.