Skip to content

Commit

Permalink
grpclb: new balancer V2
Browse files Browse the repository at this point in the history
pickfirst test passed

switching from roundrobin to pickfirst works

switching from pickfirst to roundrobin works

cleanup config

actually parse service config

2019
  • Loading branch information
menghanl committed Apr 2, 2019
1 parent 4745f6a commit db4bb28
Show file tree
Hide file tree
Showing 5 changed files with 363 additions and 59 deletions.
54 changes: 36 additions & 18 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,19 +129,8 @@ func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Bui
}
}

// newLBBuilderWithPickFirst creates a grpclb builder with pick-first.
func newLBBuilderWithPickFirst() balancer.Builder {
return &lbBuilder{
usePickFirst: true,
}
}

type lbBuilder struct {
fallbackTimeout time.Duration

// TODO: delete this when balancer can handle service config. This should be
// updated by service config.
usePickFirst bool // Use roundrobin or pickfirst for backends.
}

func (b *lbBuilder) Name() string {
Expand All @@ -167,7 +156,6 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
cc: newLBCacheClientConn(cc),
target: target,
opt: opt,
usePickFirst: b.usePickFirst,
fallbackTimeout: b.fallbackTimeout,
doneCh: make(chan struct{}),

Expand Down Expand Up @@ -231,11 +219,14 @@ type lbBalancer struct {
// serverList contains anything new. Each generate picker will also have
// reference to this list to do the first layer pick.
fullServerList []*lbpb.Server
// Backend addresses. It's kept so the addresses are available when
// switching between round_robin and pickfirst.
backendAddrs []resolver.Address
// All backends addresses, with metadata set to nil. This list contains all
// backend addresses in the same order and with the same duplicates as in
// serverlist. When generating picker, a SubConn slice with the same order
// but with only READY SCs will be gerenated.
backendAddrs []resolver.Address
backendAddrsWithoutMetadata []resolver.Address
// Roundrobin functionalities.
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
Expand Down Expand Up @@ -275,7 +266,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
break
}
} else {
for _, a := range lb.backendAddrs {
for _, a := range lb.backendAddrsWithoutMetadata {
if sc, ok := lb.subConns[a]; ok {
if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready {
readySCs = append(readySCs, sc)
Expand Down Expand Up @@ -339,6 +330,11 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {
}

func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
panic("not used")
}

func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
}
Expand Down Expand Up @@ -371,7 +367,7 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi
if lb.state != connectivity.Ready {
if !lb.inFallback && !lb.remoteBalancerConnected {
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.refreshSubConns(lb.resolvedBackendAddrs, false, lb.usePickFirst)
}
}
}
Expand Down Expand Up @@ -410,17 +406,39 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) {
return
}
// Enter fallback.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
lb.mu.Unlock()
}

// HandleResolvedAddrs sends the updated remoteLB addresses to remoteLB
// clientConn. The remoteLB clientConn will handle creating/removing remoteLB
// connections.
func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
panic("not used")
}

func (lb *lbBalancer) handleServiceConfig(sc string) {
lb.mu.Lock()
defer lb.mu.Unlock()

newUsePickFirst := childIsPickFirst(sc)
if lb.usePickFirst == newUsePickFirst {
return
}
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handleResolvedResult: %+v", addrs)
grpclog.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
}
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
lb.regeneratePicker(true)
}

func (lb *lbBalancer) UpdateResolverState(rs resolver.State) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateResolverState: %+v", rs)
}
lb.handleServiceConfig(rs.ServiceConfig)

addrs := rs.Addresses
if len(addrs) <= 0 {
return
}
Expand Down Expand Up @@ -457,7 +475,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) {
// This means we received a new list of resolved backends, and we are
// still in fallback mode. Need to update the list of backends we are
// using to the new list of backends.
lb.refreshSubConns(lb.resolvedBackendAddrs, false)
lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst)
}
lb.mu.Unlock()
}
Expand Down
87 changes: 87 additions & 0 deletions balancer/grpclb/grpclb_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpclb

