From d1bbdae1d59316586ed67ff44e3015fb94a3bde8 Mon Sep 17 00:00:00 2001 From: Evan Huus Date: Tue, 27 Aug 2013 10:18:49 -0400 Subject: [PATCH] Make Client.Leader() public. Also add a more comprehensive comment to disconnectBroker(). This is the last piece of #23 that doesn't heavily depend on getting #15 right first. --- client.go | 40 +++++++++++++++++++++------------------- client_test.go | 4 ++-- consumer.go | 10 +++++----- producer.go | 2 +- 4 files changed, 29 insertions(+), 27 deletions(-) diff --git a/client.go b/client.go index d041a2f48..d70bd465b 100644 --- a/client.go +++ b/client.go @@ -119,23 +119,9 @@ func (client *Client) Topics() ([]string, error) { return ret, nil } -// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the -// available metadata for those topics. -func (client *Client) RefreshTopicMetadata(topics ...string) error { - return client.refreshMetadata(topics, client.config.MetadataRetries) -} - -// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics. -func (client *Client) RefreshAllMetadata() error { - // Kafka refreshes all when you encode it an empty array... - return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries) -} - -// functions for use by producers and consumers -// if Go had the concept they would be marked 'protected' -// TODO: see https://github.com/Shopify/sarama/issues/23 - -func (client *Client) leader(topic string, partition_id int32) (*Broker, error) { +// Leader returns the broker object that is the leader of the current topic/partition, as +// determined by querying the cluster metadata. +func (client *Client) Leader(topic string, partition_id int32) (*Broker, error) { leader := client.cachedLeader(topic, partition_id) if leader == nil { @@ -153,6 +139,24 @@ func (client *Client) leader(topic string, partition_id int32) (*Broker, error) return leader, nil } +// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the +// available metadata for those topics. +func (client *Client) RefreshTopicMetadata(topics ...string) error { + return client.refreshMetadata(topics, client.config.MetadataRetries) +} + +// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics. +func (client *Client) RefreshAllMetadata() error { + // Kafka refreshes all when you encode it an empty array... + return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries) +} + +// misc private helper functions + +// XXX: see https://github.com/Shopify/sarama/issues/15 +// and https://github.com/Shopify/sarama/issues/23 +// disconnectBroker is a bad hacky way to accomplish broker management. It should be replaced with +// something sane and the replacement should be made part of the public Client API func (client *Client) disconnectBroker(broker *Broker) { client.lock.Lock() defer client.lock.Unlock() @@ -174,8 +178,6 @@ func (client *Client) disconnectBroker(broker *Broker) { go broker.Close() } -// truly private helper functions - func (client *Client) refreshMetadata(topics []string, retries int) error { for broker := client.any(); broker != nil; broker = client.any() { response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics}) diff --git a/client_test.go b/client_test.go index 1b0547df8..ea6da71a3 100644 --- a/client_test.go +++ b/client_test.go @@ -90,7 +90,7 @@ func TestClientMetadata(t *testing.T) { t.Error("Client returned incorrect partitions for my_topic:", parts) } - tst, err := client.leader("my_topic", 0) + tst, err := client.Leader("my_topic", 0) if err != nil { t.Error(err) } else if tst.ID() != 5 { @@ -151,7 +151,7 @@ func TestClientRefreshBehaviour(t *testing.T) { t.Error("Client returned incorrect partitions for my_topic:", parts) } - tst, err := client.leader("my_topic", 0xb) + tst, err := client.Leader("my_topic", 0xb) if err != nil { t.Error(err) } else if tst.ID() != 0xaa { diff --git a/consumer.go b/consumer.go index 0b59c754d..a1f60fa1d 100644 --- a/consumer.go +++ b/consumer.go @@ -96,7 +96,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co return nil, ConfigurationError("Invalid EventBufferSize") } - broker, err := client.leader(topic, partition) + broker, err := client.Leader(topic, partition) if err != nil { return nil, err } @@ -194,7 +194,7 @@ func (c *Consumer) fetchMessages() { } default: c.client.disconnectBroker(c.broker) - for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) { + for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) { if !c.sendError(err) { return } @@ -217,7 +217,7 @@ func (c *Consumer) fetchMessages() { case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE: err = c.client.RefreshTopicMetadata(c.topic) if c.sendError(err) { - for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) { + for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) { if !c.sendError(err) { return } @@ -295,7 +295,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) { return -1, err } c.client.disconnectBroker(c.broker) - c.broker, err = c.client.leader(c.topic, c.partition) + c.broker, err = c.client.Leader(c.topic, c.partition) if err != nil { return -1, err } @@ -321,7 +321,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) { if err != nil { return -1, err } - c.broker, err = c.client.leader(c.topic, c.partition) + c.broker, err = c.client.Leader(c.topic, c.partition) if err != nil { return -1, err } diff --git a/producer.go b/producer.go index 8da7254ab..9a321c453 100644 --- a/producer.go +++ b/producer.go @@ -95,7 +95,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error { return err } - broker, err := p.client.leader(p.topic, partition) + broker, err := p.client.Leader(p.topic, partition) if err != nil { return err }