Skip to content

Commit

Permalink
Merge pull request #28 from Shopify/client_api_1
Browse files Browse the repository at this point in the history
Expose some client methods.
  • Loading branch information
eapache committed Aug 26, 2013
2 parents fed6dbf + 39a2698 commit b7bdd28
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 35 deletions.
78 changes: 49 additions & 29 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (client *Client,
client.leaders = make(map[string]map[int32]int32)

// do an initial fetch of all cluster metadata by specifing an empty list of topics
err = client.refreshTopics(make([]string, 0), client.config.MetadataRetries)
err = client.RefreshAllMetadata()
if err != nil {
client.Close() // this closes tmp, since it's still in the brokers hash
return nil, err
Expand Down Expand Up @@ -87,43 +87,70 @@ func (client *Client) Close() error {
return nil
}

// functions for use by producers and consumers
// if Go had the concept they would be marked 'protected'

func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
leader := client.cachedLeader(topic, partition_id)
// Partitions returns the sorted list of available partition IDs for the given topic.
func (client *Client) Partitions(topic string) ([]int32, error) {
partitions := client.cachedPartitions(topic)

if leader == nil {
err := client.refreshTopic(topic)
if partitions == nil {
err := client.RefreshTopicMetadata(topic)
if err != nil {
return nil, err
}
leader = client.cachedLeader(topic, partition_id)
partitions = client.cachedPartitions(topic)
}

if leader == nil {
return nil, UNKNOWN_TOPIC_OR_PARTITION
if partitions == nil {
return nil, NoSuchTopic
}

return leader, nil
return partitions, nil
}

func (client *Client) partitions(topic string) ([]int32, error) {
partitions := client.cachedPartitions(topic)
// Topics returns the set of available topics as retrieved from the cluster metadata.
func (client *Client) Topics() ([]string, error) {
client.lock.RLock()
defer client.lock.RUnlock()

if partitions == nil {
err := client.refreshTopic(topic)
ret := make([]string, 0, len(client.leaders))
for topic, _ := range client.leaders {
ret = append(ret, topic)
}

return ret, nil
}

// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics.
func (client *Client) RefreshTopicMetadata(topics ...string) error {
return client.refreshMetadata(topics, client.config.MetadataRetries)
}

// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
func (client *Client) RefreshAllMetadata() error {
// Kafka refreshes all when you encode it an empty array...
return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
}

// functions for use by producers and consumers
// if Go had the concept they would be marked 'protected'
// TODO: see https://github.com/Shopify/sarama/issues/23

func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
leader := client.cachedLeader(topic, partition_id)

if leader == nil {
err := client.RefreshTopicMetadata(topic)
if err != nil {
return nil, err
}
partitions = client.cachedPartitions(topic)
leader = client.cachedLeader(topic, partition_id)
}

if partitions == nil {
return nil, NoSuchTopic
if leader == nil {
return nil, UNKNOWN_TOPIC_OR_PARTITION
}

return partitions, nil
return leader, nil
}

func (client *Client) disconnectBroker(broker *Broker) {
Expand All @@ -147,16 +174,9 @@ func (client *Client) disconnectBroker(broker *Broker) {
go broker.Close()
}

func (client *Client) refreshTopic(topic string) error {
tmp := make([]string, 1)
tmp[0] = topic
// we permit three retries by default, 'cause that seemed like a nice number
return client.refreshTopics(tmp, client.config.MetadataRetries)
}

// truly private helper functions

func (client *Client) refreshTopics(topics []string, retries int) error {
func (client *Client) refreshMetadata(topics []string, retries int) error {
for broker := client.any(); broker != nil; broker = client.any() {
response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})

Expand All @@ -174,7 +194,7 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
return LEADER_NOT_AVAILABLE
}
time.Sleep(client.config.WaitForElection) // wait for leader election
return client.refreshTopics(retry, retries-1)
return client.refreshMetadata(retry, retries-1)
}
case EncodingError:
// didn't even send, return the error
Expand Down
11 changes: 9 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ func TestClientMetadata(t *testing.T) {
}
defer client.Close()

parts, err := client.partitions("my_topic")
topics, err := client.Topics()
if err != nil {
t.Error(err)
} else if len(topics) != 1 || topics[0] != "my_topic" {
t.Error("Client returned incorrect topics:", topics)
}

parts, err := client.Partitions("my_topic")
if err != nil {
t.Error(err)
} else if len(parts) != 1 || parts[0] != 0 {
Expand Down Expand Up @@ -137,7 +144,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
}
defer client.Close()

parts, err := client.partitions("my_topic")
parts, err := client.Partitions("my_topic")
if err != nil {
t.Error(err)
} else if len(parts) != 1 || parts[0] != 0xb {
Expand Down
4 changes: 2 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (c *Consumer) fetchMessages() {
case NO_ERROR:
break
case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
err = c.client.refreshTopic(c.topic)
err = c.client.RefreshTopicMetadata(c.topic)
if c.sendError(err) {
for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
if !c.sendError(err) {
Expand Down Expand Up @@ -317,7 +317,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
if !retry {
return -1, block.Err
}
err = c.client.refreshTopic(c.topic)
err = c.client.RefreshTopicMetadata(c.topic)
if err != nil {
return -1, err
}
Expand Down
4 changes: 2 additions & 2 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (p *Producer) SendMessage(key, value Encoder) error {
}

func (p *Producer) choosePartition(key Encoder) (int32, error) {
partitions, err := p.client.partitions(p.topic)
partitions, err := p.client.Partitions(p.topic)
if err != nil {
return -1, err
}
Expand Down Expand Up @@ -133,7 +133,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
if !retry {
return block.Err
}
err = p.client.refreshTopic(p.topic)
err = p.client.RefreshTopicMetadata(p.topic)
if err != nil {
return err
}
Expand Down

0 comments on commit b7bdd28

Please sign in to comment.