Skip to content

Commit

Permalink
Register new brokers from consumer metadata responses.
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanbergen committed Mar 21, 2015
1 parent c0cd9ce commit b5466ce
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package sarama

import (
"fmt"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -382,8 +383,15 @@ func (client *client) getOffsetLeader(consumerGroup string, attemptsRemaining in

switch response.Err {
case ErrNoError:
// TODO: register new brokers
broker := client.brokers[response.CoordinatorID]
broker, ok := client.brokers[response.CoordinatorID]
if !ok {
broker = &Broker{
id: response.CoordinatorID,
addr: fmt.Sprintf("%s:%d", response.CoordinatorHost, response.CoordinatorPort),
}

client.brokers[response.CoordinatorID] = broker
}
_ = broker.Open(client.conf)
return broker, nil

Expand Down

0 comments on commit b5466ce

Please sign in to comment.