Skip to content

Commit

Permalink
ringhash: fix normalizeWeights (#7156)
Browse files Browse the repository at this point in the history
  • Loading branch information
arvindbr8 authored May 29, 2024
1 parent 0756c0d commit 33faea8
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 24 deletions.
43 changes: 25 additions & 18 deletions xds/internal/balancer/ringhash/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,30 +116,37 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64, log
return &ring{items: items}
}

// normalizeWeights divides all the weights by the sum, so that the total weight
// is 1.
// normalizeWeights calculates the normalized weights for each subConn in the
// given subConns map. It returns a slice of subConnWithWeight structs, where
// each struct contains a subConn and its corresponding weight. The function
// also returns the minimum weight among all subConns.
//
// The normalized weight of each subConn is calculated by dividing its weight
// attribute by the sum of all subConn weights. If the weight attribute is not
// found on the address, a default weight of 1 is used.
//
// The addresses are sorted in ascending order to ensure consistent results.
//
// Must be called with a non-empty subConns map.
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) {
var weightSum uint32
keys := subConns.Keys()
for _, a := range keys {
weightSum += getWeightAttribute(a)
// Since attributes are explicitly ignored in the AddressMap key, we need to
// iterate over the values to get the weights.
scVals := subConns.Values()
for _, a := range scVals {
weightSum += a.(*subConn).weight
}
ret := make([]subConnWithWeight, 0, len(keys))
min := float64(1.0)
for _, a := range keys {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
// getWeightAttribute() returns 1 if the weight attribute is not found
// on the address. And since this function is guaranteed to be called
// with a non-empty subConns map, weightSum is guaranteed to be
// non-zero. So, we need not worry about divide a by zero error here.
nw := float64(getWeightAttribute(a)) / float64(weightSum)
ret := make([]subConnWithWeight, 0, subConns.Len())
min := 1.0
for _, a := range scVals {
scInfo := a.(*subConn)
// (*subConn).weight is set to 1 if the weight attribute is not found on
// the address. And since this function is guaranteed to be called with
// a non-empty subConns map, weightSum is guaranteed to be non-zero. So,
// we need not worry about divide by zero error here.
nw := float64(scInfo.weight) / float64(weightSum)
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
}
min = math.Min(min, nw)
}
// Sort the addresses to return consistent results.
//
Expand Down
6 changes: 3 additions & 3 deletions xds/internal/balancer/ringhash/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,9 @@ func init() {
testAddr("c", 4),
}
testSubConnMap = resolver.NewAddressMap()
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"})
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a", weight: 3})
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b", weight: 3})
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c", weight: 4})
}

func testAddr(addr string, weight uint32) resolver.Address {
Expand Down
36 changes: 33 additions & 3 deletions xds/internal/balancer/ringhash/ringhash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,17 +344,26 @@ func (s) TestThreeSubConnsAffinityMultiple(t *testing.T) {
}
}

// TestAddrWeightChange covers the following scenarios after setting up the
// balancer with 3 addresses [A, B, C]:
// - updates balancer with [A, B, C], a new Picker should not be sent.
// - updates balancer with [A, B] (C removed), a new Picker is sent and the
// ring is updated.
// - updates balancer with [A, B], but B has a weight of 2, a new Picker is
// sent. And the new ring should contain the correct number of entries
// and weights.
func (s) TestAddrWeightChange(t *testing.T) {
wantAddrs := []resolver.Address{
addrs := []resolver.Address{
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
cc, b, p0 := setupTest(t, addrs)
ring0 := p0.(*picker).ring

// Update with the same addresses, should not send a new Picker.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: wantAddrs},
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
Expand Down Expand Up @@ -407,6 +416,27 @@ func (s) TestAddrWeightChange(t *testing.T) {
if p2.(*picker).ring == ring1 {
t.Fatalf("new picker after changing address weight has the same ring as before, want different")
}
// With the new update, the ring must look like this:
// [
// {idx:0 sc: {addr: testBackendAddrStrs[0], weight: 1}},
// {idx:1 sc: {addr: testBackendAddrStrs[1], weight: 2}},
// {idx:2 sc: {addr: testBackendAddrStrs[2], weight: 2}},
// ].
if len(p2.(*picker).ring.items) != 3 {
t.Fatalf("new picker after changing address weight has %d entries, want 3", len(p2.(*picker).ring.items))
}
for _, i := range p2.(*picker).ring.items {
if i.sc.addr == testBackendAddrStrs[0] {
if i.sc.weight != 1 {
t.Fatalf("new picker after changing address weight has weight %d for %v, want 1", i.sc.weight, i.sc.addr)
}
}
if i.sc.addr == testBackendAddrStrs[1] {
if i.sc.weight != 2 {
t.Fatalf("new picker after changing address weight has weight %d for %v, want 2", i.sc.weight, i.sc.addr)
}
}
}
}

// TestSubConnToConnectWhenOverallTransientFailure covers the situation when the
Expand Down

0 comments on commit 33faea8

Please sign in to comment.