Skip to content

Commit

Permalink
When OutOfBrokers, retry with previously-disconnected brokers.
Browse files Browse the repository at this point in the history
This is a partial workaround for #15. Really, broker management should
be completely rearchitected, but today is not the day for that.
  • Loading branch information
burke committed Dec 18, 2013
1 parent e9d9675 commit 87b7e45
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Client struct {
// so we store them separately
extraBrokerAddrs []string
extraBroker *Broker
deadBrokerAddrs []string

brokers map[int32]*Broker // maps broker ids to brokers
leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
Expand Down Expand Up @@ -167,6 +168,8 @@ func (client *Client) disconnectBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()

client.deadBrokerAddrs = append(client.deadBrokerAddrs, broker.addr)

if broker == client.extraBroker {
client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
if len(client.extraBrokerAddrs) > 0 {
Expand Down Expand Up @@ -223,9 +226,38 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
client.disconnectBroker(broker)
}

if retries > 0 {
time.Sleep(client.config.WaitForElection)
client.resurrectDeadBrokers()
return client.refreshMetadata(topics, retries-1)
}

return OutOfBrokers
}

func (client *Client) resurrectDeadBrokers() {
Logger.Println("Ran out of connectable brokers. Retrying with brokers marked dead.")
client.lock.Lock()
defer client.lock.Unlock()

brokers := make(map[string]struct{})
for _, addr := range client.deadBrokerAddrs {
brokers[addr] = struct{}{}
}
for _, addr := range client.extraBrokerAddrs {
brokers[addr] = struct{}{}
}

client.deadBrokerAddrs = []string{}
client.extraBrokerAddrs = []string{}
for addr, _ := range brokers {
client.extraBrokerAddrs = append(client.extraBrokerAddrs, addr)
}

client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
client.extraBroker.Open(client.config.ConcurrencyPerBroker)
}

func (client *Client) any() *Broker {
client.lock.RLock()
defer client.lock.RUnlock()
Expand Down

0 comments on commit 87b7e45

Please sign in to comment.