diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index a1123cedc7f8..7a1c700abd5e 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -129,19 +129,8 @@ func newLBBuilderWithFallbackTimeout(fallbackTimeout time.Duration) balancer.Bui } } -// newLBBuilderWithPickFirst creates a grpclb builder with pick-first. -func newLBBuilderWithPickFirst() balancer.Builder { - return &lbBuilder{ - usePickFirst: true, - } -} - type lbBuilder struct { fallbackTimeout time.Duration - - // TODO: delete this when balancer can handle service config. This should be - // updated by service config. - usePickFirst bool // Use roundrobin or pickfirst for backends. } func (b *lbBuilder) Name() string { @@ -167,7 +156,6 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal cc: newLBCacheClientConn(cc), target: target, opt: opt, - usePickFirst: b.usePickFirst, fallbackTimeout: b.fallbackTimeout, doneCh: make(chan struct{}), @@ -231,11 +219,14 @@ type lbBalancer struct { // serverList contains anything new. Each generate picker will also have // reference to this list to do the first layer pick. fullServerList []*lbpb.Server + // Backend addresses. It's kept so the addresses are available when + // switching between round_robin and pickfirst. + backendAddrs []resolver.Address // All backends addresses, with metadata set to nil. This list contains all // backend addresses in the same order and with the same duplicates as in // serverlist. When generating picker, a SubConn slice with the same order // but with only READY SCs will be gerenated. - backendAddrs []resolver.Address + backendAddrsWithoutMetadata []resolver.Address // Roundrobin functionalities. state connectivity.State subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn. @@ -275,7 +266,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { break } } else { - for _, a := range lb.backendAddrs { + for _, a := range lb.backendAddrsWithoutMetadata { if sc, ok := lb.subConns[a]; ok { if st, ok := lb.scStates[sc]; ok && st == connectivity.Ready { readySCs = append(readySCs, sc) @@ -339,6 +330,11 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State { } func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) { + panic("not used") +} + +func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { + s := scs.ConnectivityState if grpclog.V(2) { grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s) } @@ -371,7 +367,7 @@ func (lb *lbBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivi if lb.state != connectivity.Ready { if !lb.inFallback && !lb.remoteBalancerConnected { // Enter fallback. - lb.refreshSubConns(lb.resolvedBackendAddrs, false) + lb.refreshSubConns(lb.resolvedBackendAddrs, false, lb.usePickFirst) } } } @@ -410,7 +406,7 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { return } // Enter fallback. - lb.refreshSubConns(lb.resolvedBackendAddrs, false) + lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) lb.mu.Unlock() } @@ -418,9 +414,31 @@ func (lb *lbBalancer) fallbackToBackendsAfter(fallbackTimeout time.Duration) { // clientConn. The remoteLB clientConn will handle creating/removing remoteLB // connections. func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { + panic("not used") +} + +func (lb *lbBalancer) handleServiceConfig(sc string) { + lb.mu.Lock() + defer lb.mu.Unlock() + + newUsePickFirst := childIsPickFirst(sc) + if lb.usePickFirst == newUsePickFirst { + return + } if grpclog.V(2) { - grpclog.Infof("lbBalancer: handleResolvedResult: %+v", addrs) + grpclog.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst) } + lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst) + lb.regeneratePicker(true) +} + +func (lb *lbBalancer) UpdateResolverState(rs resolver.State) { + if grpclog.V(2) { + grpclog.Infof("lbBalancer: UpdateResolverState: %+v", rs) + } + lb.handleServiceConfig(rs.ServiceConfig) + + addrs := rs.Addresses if len(addrs) <= 0 { return } @@ -457,7 +475,7 @@ func (lb *lbBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error) { // This means we received a new list of resolved backends, and we are // still in fallback mode. Need to update the list of backends we are // using to the new list of backends. - lb.refreshSubConns(lb.resolvedBackendAddrs, false) + lb.refreshSubConns(lb.resolvedBackendAddrs, true, lb.usePickFirst) } lb.mu.Unlock() } diff --git a/balancer/grpclb/grpclb_config.go b/balancer/grpclb/grpclb_config.go new file mode 100644 index 000000000000..91458862551c --- /dev/null +++ b/balancer/grpclb/grpclb_config.go @@ -0,0 +1,87 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpclb + +import ( + "encoding/json" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer/roundrobin" +) + +type serviceConfig struct { + LoadBalancingConfig *[]map[string]*grpclbServiceConfig +} + +type grpclbServiceConfig struct { + ChildPolicy *[]map[string]json.RawMessage +} + +func parseFullServiceConfig(s string) *serviceConfig { + var ret serviceConfig + err := json.Unmarshal([]byte(s), &ret) + if err != nil { + return nil + } + return &ret +} + +func parseServiceConfig(s string) *grpclbServiceConfig { + parsedSC := parseFullServiceConfig(s) + if parsedSC == nil { + return nil + } + lbConfigs := parsedSC.LoadBalancingConfig + if lbConfigs == nil { + return nil + } + for _, lbC := range *lbConfigs { + if v, ok := lbC[grpclbName]; ok { + return v + } + } + return nil +} + +const ( + roundRobinName = roundrobin.Name + pickFirstName = grpc.PickFirstBalancerName +) + +func childIsPickFirst(s string) bool { + parsedSC := parseServiceConfig(s) + if parsedSC == nil { + return false + } + childConfigs := parsedSC.ChildPolicy + if childConfigs == nil { + return false + } + for _, childC := range *childConfigs { + // If round_robin exists before pick_first, return false + if _, ok := childC[roundRobinName]; ok { + return false + } + // If pick_first is before round_robin, return true + if _, ok := childC[pickFirstName]; ok { + return true + } + } + return false +} diff --git a/balancer/grpclb/grpclb_config_test.go b/balancer/grpclb/grpclb_config_test.go new file mode 100644 index 000000000000..e7ed82d3076e --- /dev/null +++ b/balancer/grpclb/grpclb_config_test.go @@ -0,0 +1,154 @@ +/* + * + * Copyright 2019 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package grpclb + +import ( + "encoding/json" + "reflect" + "testing" +) + +func Test_parseFullServiceConfig(t *testing.T) { + tests := []struct { + name string + s string + want *serviceConfig + }{ + { + name: "empty", + s: "", + want: nil, + }, + { + name: "success1", + s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`, + want: &serviceConfig{ + LoadBalancingConfig: &[]map[string]*grpclbServiceConfig{ + {"grpclb": &grpclbServiceConfig{ + ChildPolicy: &[]map[string]json.RawMessage{ + {"pick_first": json.RawMessage("{}")}, + }, + }}, + }, + }, + }, + { + name: "success2", + s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`, + want: &serviceConfig{ + LoadBalancingConfig: &[]map[string]*grpclbServiceConfig{ + {"grpclb": &grpclbServiceConfig{ + ChildPolicy: &[]map[string]json.RawMessage{ + {"round_robin": json.RawMessage("{}")}, + {"pick_first": json.RawMessage("{}")}, + }, + }}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := parseFullServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseFullServiceConfig() = %+v, want %+v", got, tt.want) + } + }) + } +} + +func Test_parseServiceConfig(t *testing.T) { + tests := []struct { + name string + s string + want *grpclbServiceConfig + }{ + { + name: "empty", + s: "", + want: nil, + }, + { + name: "success1", + s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`, + want: &grpclbServiceConfig{ + ChildPolicy: &[]map[string]json.RawMessage{ + {"pick_first": json.RawMessage("{}")}, + }, + }, + }, + { + name: "success2", + s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`, + want: &grpclbServiceConfig{ + ChildPolicy: &[]map[string]json.RawMessage{ + {"round_robin": json.RawMessage("{}")}, + {"pick_first": json.RawMessage("{}")}, + }, + }, + }, + { + name: "no_grpclb", + s: `{"loadBalancingConfig":[{"notgrpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`, + want: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := parseServiceConfig(tt.s); !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseFullServiceConfig() = %+v, want %+v", got, tt.want) + } + }) + } +} + +func Test_childIsPickFirst(t *testing.T) { + tests := []struct { + name string + s string + want bool + }{ + { + name: "invalid", + s: "", + want: false, + }, + { + name: "pickfirst_only", + s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}`, + want: true, + }, + { + name: "pickfirst_before_rr", + s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}},{"round_robin":{}}]}}]}`, + want: true, + }, + { + name: "rr_before_pickfirst", + s: `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"round_robin":{}},{"pick_first":{}}]}}]}`, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := childIsPickFirst(tt.s); got != tt.want { + t.Errorf("childIsPickFirst() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index 7ed886f060af..b3c1e21eeaa3 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -87,14 +87,14 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) { // Call refreshSubConns to create/remove SubConns. If we are in fallback, // this is also exiting fallback. - lb.refreshSubConns(backendAddrs, true) + lb.refreshSubConns(backendAddrs, true, lb.usePickFirst) } // refreshSubConns creates/removes SubConns with backendAddrs, and refreshes // balancer state and picker. // // Caller must hold lb.mu. -func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCLBServer bool) { +func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback bool, pickFirst bool) { defer func() { // Regenerate and update picker after refreshing subconns because with // cache, even if SubConn was newed/removed, there might be no state @@ -103,14 +103,28 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCL lb.updateStateAndPicker(true, true) }() - lb.inFallback = !fromGRPCLBServer + lb.inFallback = fallback opts := balancer.NewSubConnOptions{} - if fromGRPCLBServer { + if !fallback { opts.CredsBundle = lb.grpclbBackendCreds } - lb.backendAddrs = nil + lb.backendAddrs = backendAddrs + lb.backendAddrsWithoutMetadata = nil + + if lb.usePickFirst != pickFirst { + // Remove all SubConns when switching modes. + for a, sc := range lb.subConns { + if lb.usePickFirst { + lb.cc.cc.RemoveSubConn(sc) + } else { + lb.cc.RemoveSubConn(sc) + } + delete(lb.subConns, a) + } + lb.usePickFirst = pickFirst + } if lb.usePickFirst { var sc balancer.SubConn @@ -134,7 +148,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCL return } - // addrsSet is the set converted from backendAddrs, it's used to quick + // addrsSet is the set converted from backendAddrsWithoutMetadata, it's used to quick // lookup for an address. addrsSet := make(map[resolver.Address]struct{}) // Create new SubConns. @@ -142,7 +156,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fromGRPCL addrWithoutMD := addr addrWithoutMD.Metadata = nil addrsSet[addrWithoutMD] = struct{}{} - lb.backendAddrs = append(lb.backendAddrs, addrWithoutMD) + lb.backendAddrsWithoutMetadata = append(lb.backendAddrsWithoutMetadata, addrWithoutMD) if _, ok := lb.subConns[addrWithoutMD]; !ok { // Use addrWithMD to create the SubConn. @@ -282,7 +296,7 @@ func (lb *lbBalancer) watchRemoteBalancer() { // aggregated state is not Ready. if !lb.inFallback && lb.state != connectivity.Ready { // Entering fallback. - lb.refreshSubConns(lb.resolvedBackendAddrs, false) + lb.refreshSubConns(lb.resolvedBackendAddrs, false, lb.usePickFirst) } lb.mu.Unlock() diff --git a/balancer/grpclb/grpclb_test.go b/balancer/grpclb/grpclb_test.go index 2f4e3e01eede..740060fd4f73 100644 --- a/balancer/grpclb/grpclb_test.go +++ b/balancer/grpclb/grpclb_test.go @@ -807,10 +807,8 @@ func TestFallback(t *testing.T) { } } -// The remote balancer sends response with duplicates to grpclb client. func TestGRPCLBPickFirst(t *testing.T) { - balancer.Register(newLBBuilderWithPickFirst()) - defer balancer.Register(newLBBuilder()) + const grpclbServiceConfigWithPickFirst = `{"loadBalancingConfig":[{"grpclb":{"childPolicy":[{"pick_first":{}}]}}]}` defer leakcheck.Check(t) @@ -854,57 +852,90 @@ func TestGRPCLBPickFirst(t *testing.T) { defer cc.Close() testC := testpb.NewTestServiceClient(cc) - r.UpdateState(resolver.State{Addresses: []resolver.Address{{ - Addr: tss.lbAddr, - Type: resolver.GRPCLB, - ServerName: lbServerName, - }}}) + var ( + p peer.Peer + result string + ) + tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]} - var p peer.Peer + // Start with sub policy pick_first. - portPicked1 := 0 - tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:2]} + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{ + Addr: tss.lbAddr, + Type: resolver.GRPCLB, + ServerName: lbServerName, + }}, + ServiceConfig: grpclbServiceConfigWithPickFirst, + }) + + result = "" for i := 0; i < 1000; i++ { if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } - if portPicked1 == 0 { - portPicked1 = p.Addr.(*net.TCPAddr).Port - continue - } - if portPicked1 != p.Addr.(*net.TCPAddr).Port { - t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked1, p.Addr.(*net.TCPAddr).Port) - } + result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) + } + if seq := "00000"; !strings.Contains(result, strings.Repeat(seq, 100)) { + t.Errorf("got result sequence %q, want patten %q", result, seq) } - portPicked2 := portPicked1 - tss.ls.sls <- &lbpb.ServerList{Servers: beServers[:1]} + tss.ls.sls <- &lbpb.ServerList{Servers: beServers[2:]} + result = "" for i := 0; i < 1000; i++ { if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } - if portPicked2 == portPicked1 { - portPicked2 = p.Addr.(*net.TCPAddr).Port - continue - } - if portPicked2 != p.Addr.(*net.TCPAddr).Port { - t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked2, p.Addr.(*net.TCPAddr).Port) - } + result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) + } + if seq := "22222"; !strings.Contains(result, strings.Repeat(seq, 100)) { + t.Errorf("got result sequence %q, want patten %q", result, seq) } - portPicked := portPicked2 tss.ls.sls <- &lbpb.ServerList{Servers: beServers[1:]} + result = "" for i := 0; i < 1000; i++ { if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } - if portPicked == portPicked2 { - portPicked = p.Addr.(*net.TCPAddr).Port - continue + result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) + } + if seq := "22222"; !strings.Contains(result, strings.Repeat(seq, 100)) { + t.Errorf("got result sequence %q, want patten %q", result, seq) + } + + // Switch sub policy to roundrobin. + + r.UpdateState(resolver.State{ + Addresses: []resolver.Address{{ + Addr: tss.lbAddr, + Type: resolver.GRPCLB, + ServerName: lbServerName, + }}, + ServiceConfig: `{}`, + }) + + result = "" + for i := 0; i < 1000; i++ { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("_.EmptyCall(_, _) = _, %v, want _, ", err) } - if portPicked != p.Addr.(*net.TCPAddr).Port { - t.Fatalf("Different backends are picked for RPCs: %v vs %v", portPicked, p.Addr.(*net.TCPAddr).Port) + result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) + } + if seq := "121212"; !strings.Contains(result, strings.Repeat(seq, 100)) { + t.Errorf("got result sequence %q, want patten %q", result, seq) + } + + tss.ls.sls <- &lbpb.ServerList{Servers: beServers[0:3]} + result = "" + for i := 0; i < 1000; i++ { + if _, err := testC.EmptyCall(context.Background(), &testpb.Empty{}, grpc.WaitForReady(true), grpc.Peer(&p)); err != nil { + t.Fatalf("%v.EmptyCall(_, _) = _, %v, want _, ", testC, err) } + result += strconv.Itoa(portsToIndex[p.Addr.(*net.TCPAddr).Port]) + } + if seq := "012012012"; !strings.Contains(result, strings.Repeat(seq, 2)) { + t.Errorf("got result sequence %q, want patten %q", result, seq) } }