Skip to content

Commit

Permalink
switch IntoSyncAssignmentOrError to GroupMemberBalancerOrError
Browse files Browse the repository at this point in the history
From an API perspective, failing in Balance makes more sense than
failing during the conversion of any internal plan to the kgo plan.

We also now support failing from the helper ConsumerBalancer, without
requiring an additional helper API support.
  • Loading branch information
twmb committed May 1, 2022
1 parent b5256c7 commit e8e5117
Showing 1 changed file with 44 additions and 23 deletions.
67 changes: 44 additions & 23 deletions pkg/kgo/group_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,35 +63,35 @@ type GroupBalancer interface {
IsCooperative() bool
}

// GroupMemberBalancer balances topics amongst group members.
// GroupMemberBalancer balances topics amongst group members. If your balancing
// can fail, you can implement GroupMemberBalancerOrError.
type GroupMemberBalancer interface {
// Balance balances topics and partitions among group members, where
// the int32 in the topics map corresponds to the number of partitions
// known to be in each topic.
Balance(topics map[string]int32) IntoSyncAssignment
}

// GroupMemberBalancerOrError is an optional extension interface for
// GroupMemberBalancer. This can be implemented if your balance function can
// fail.
//
// For interface purposes, it is required to implement GroupMemberBalancer, but
// Balance will never be called.
type GroupMemberBalancerOrError interface {
GroupMemberBalancer
BalanceOrError(topics map[string]int32) (IntoSyncAssignment, error)
}

// IntoSyncAssignment takes a balance plan and returns a list of assignments to
// use in a kmsg.SyncGroupRequest.
//
// It is recommended to ensure the output is deterministic and ordered by
// member / topic / partitions. If your assignment can fail, you can implement
// the optional IntoSyncAssignmentOrError.
// member / topic / partitions.
type IntoSyncAssignment interface {
IntoSyncAssignment() []kmsg.SyncGroupRequestGroupAssignment
}

// IntoSyncAssignmentOrError is an optional extension interface for
// IntoSyncAssignment. This can be implemented if your assignment function can
// fail.
//
// For interface purposes, it is required to implement IntoSyncAssignment, but
// that function will never be called.
type IntoSyncAssignmentOrError interface {
IntoSyncAssignment
IntoSyncAssignmentOrError() ([]kmsg.SyncGroupRequestGroupAssignment, error)
}

// ConsumerBalancer is a helper type for writing balance plans that use the
// "consumer" protocol, such that each member uses a kmsg.ConsumerMemberMetadata
// in its join group request.
Expand All @@ -100,11 +100,19 @@ type ConsumerBalancer struct {
members []kmsg.JoinGroupResponseMember
metadatas []kmsg.ConsumerMemberMetadata
topics map[string]struct{}

err error
}

// Balance satisfies the GroupMemberBalancer interface, but is never called
// because GroupMemberBalancerOrError exists.
func (*ConsumerBalancer) Balance(map[string]int32) IntoSyncAssignment {
panic("unreachable")
}

// Balance satisfies the GroupMemberBalancer interface.
func (b *ConsumerBalancer) Balance(topics map[string]int32) IntoSyncAssignment {
return b.b.Balance(b, topics)
// Balance satisfies the GroupMemberBalancerOrError interface.
func (b *ConsumerBalancer) BalanceOrError(topics map[string]int32) (IntoSyncAssignment, error) {
return b.b.Balance(b, topics), b.err
}

// Members returns the list of input members for this group balancer.
Expand All @@ -125,6 +133,12 @@ func (b *ConsumerBalancer) MemberAt(n int) (*kmsg.JoinGroupResponseMember, *kmsg
return &b.members[n], &b.metadatas[n]
}

// SetError allows you to set any error that occurred while balancing. This
// allows you to fail balancing and return nil from Balance.
func (b *ConsumerBalancer) SetError(err error) {
b.err = err
}

// MemberTopics returns the unique set of topics that all members are
// interested in.
//
Expand Down Expand Up @@ -152,7 +166,9 @@ func (b *ConsumerBalancer) NewPlan() *BalancePlan {
//
// This is a complicated interface, but in short, this interface has one
// function that implements the actual balancing logic: using the input
// balancer, balance the input topics and partitions.
// balancer, balance the input topics and partitions. If your balancing can
// fail, you can use ConsumerBalancer.SetError(...) to return an error from
// balancing, and then you can simply return nil from Balance.
type ConsumerBalancerBalance interface {
Balance(*ConsumerBalancer, map[string]int32) IntoSyncAssignment
}
Expand Down Expand Up @@ -433,17 +449,22 @@ func (g *groupConsumer) balanceGroup(proto string, members []kmsg.JoinGroupRespo
// If the returned IntoSyncAssignment is a BalancePlan, which it likely
// is if the balancer is a ConsumerBalancer, then we can again print
// more useful debugging information.
into := memberBalancer.Balance(topicPartitionCount)
var into IntoSyncAssignment
if memberBalancerOrErr, ok := memberBalancer.(GroupMemberBalancerOrError); ok {
into, err = memberBalancerOrErr.BalanceOrError(topicPartitionCount)
} else {
into = memberBalancer.Balance(topicPartitionCount)
}
if err != nil {
return nil, err
}

if p, ok := into.(*BalancePlan); ok {
g.cl.cfg.logger.Log(LogLevelInfo, "balanced", "plan", p.String())
} else {
g.cl.cfg.logger.Log(LogLevelInfo, "unable to log balance plan: the user has returned a custom IntoSyncAssignment (not a *BalancePlan)")
}

if intoOrErr, ok := into.(IntoSyncAssignmentOrError); ok {
return intoOrErr.IntoSyncAssignmentOrError()
}

return into.IntoSyncAssignment(), nil
}

Expand Down

0 comments on commit e8e5117

Please sign in to comment.