Skip to content

Commit

Permalink
FIX: It shouldn't sort the replicas and isr while the order is
Browse files Browse the repository at this point in the history
meaningful to Kafka(preferred leader)
  • Loading branch information
git-hulk committed Aug 11, 2017
1 parent 2fd980e commit 4b5702e
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 22 deletions.
4 changes: 2 additions & 2 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func (client *client) Replicas(topic string, partitionID int32) ([]int32, error)
if metadata.Err == ErrReplicaNotAvailable {
return nil, metadata.Err
}
return dupeAndSort(metadata.Replicas), nil
return metadata.Replicas, nil
}

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
Expand All @@ -322,7 +322,7 @@ func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32,
if metadata.Err == ErrReplicaNotAvailable {
return nil, metadata.Err
}
return dupeAndSort(metadata.Isr), nil
return metadata.Isr, nil
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
Expand Down
18 changes: 9 additions & 9 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,23 +188,23 @@ func TestClientMetadata(t *testing.T) {
replicas, err = client.Replicas("my_topic", 0)
if err != nil {
t.Error(err)
} else if replicas[0] != 1 {
t.Error("Incorrect (or unsorted) replica")
} else if replicas[1] != 3 {
t.Error("Incorrect (or unsorted) replica")
} else if replicas[0] != 3 {
t.Error("Incorrect (or sorted) replica")
} else if replicas[1] != 1 {
t.Error("Incorrect (or sorted) replica")
} else if replicas[2] != 5 {
t.Error("Incorrect (or unsorted) replica")
t.Error("Incorrect (or sorted) replica")
}

isr, err = client.InSyncReplicas("my_topic", 0)
if err != nil {
t.Error(err)
} else if len(isr) != 2 {
t.Error("Client returned incorrect ISRs for partition:", isr)
} else if isr[0] != 1 {
t.Error("Incorrect (or unsorted) ISR:", isr)
} else if isr[1] != 5 {
t.Error("Incorrect (or unsorted) ISR:", isr)
} else if isr[0] != 5 {
t.Error("Incorrect (or sorted) ISR:", isr)
} else if isr[1] != 1 {
t.Error("Incorrect (or sorted) ISR:", isr)
}

leader.Close()
Expand Down
11 changes: 0 additions & 11 deletions utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package sarama
import (
"bufio"
"net"
"sort"
)

type none struct{}
Expand All @@ -23,16 +22,6 @@ func (slice int32Slice) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}

func dupeAndSort(input []int32) []int32 {
ret := make([]int32, 0, len(input))
for _, val := range input {
ret = append(ret, val)
}

sort.Sort(int32Slice(ret))
return ret
}

func withRecover(fn func()) {
defer func() {
handler := PanicHandler
Expand Down

0 comments on commit 4b5702e

Please sign in to comment.