diff --git a/balancer/grpclb/grpclb.go b/balancer/grpclb/grpclb.go index f2ddfc3788ed..111344af345e 100644 --- a/balancer/grpclb/grpclb.go +++ b/balancer/grpclb/grpclb.go @@ -32,12 +32,15 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" grpclbstate "google.golang.org/grpc/balancer/grpclb/state" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/backoff" + internalgrpclog "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/internal/resolver/dns" "google.golang.org/grpc/resolver" @@ -145,20 +148,21 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal manualResolver: r, subConns: make(map[resolver.Address]balancer.SubConn), scStates: make(map[balancer.SubConn]connectivity.State), - picker: &errPicker{err: balancer.ErrNoSubConnAvailable}, + picker: base.NewErrPicker(balancer.ErrNoSubConnAvailable), clientStats: newRPCStats(), backoff: backoff.DefaultExponential, // TODO: make backoff configurable. } + lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[grpclb %p] ", lb)) var err error if opt.CredsBundle != nil { lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer) if err != nil { - logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err) + lb.logger.Warningf("Failed to create credentials used for connecting to grpclb: %v", err) } lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer) if err != nil { - logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err) + lb.logger.Warningf("Failed to create credentials used for connecting to backends returned by grpclb: %v", err) } } @@ -170,6 +174,7 @@ type lbBalancer struct { dialTarget string // user's dial target target string // same as dialTarget unless overridden in service config opt balancer.BuildOptions + logger *internalgrpclog.PrefixLogger usePickFirst bool @@ -236,12 +241,12 @@ type lbBalancer struct { // Caller must hold lb.mu. func (lb *lbBalancer) regeneratePicker(resetDrop bool) { if lb.state == connectivity.TransientFailure { - lb.picker = &errPicker{err: fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr)} + lb.picker = base.NewErrPicker(fmt.Errorf("all SubConns are in TransientFailure, last connection error: %v", lb.connErr)) return } if lb.state == connectivity.Connecting { - lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} + lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable) return } @@ -268,7 +273,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) { // // This doesn't seem to be necessary after the connecting check above. // Kept for safety. - lb.picker = &errPicker{err: balancer.ErrNoSubConnAvailable} + lb.picker = base.NewErrPicker(balancer.ErrNoSubConnAvailable) return } if lb.inFallback { @@ -322,21 +327,21 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State { // UpdateSubConnState is unused; NewSubConn's options always specifies // updateSubConnState as the listener. func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { - logger.Errorf("grpclb: UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs) + lb.logger.Errorf("UpdateSubConnState(%v, %+v) called unexpectedly", sc, scs) } func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { s := scs.ConnectivityState - if logger.V(2) { - logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s) + if lb.logger.V(2) { + lb.logger.Infof("SubConn state change: %p, %v", sc, s) } lb.mu.Lock() defer lb.mu.Unlock() oldS, ok := lb.scStates[sc] if !ok { - if logger.V(2) { - logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s) + if lb.logger.V(2) { + lb.logger.Infof("Received state change for an unknown SubConn: %p, %v", sc, s) } return } @@ -441,8 +446,8 @@ func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) { if lb.usePickFirst == newUsePickFirst { return } - if logger.V(2) { - logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst) + if lb.logger.V(2) { + lb.logger.Infof("Switching mode. Is pick_first used for backends? %v", newUsePickFirst) } lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst) } @@ -453,8 +458,8 @@ func (lb *lbBalancer) ResolverError(error) { } func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error { - if logger.V(2) { - logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs) + if lb.logger.V(2) { + lb.logger.Infof("UpdateClientConnState: %s", pretty.ToJSON(ccs)) } gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig) lb.handleServiceConfig(gc) @@ -482,7 +487,9 @@ func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error } else if lb.ccRemoteLB == nil { // First time receiving resolved addresses, create a cc to remote // balancers. - lb.newRemoteBalancerCCWrapper() + if err := lb.newRemoteBalancerCCWrapper(); err != nil { + return err + } // Start the fallback goroutine. go lb.fallbackToBackendsAfter(lb.fallbackTimeout) } diff --git a/balancer/grpclb/grpclb_picker.go b/balancer/grpclb/grpclb_picker.go index 39bc5cc71e81..20c5f2ec3967 100644 --- a/balancer/grpclb/grpclb_picker.go +++ b/balancer/grpclb/grpclb_picker.go @@ -98,15 +98,6 @@ func (s *rpcStats) knownReceived() { atomic.AddInt64(&s.numCallsFinished, 1) } -type errPicker struct { - // Pick always returns this err. - err error -} - -func (p *errPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { - return balancer.PickResult{}, p.err -} - // rrPicker does roundrobin on subConns. It's typically used when there's no // response from remote balancer, and grpclb falls back to the resolved // backends. diff --git a/balancer/grpclb/grpclb_remote_balancer.go b/balancer/grpclb/grpclb_remote_balancer.go index edb66a90a3b1..c8fe1edd8e53 100644 --- a/balancer/grpclb/grpclb_remote_balancer.go +++ b/balancer/grpclb/grpclb_remote_balancer.go @@ -27,11 +27,8 @@ import ( "time" "github.com/golang/protobuf/proto" - timestamppb "github.com/golang/protobuf/ptypes/timestamp" - "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/balancer" - lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal/backoff" @@ -39,13 +36,28 @@ import ( "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/resolver" + + timestamppb "github.com/golang/protobuf/ptypes/timestamp" + lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" ) +func serverListEqual(a, b []*lbpb.Server) bool { + if len(a) != len(b) { + return false + } + for i := 0; i < len(a); i++ { + if !proto.Equal(a[i], b[i]) { + return false + } + } + return true +} + // processServerList updates balancer's internal state, create/remove SubConns // and regenerates picker using the received serverList. func (lb *lbBalancer) processServerList(l *lbpb.ServerList) { - if logger.V(2) { - logger.Infof("lbBalancer: processing server list: %+v", l) + if lb.logger.V(2) { + lb.logger.Infof("Processing server list: %#v", l) } lb.mu.Lock() defer lb.mu.Unlock() @@ -55,9 +67,9 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) { lb.serverListReceived = true // If the new server list == old server list, do nothing. - if cmp.Equal(lb.fullServerList, l.Servers, cmp.Comparer(proto.Equal)) { - if logger.V(2) { - logger.Infof("lbBalancer: new serverlist same as the previous one, ignoring") + if serverListEqual(lb.fullServerList, l.Servers) { + if lb.logger.V(2) { + lb.logger.Infof("Ignoring new server list as it is the same as the previous one") } return } @@ -78,9 +90,8 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) { ipStr = fmt.Sprintf("[%s]", ipStr) } addr := imetadata.Set(resolver.Address{Addr: fmt.Sprintf("%s:%d", ipStr, s.Port)}, md) - if logger.V(2) { - logger.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|", - i, ipStr, s.Port, s.LoadBalanceToken) + if lb.logger.V(2) { + lb.logger.Infof("Server list entry:|%d|, ipStr:|%s|, port:|%d|, load balancer token:|%v|", i, ipStr, s.Port, s.LoadBalanceToken) } backendAddrs = append(backendAddrs, addr) } @@ -149,7 +160,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback // This bypasses the cc wrapper with SubConn cache. sc, err := lb.cc.ClientConn.NewSubConn(backendAddrs, opts) if err != nil { - logger.Warningf("grpclb: failed to create new SubConn: %v", err) + lb.logger.Warningf("Failed to create new SubConn: %v", err) return } sc.Connect() @@ -174,7 +185,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback opts.StateListener = func(scs balancer.SubConnState) { lb.updateSubConnState(sc, scs) } sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts) if err != nil { - logger.Warningf("grpclb: failed to create new SubConn: %v", err) + lb.logger.Warningf("Failed to create new SubConn: %v", err) continue } lb.subConns[addrWithoutAttrs] = sc // Use the addr without MD as key for the map. @@ -217,7 +228,7 @@ type remoteBalancerCCWrapper struct { wg sync.WaitGroup } -func (lb *lbBalancer) newRemoteBalancerCCWrapper() { +func (lb *lbBalancer) newRemoteBalancerCCWrapper() error { var dopts []grpc.DialOption if creds := lb.opt.DialCreds; creds != nil { dopts = append(dopts, grpc.WithTransportCredentials(creds)) @@ -248,9 +259,10 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() { // // The grpclb server addresses will set field ServerName, and creds will // receive ServerName as authority. - cc, err := grpc.DialContext(context.Background(), lb.manualResolver.Scheme()+":///grpclb.subClientConn", dopts...) + target := lb.manualResolver.Scheme() + ":///grpclb.subClientConn" + cc, err := grpc.Dial(target, dopts...) if err != nil { - logger.Fatalf("failed to dial: %v", err) + return fmt.Errorf("grpc.Dial(%s): %v", target, err) } ccw := &remoteBalancerCCWrapper{ cc: cc, @@ -261,6 +273,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() { lb.ccRemoteLB = ccw ccw.wg.Add(1) go ccw.watchRemoteBalancer() + return nil } // close closed the ClientConn to remote balancer, and waits until all @@ -408,9 +421,9 @@ func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() { default: if err != nil { if err == errServerTerminatedConnection { - logger.Info(err) + ccw.lb.logger.Infof("Call to remote balancer failed: %v", err) } else { - logger.Warning(err) + ccw.lb.logger.Warningf("Call to remote balancer failed: %v", err) } } } diff --git a/interop/observability/go.mod b/interop/observability/go.mod index e5da798dcf69..3f4302e3c78a 100644 --- a/interop/observability/go.mod +++ b/interop/observability/go.mod @@ -22,7 +22,6 @@ require ( github.com/envoyproxy/protoc-gen-validate v1.0.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.3 // indirect - github.com/google/go-cmp v0.5.9 // indirect github.com/google/s2a-go v0.1.4 // indirect github.com/google/uuid v1.3.1 // indirect github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect