Skip to content

Commit

Permalink
Lazily connect to brokers in the client
Browse files Browse the repository at this point in the history
Instead of opening a connection to all brokers immediately upon receiving their
information in metadata, wait until we are asked for them either via a call to
`Leader` or a call to `any`.
  • Loading branch information
eapache committed Mar 12, 2015
1 parent d61a67c commit f7f2816
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 13 deletions.
8 changes: 3 additions & 5 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,10 +337,12 @@ func (client *Client) any() *Broker {
defer client.lock.RUnlock()

if client.seedBroker != nil {
_ = client.seedBroker.Open(client.conf)
return client.seedBroker
}

for _, broker := range client.brokers {
_ = broker.Open(client.conf)
return broker
}

Expand Down Expand Up @@ -436,6 +438,7 @@ func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, er
if b == nil {
return nil, ErrLeaderNotAvailable
}
_ = b.Open(client.conf)
return b, nil
}
}
Expand Down Expand Up @@ -538,17 +541,12 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
// - if it is a new ID, save it
// - if it is an existing ID, but the address we have is stale, discard the old one and save it
// - otherwise ignore it, replacing our existing one would just bounce the connection
// We asynchronously try to open connections to the new brokers. We don't care if they
// fail, since maybe that broker is unreachable but doesn't have a topic we care about.
// If it fails and we do care, whoever tries to use it will get the connection error.
for _, broker := range data.Brokers {
if client.brokers[broker.ID()] == nil {
_ = broker.Open(client.conf)
client.brokers[broker.ID()] = broker
Logger.Printf("Registered new broker #%d at %s", broker.ID(), broker.Addr())
} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
safeAsyncClose(client.brokers[broker.ID()])
_ = broker.Open(client.conf)
client.brokers[broker.ID()] = broker
Logger.Printf("Replaced registered broker #%d with %s", broker.ID(), broker.Addr())
}
Expand Down
12 changes: 4 additions & 8 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,14 @@ func TestSimpleClient(t *testing.T) {

func TestCachedPartitions(t *testing.T) {
seedBroker := newMockBroker(t, 1)
leader := newMockBroker(t, 5)

replicas := []int32{3, 1, 5}
isr := []int32{5, 1}

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, ErrLeaderNotAvailable)
metadataResponse.AddBroker("localhost:12345", 2)
metadataResponse.AddTopicPartition("my_topic", 0, 2, replicas, isr, ErrNoError)
metadataResponse.AddTopicPartition("my_topic", 1, 2, replicas, isr, ErrLeaderNotAvailable)
seedBroker.Returns(metadataResponse)

config := NewConfig()
Expand All @@ -61,25 +60,22 @@ func TestCachedPartitions(t *testing.T) {
t.Fatal("Not using the cache!")
}

leader.Close()
seedBroker.Close()
safeClose(t, client)
}

func TestClientSeedBrokers(t *testing.T) {
seedBroker := newMockBroker(t, 1)
discoveredBroker := newMockBroker(t, 2)

metadataResponse := new(MetadataResponse)
metadataResponse.AddBroker(discoveredBroker.Addr(), discoveredBroker.BrokerID())
metadataResponse.AddBroker("localhost:12345", 2)
seedBroker.Returns(metadataResponse)

client, err := NewClient([]string{seedBroker.Addr()}, nil)
if err != nil {
t.Fatal(err)
}

discoveredBroker.Close()
seedBroker.Close()
safeClose(t, client)
}
Expand Down

0 comments on commit f7f2816

Please sign in to comment.