Skip to content

Commit

Permalink
Make Client.Leader() public.
Browse files Browse the repository at this point in the history
Also add a more comprehensive comment to disconnectBroker().

This is the last piece of #23 that doesn't heavily depend on getting #15 right
first.
  • Loading branch information
Evan Huus committed Aug 27, 2013
1 parent b7bdd28 commit d1bbdae
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 27 deletions.
40 changes: 21 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,9 @@ func (client *Client) Topics() ([]string, error) {
return ret, nil
}

// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics.
func (client *Client) RefreshTopicMetadata(topics ...string) error {
return client.refreshMetadata(topics, client.config.MetadataRetries)
}

// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
func (client *Client) RefreshAllMetadata() error {
// Kafka refreshes all when you encode it an empty array...
return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
}

// functions for use by producers and consumers
// if Go had the concept they would be marked 'protected'
// TODO: see https://github.com/Shopify/sarama/issues/23

func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
// Leader returns the broker object that is the leader of the current topic/partition, as
// determined by querying the cluster metadata.
func (client *Client) Leader(topic string, partition_id int32) (*Broker, error) {
leader := client.cachedLeader(topic, partition_id)

if leader == nil {
Expand All @@ -153,6 +139,24 @@ func (client *Client) leader(topic string, partition_id int32) (*Broker, error)
return leader, nil
}

// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics.
func (client *Client) RefreshTopicMetadata(topics ...string) error {
return client.refreshMetadata(topics, client.config.MetadataRetries)
}

// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
func (client *Client) RefreshAllMetadata() error {
// Kafka refreshes all when you encode it an empty array...
return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
}

// misc private helper functions

// XXX: see https://github.com/Shopify/sarama/issues/15
// and https://github.com/Shopify/sarama/issues/23
// disconnectBroker is a bad hacky way to accomplish broker management. It should be replaced with
// something sane and the replacement should be made part of the public Client API
func (client *Client) disconnectBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()
Expand All @@ -174,8 +178,6 @@ func (client *Client) disconnectBroker(broker *Broker) {
go broker.Close()
}

// truly private helper functions

func (client *Client) refreshMetadata(topics []string, retries int) error {
for broker := client.any(); broker != nil; broker = client.any() {
response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestClientMetadata(t *testing.T) {
t.Error("Client returned incorrect partitions for my_topic:", parts)
}

tst, err := client.leader("my_topic", 0)
tst, err := client.Leader("my_topic", 0)
if err != nil {
t.Error(err)
} else if tst.ID() != 5 {
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
t.Error("Client returned incorrect partitions for my_topic:", parts)
}

tst, err := client.leader("my_topic", 0xb)
tst, err := client.Leader("my_topic", 0xb)
if err != nil {
t.Error(err)
} else if tst.ID() != 0xaa {
Expand Down
10 changes: 5 additions & 5 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
return nil, ConfigurationError("Invalid EventBufferSize")
}

broker, err := client.leader(topic, partition)
broker, err := client.Leader(topic, partition)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *Consumer) fetchMessages() {
}
default:
c.client.disconnectBroker(c.broker)
for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
if !c.sendError(err) {
return
}
Expand All @@ -217,7 +217,7 @@ func (c *Consumer) fetchMessages() {
case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
err = c.client.RefreshTopicMetadata(c.topic)
if c.sendError(err) {
for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
if !c.sendError(err) {
return
}
Expand Down Expand Up @@ -295,7 +295,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
return -1, err
}
c.client.disconnectBroker(c.broker)
c.broker, err = c.client.leader(c.topic, c.partition)
c.broker, err = c.client.Leader(c.topic, c.partition)
if err != nil {
return -1, err
}
Expand All @@ -321,7 +321,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
if err != nil {
return -1, err
}
c.broker, err = c.client.leader(c.topic, c.partition)
c.broker, err = c.client.Leader(c.topic, c.partition)
if err != nil {
return -1, err
}
Expand Down
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
return err
}

broker, err := p.client.leader(p.topic, partition)
broker, err := p.client.Leader(p.topic, partition)
if err != nil {
return err
}
Expand Down

0 comments on commit d1bbdae

Please sign in to comment.