diff --git a/client.go b/client.go index efc3e8575..5a232c949 100644 --- a/client.go +++ b/client.go @@ -26,6 +26,7 @@ type Client struct { // so we store them separately extraBrokerAddrs []string extraBroker *Broker + deadBrokerAddrs []string brokers map[int32]*Broker // maps broker ids to brokers leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids @@ -167,6 +168,8 @@ func (client *Client) disconnectBroker(broker *Broker) { client.lock.Lock() defer client.lock.Unlock() + client.deadBrokerAddrs = append(client.deadBrokerAddrs, broker.addr) + if broker == client.extraBroker { client.extraBrokerAddrs = client.extraBrokerAddrs[1:] if len(client.extraBrokerAddrs) > 0 { @@ -223,9 +226,38 @@ func (client *Client) refreshMetadata(topics []string, retries int) error { client.disconnectBroker(broker) } + if retries > 0 { + time.Sleep(client.config.WaitForElection) + client.resurrectDeadBrokers() + return client.refreshMetadata(topics, retries-1) + } + return OutOfBrokers } +func (client *Client) resurrectDeadBrokers() { + Logger.Println("Ran out of connectable brokers. Retrying with brokers marked dead.") + client.lock.Lock() + defer client.lock.Unlock() + + brokers := make(map[string]struct{}) + for _, addr := range client.deadBrokerAddrs { + brokers[addr] = struct{}{} + } + for _, addr := range client.extraBrokerAddrs { + brokers[addr] = struct{}{} + } + + client.deadBrokerAddrs = []string{} + client.extraBrokerAddrs = []string{} + for addr, _ := range brokers { + client.extraBrokerAddrs = append(client.extraBrokerAddrs, addr) + } + + client.extraBroker = NewBroker(client.extraBrokerAddrs[0]) + client.extraBroker.Open(client.config.ConcurrencyPerBroker) +} + func (client *Client) any() *Broker { client.lock.RLock() defer client.lock.RUnlock()