Skip to content

Commit

Permalink
Register new brokers from consumer metadata responses.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Mar 21, 2015
1 parent c0cd9ce commit d441c77
Showing 1 changed file with 9 additions and 1 deletion.
10 changes: 9 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -382,8 +383,15 @@ func (client *client) getOffsetLeader(consumerGroup string, attemptsRemaining in

switch response.Err {
case ErrNoError:
// TODO: register new brokers
broker := client.brokers[response.CoordinatorID]
if broker == nil {
client.brokers[response.CoordinatorID] = &Broker{
id: response.CoordinatorID,
addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort),
}

broker = client.brokers[response.CoordinatorID]
}
_ = broker.Open(client.conf)
return broker, nil

Expand Down

0 comments on commit d441c77

Please sign in to comment.