Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make Client.Leader() public. #30

Merged
merged 1 commit into from
Aug 27, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 21 additions & 19 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,23 +119,9 @@ func (client *Client) Topics() ([]string, error) {
return ret, nil
}

// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics.
func (client *Client) RefreshTopicMetadata(topics ...string) error {
return client.refreshMetadata(topics, client.config.MetadataRetries)
}

// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
func (client *Client) RefreshAllMetadata() error {
// Kafka refreshes all when you encode it an empty array...
return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
}

// functions for use by producers and consumers
// if Go had the concept they would be marked 'protected'
// TODO: see https://github.com/Shopify/sarama/issues/23

func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
// Leader returns the broker object that is the leader of the current topic/partition, as
// determined by querying the cluster metadata.
func (client *Client) Leader(topic string, partition_id int32) (*Broker, error) {
leader := client.cachedLeader(topic, partition_id)

if leader == nil {
Expand All @@ -153,6 +139,24 @@ func (client *Client) leader(topic string, partition_id int32) (*Broker, error)
return leader, nil
}

// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics.
func (client *Client) RefreshTopicMetadata(topics ...string) error {
return client.refreshMetadata(topics, client.config.MetadataRetries)
}

// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
func (client *Client) RefreshAllMetadata() error {
// Kafka refreshes all when you encode it an empty array...
return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
}

// misc private helper functions

// XXX: see https://github.com/Shopify/sarama/issues/15
// and https://github.com/Shopify/sarama/issues/23
// disconnectBroker is a bad hacky way to accomplish broker management. It should be replaced with
// something sane and the replacement should be made part of the public Client API
func (client *Client) disconnectBroker(broker *Broker) {
client.lock.Lock()
defer client.lock.Unlock()
Expand All @@ -174,8 +178,6 @@ func (client *Client) disconnectBroker(broker *Broker) {
go broker.Close()
}

// truly private helper functions

func (client *Client) refreshMetadata(topics []string, retries int) error {
for broker := client.any(); broker != nil; broker = client.any() {
response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func TestClientMetadata(t *testing.T) {
t.Error("Client returned incorrect partitions for my_topic:", parts)
}

tst, err := client.leader("my_topic", 0)
tst, err := client.Leader("my_topic", 0)
if err != nil {
t.Error(err)
} else if tst.ID() != 5 {
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
t.Error("Client returned incorrect partitions for my_topic:", parts)
}

tst, err := client.leader("my_topic", 0xb)
tst, err := client.Leader("my_topic", 0xb)
if err != nil {
t.Error(err)
} else if tst.ID() != 0xaa {
Expand Down
10 changes: 5 additions & 5 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
return nil, ConfigurationError("Invalid EventBufferSize")
}

broker, err := client.leader(topic, partition)
broker, err := client.Leader(topic, partition)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (c *Consumer) fetchMessages() {
}
default:
c.client.disconnectBroker(c.broker)
for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
if !c.sendError(err) {
return
}
Expand All @@ -217,7 +217,7 @@ func (c *Consumer) fetchMessages() {
case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
err = c.client.RefreshTopicMetadata(c.topic)
if c.sendError(err) {
for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
if !c.sendError(err) {
return
}
Expand Down Expand Up @@ -295,7 +295,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
return -1, err
}
c.client.disconnectBroker(c.broker)
c.broker, err = c.client.leader(c.topic, c.partition)
c.broker, err = c.client.Leader(c.topic, c.partition)
if err != nil {
return -1, err
}
Expand All @@ -321,7 +321,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
if err != nil {
return -1, err
}
c.broker, err = c.client.leader(c.topic, c.partition)
c.broker, err = c.client.Leader(c.topic, c.partition)
if err != nil {
return -1, err
}
Expand Down
2 changes: 1 addition & 1 deletion producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
return err
}

broker, err := p.client.leader(p.topic, partition)
broker, err := p.client.Leader(p.topic, partition)
if err != nil {
return err
}
Expand Down