import (
"encoding/json"

"google.golang.org/grpc"
"google.golang.org/grpc/balancer/roundrobin"
)

type serviceConfig struct {
LoadBalancingConfig *[]map[string]*grpclbServiceConfig
}

type grpclbServiceConfig struct {
ChildPolicy *[]map[string]json.RawMessage
}

func parseFullServiceConfig(s string) *serviceConfig {
var ret serviceConfig
err := json.Unmarshal([]byte(s), &ret)
if err != nil {
return nil
}
return &ret
}

func parseServiceConfig(s string) *grpclbServiceConfig {
parsedSC := parseFullServiceConfig(s)
if parsedSC == nil {
return nil
}
lbConfigs := parsedSC.LoadBalancingConfig
if lbConfigs == nil {
return nil
}
for _, lbC := range *lbConfigs {
if v, ok := lbC[grpclbName]; ok {
return v
}
}
return nil
}

const (
roundRobinName = roundrobin.Name
pickFirstName = grpc.PickFirstBalancerName
)

func childIsPickFirst(s string) bool {
parsedSC := parseServiceConfig(s)
if parsedSC == nil {
return false
}
childConfigs := parsedSC.ChildPolicy
if childConfigs == nil {
return false
}
for _, childC := range *childConfigs {
// If round_robin exists before pick_first, return false
if _, ok := childC[roundRobinName]; ok {
return false
}
// If pick_first is before round_robin, return true
if _, ok := childC[pickFirstName]; ok {
return true
}
}
return false
}
154 changes: 154 additions & 0 deletions balancer/grpclb/grpclb_config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package grpclb

import (
"encoding/json"
"reflect"
"testing"
)

func Test_parseFullServiceConfig(t *testing.T) {
tests := []struct {
name string
s string
want *serviceConfig
}{
{
name: "empty",
s: "",
want: nil,
},
{
name: "success1",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`,
want: &serviceConfig{
LoadBalancingConfig: &[]map[string]*grpclbServiceConfig{
{"grpclb": &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"pick_first": json.RawMessage("{}")},
},
}},
},
},
},
{
name: "success2",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: &serviceConfig{
LoadBalancingConfig: &[]map[string]*grpclbServiceConfig{
{"grpclb": &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"round_robin": json.RawMessage("{}")},
{"pick_first": json.RawMessage("{}")},
},
}},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parseFullServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseFullServiceConfig() = %+v, want %+v", got, tt.want)
}
})
}
}

func Test_parseServiceConfig(t *testing.T) {
tests := []struct {
name string
s string
want *grpclbServiceConfig
}{
{
name: "empty",
s: "",
want: nil,
},
{
name: "success1",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`,
want: &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"pick_first": json.RawMessage("{}")},
},
},
},
{
name: "success2",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: &grpclbServiceConfig{
ChildPolicy: &[]map[string]json.RawMessage{
{"round_robin": json.RawMessage("{}")},
{"pick_first": json.RawMessage("{}")},
},
},
},
{
name: "no_grpclb",
s: `{"loadBalancingConfig":[{"notgrpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := parseServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) {
t.Errorf("parseFullServiceConfig() = %+v, want %+v", got, tt.want)
}
})
}
}

func Test_childIsPickFirst(t *testing.T) {
tests := []struct {
name string
s string
want bool
}{
{
name: "invalid",
s: "",
want: false,
},
{
name: "pickfirst_only",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`,
want: true,
},
{
name: "pickfirst_before_rr",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}},{"round_robin":{}}]}}]}`,
want: true,
},
{
name: "rr_before_pickfirst",
s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`,
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := childIsPickFirst(tt.s); got != tt.want {
t.Errorf("childIsPickFirst() = %v, want %v", got, tt.want)
}
})
}
}
Loading

0 comments on commit db4bb28

Please sign in to comment.