Skip to content

Commit

Permalink
fix: have requestedassignment respect rack (strimzi#190)
Browse files Browse the repository at this point in the history
Signed-off-by: kwall <kwall@apache.org>
  • Loading branch information
k-wall committed Jun 13, 2022
1 parent 2302fe6 commit 195e852
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 8 deletions.
59 changes: 51 additions & 8 deletions internal/services/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,12 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
func (ts *TopicService) Close() {
glog.Infof("Closing topic service")

if err := ts.admin.Close(); err != nil {
glog.Fatalf("Error closing the Sarama cluster admin: %v", err)
if ts.admin != nil {
if err := ts.admin.Close(); err != nil {
glog.Fatalf("Error closing the Sarama cluster admin: %v", err)
}
ts.admin = nil
}
ts.admin = nil
glog.Infof("Topic service closed")
}

Expand Down Expand Up @@ -312,14 +314,55 @@ func (ts *TopicService) requestedAssignments(currentPartitions int, brokers []*s
return brokers[i].ID() < brokers[j].ID()
})

assignments := make(map[int32][]int32, int(partitions))
rackMap := make(map[string][]*sarama.Broker)
var rackNames []string
brokersWithRack := 0
for _, broker := range brokers {
if broker.Rack() != "" {
brokersWithRack++
if _, ok := rackMap[broker.Rack()]; !ok {
rackMap[broker.Rack()] = make([]*sarama.Broker, 0)
rackNames = append(rackNames, broker.Rack())
}
rackMap[broker.Rack()] = append(rackMap[broker.Rack()], broker)
}
}

if len(brokers) != brokersWithRack {
if brokersWithRack > 0 {
glog.Warningf("Not *all* brokers have rack assignments (%d/%d), topic %s will be created without rack awareness", brokersWithRack, len(brokers), ts.canaryConfig.Topic)
}
} else {
index := 0

for ;; {
again := false

for _, rackName := range rackNames {
brokerList := rackMap[rackName]
if len(brokerList) > 0 {
var head *sarama.Broker
head, rackMap[rackName] = brokerList[0], brokerList[1:]
brokers[index] = head
index++
again = true
}
}

if !again {
break
}
}
}

assignments := make(map[int32][]int32, partitions)
for p := 0; p < int(partitions); p++ {
assignments[int32(p)] = make([]int32, int(replicationFactor))
assignments[int32(p)] = make([]int32, replicationFactor)
k := p
for r := 0; r < int(replicationFactor); r++ {
for r := 0; r < replicationFactor; r++ {
// get brokers ID for assignment from the brokers list and not using
// just a monotonic increasing index because there could be "hole" (a broker down)
assignments[int32(p)][r] = brokers[int32(k%int(brokersNumber))].ID()
assignments[int32(p)][r] = brokers[int32(k%brokersNumber)].ID()
k++
}
}
Expand All @@ -338,7 +381,7 @@ func (ts *TopicService) currentAssignments(topicMetadata *sarama.TopicMetadata)

// Alter the replica assignment for the partitions
//
// After the request for the replica assignement, it run a loop for checking if the reassignment is still ongoing
// After the request for the replica assignment, it run a loop for checking if the reassignment is still ongoing
// It returns when the reassignment is done or there is an error
func (ts *TopicService) alterAssignments(assignments [][]int32) error {
if err := ts.admin.AlterPartitionReassignments(ts.canaryConfig.Topic, assignments); err != nil {
Expand Down
176 changes: 176 additions & 0 deletions internal/services/topic_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
//
// Copyright Strimzi authors.
// License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
//

// +build unit_test

// Package services defines an interface for canary services and related implementations

package services

import (
"fmt"
"github.com/Shopify/sarama"
"github.com/strimzi/strimzi-canary/internal/config"
"math/rand"
"reflect"
"testing"
"time"
"unsafe"
)

func TestRequestedAssignments(t *testing.T) {
var tests = []struct {
name string
numPartitions int
numBrokers int
useRack bool
brokersWithMultipleLeaders []int32
expectedMinISR int
}{
{"one broker", 1, 1, false, []int32{}, 1},
{"three brokers without rack info", 3, 3, false, []int32{}, 2},
{"fewer brokers than partitions", 3, 2, false, []int32{0}, 1},
{"six brokers with rack info", 6, 6, true, []int32{}, 2},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := &config.CanaryConfig{
Topic: "test",
}

brokers, brokerMap := createBrokers(t, tt.numBrokers, tt.useRack)

ts := NewTopicService(cfg, nil)

assignments, minISR := ts.requestedAssignments(tt.numPartitions, brokers)

if tt.expectedMinISR != minISR {
t.Errorf("unexpected minISR, got = %d, want = %d", minISR, tt.expectedMinISR)
}

for _, brokerIds := range assignments {
duplicatedBrokers := make(map[int32]int)

for _, brokerId := range brokerIds {
if _, ok := duplicatedBrokers[brokerId]; !ok {
duplicatedBrokers[brokerId] = 1
} else {
duplicatedBrokers[brokerId] = duplicatedBrokers[brokerId] + 1
}

}

for brokerId, count := range duplicatedBrokers {
if count > 1 {
t.Errorf("partition is replicated on same broker (%d) more than once (%d)", brokerId, count)
}
}
}

leaderBrokers := make(map[int32]int)
for _, brokerIds := range assignments {

leaderBrokerId := brokerIds[0]
if _, ok := leaderBrokers[leaderBrokerId]; !ok {
leaderBrokers[leaderBrokerId] = 1
} else {
leaderBrokers[leaderBrokerId] = leaderBrokers[leaderBrokerId] + 1
}
}

for brokerId, count := range leaderBrokers {
if count > 1 {
found := false
for _, expectedBrokerId := range tt.brokersWithMultipleLeaders {
if expectedBrokerId == brokerId {
found = true
break
}
}

if !found {
t.Errorf("broker %d is leader more than one partition (%d)", brokerId, count)
}
}
}

if tt.useRack {
for i, brokerIds := range assignments {
rackBrokerId := make(map[string][]int32)
for _, brokerId := range brokerIds {
broker := brokerMap[brokerId]
_, ok := rackBrokerId[broker.Rack()]
if !ok {
rackBrokerId[broker.Rack()] = make([]int32, 0)
}
rackBrokerId[broker.Rack()] = append(rackBrokerId[broker.Rack()], broker.ID())
}

for rack, brokerIds := range rackBrokerId {
if len(brokerIds) > 1 {
t.Errorf("partition %d has been assigned to %d brokers %v in rackBrokerId %s", i, len(brokerIds), brokerIds, rack)
}
}
}
}

ts.Close()
})
}

}

func createBrokers(t *testing.T, num int, rack bool) ([]*sarama.Broker, map[int32]*sarama.Broker) {
brokers := make([]*sarama.Broker, 0)
brokerMap := make(map[int32]*sarama.Broker)
for i := 0; i < num ; i++ {
broker := &sarama.Broker{}

setBrokerID(t, broker, i)

brokers = append(brokers, broker)
brokerMap[broker.ID()] = broker
}

rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(brokers), func(i, j int) { brokers[i], brokers[j] = brokers[j], brokers[i] })

if rack {
rackNames := make([]string, 3)
for i, _ := range rackNames {
rackNames[i] = fmt.Sprintf("useRack%d", i)
}

for i, broker := range brokers {
rackName := rackNames[i%3]
setBrokerRack(t, broker, rackName)
}
}

return brokers, brokerMap
}

func setBrokerID(t *testing.T, broker *sarama.Broker, i int) {
idV := reflect.ValueOf(broker).Elem().FieldByName("id")
setUnexportedField(idV, int32(i))
if int32(i) != broker.ID() {
t.Errorf("failed to set id by reflection")
}
}

func setBrokerRack(t *testing.T, broker *sarama.Broker, rackName string) {
rackV := reflect.ValueOf(broker).Elem().FieldByName("rack")
setUnexportedField(rackV, &rackName)
if rackName != broker.Rack() {
t.Errorf("failed to set useRack by reflection")
}
}

func setUnexportedField(field reflect.Value, value interface{}) {
reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).
Elem().
Set(reflect.ValueOf(value))
}

0 comments on commit 195e852

Please sign in to comment.