Skip to content

Commit

Permalink
Merge branch 'master' into Cindy/SNI
Browse files Browse the repository at this point in the history
  • Loading branch information
cindyxue authored and Cindy Xue committed Jun 30, 2020
2 parents d2cc061 + 6809848 commit 96232cc
Show file tree
Hide file tree
Showing 75 changed files with 1,673 additions and 529 deletions.
16 changes: 9 additions & 7 deletions balancer/base/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
"google.golang.org/grpc/resolver"
)

var logger = grpclog.Component("balancer")

type baseBuilder struct {
name string
pickerBuilder PickerBuilder
Expand Down Expand Up @@ -91,8 +93,8 @@ func (b *baseBalancer) ResolverError(err error) {

func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// TODO: handle s.ResolverState.ServiceConfig?
if grpclog.V(2) {
grpclog.Infoln("base.baseBalancer: got new ClientConn state: ", s)
if logger.V(2) {
logger.Info("base.baseBalancer: got new ClientConn state: ", s)
}
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
Expand All @@ -104,7 +106,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// a is a new address (not existing in b.subConns).
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck})
if err != nil {
grpclog.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue
}
b.subConns[a] = sc
Expand Down Expand Up @@ -168,13 +170,13 @@ func (b *baseBalancer) regeneratePicker() {

func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("base.baseBalancer: handle SubConn state change: %p, %v", sc, s)
}
oldS, ok := b.scStates[sc]
if !ok {
if grpclog.V(2) {
grpclog.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("base.baseBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
}
return
}
Expand Down
21 changes: 11 additions & 10 deletions balancer/grpclb/grpclb.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const (
)

var errServerTerminatedConnection = errors.New("grpclb: failed to recv server list: server terminated connection")
var logger = grpclog.Component("grpclb")

func convertDuration(d *durationpb.Duration) time.Duration {
if d == nil {
Expand Down Expand Up @@ -150,11 +151,11 @@ func (b *lbBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) bal
if opt.CredsBundle != nil {
lb.grpclbClientConnCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBalancer)
if err != nil {
grpclog.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
logger.Warningf("lbBalancer: client connection creds NewWithMode failed: %v", err)
}
lb.grpclbBackendCreds, err = opt.CredsBundle.NewWithMode(internal.CredsBundleModeBackendFromBalancer)
if err != nil {
grpclog.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
logger.Warningf("lbBalancer: backend creds NewWithMode failed: %v", err)
}
}

Expand Down Expand Up @@ -310,16 +311,16 @@ func (lb *lbBalancer) aggregateSubConnStates() connectivity.State {

func (lb *lbBalancer) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) {
s := scs.ConnectivityState
if grpclog.V(2) {
grpclog.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("lbBalancer: handle SubConn state change: %p, %v", sc, s)
}
lb.mu.Lock()
defer lb.mu.Unlock()

oldS, ok := lb.scStates[sc]
if !ok {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
if logger.V(2) {
logger.Infof("lbBalancer: got state changes for an unknown SubConn: %p, %v", sc, s)
}
return
}
Expand Down Expand Up @@ -393,8 +394,8 @@ func (lb *lbBalancer) handleServiceConfig(gc *grpclbServiceConfig) {
if lb.usePickFirst == newUsePickFirst {
return
}
if grpclog.V(2) {
grpclog.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
if logger.V(2) {
logger.Infof("lbBalancer: switching mode, new usePickFirst: %+v", newUsePickFirst)
}
lb.refreshSubConns(lb.backendAddrs, lb.inFallback, newUsePickFirst)
}
Expand All @@ -405,8 +406,8 @@ func (lb *lbBalancer) ResolverError(error) {
}

func (lb *lbBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
if logger.V(2) {
logger.Infof("lbBalancer: UpdateClientConnState: %+v", ccs)
}
gc, _ := ccs.BalancerConfig.(*grpclbServiceConfig)
lb.handleServiceConfig(gc)
Expand Down
23 changes: 11 additions & 12 deletions balancer/grpclb/grpclb_remote_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/keepalive"
Expand All @@ -44,8 +43,8 @@ import (
// processServerList updates balancer's internal state, create/remove SubConns
// and regenerates picker using the received serverList.
func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: processing server list: %+v", l)
if logger.V(2) {
logger.Infof("lbBalancer: processing server list: %+v", l)
}
lb.mu.Lock()
defer lb.mu.Unlock()
Expand All @@ -56,8 +55,8 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {

// If the new server list == old server list, do nothing.
if cmp.Equal(lb.fullServerList, l.Servers, cmp.Comparer(proto.Equal)) {
if grpclog.V(2) {
grpclog.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
if logger.V(2) {
logger.Infof("lbBalancer: new serverlist same as the previous one, ignoring")
}
return
}
Expand All @@ -81,8 +80,8 @@ func (lb *lbBalancer) processServerList(l *lbpb.ServerList) {
Addr: fmt.Sprintf("%s:%d", ipStr, s.Port),
Metadata: &md,
}
if grpclog.V(2) {
grpclog.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
if logger.V(2) {
logger.Infof("lbBalancer: server list entry[%d]: ipStr:|%s|, port:|%d|, load balancer token:|%v|",
i, ipStr, s.Port, s.LoadBalanceToken)
}
backendAddrs = append(backendAddrs, addr)
Expand Down Expand Up @@ -150,7 +149,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
// This bypasses the cc wrapper with SubConn cache.
sc, err := lb.cc.cc.NewSubConn(backendAddrs, opts)
if err != nil {
grpclog.Warningf("grpclb: failed to create new SubConn: %v", err)
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
return
}
sc.Connect()
Expand All @@ -173,7 +172,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
// Use addrWithMD to create the SubConn.
sc, err := lb.cc.NewSubConn([]resolver.Address{addr}, opts)
if err != nil {
grpclog.Warningf("grpclb: failed to create new SubConn: %v", err)
logger.Warningf("grpclb: failed to create new SubConn: %v", err)
continue
}
lb.subConns[addrWithoutMD] = sc // Use the addr without MD as key for the map.
Expand Down Expand Up @@ -245,7 +244,7 @@ func (lb *lbBalancer) newRemoteBalancerCCWrapper() {
// receive ServerName as authority.
cc, err := grpc.DialContext(context.Background(), lb.manualResolver.Scheme()+":///grpclb.subClientConn", dopts...)
if err != nil {
grpclog.Fatalf("failed to dial: %v", err)
logger.Fatalf("failed to dial: %v", err)
}
ccw := &remoteBalancerCCWrapper{
cc: cc,
Expand Down Expand Up @@ -373,9 +372,9 @@ func (ccw *remoteBalancerCCWrapper) watchRemoteBalancer() {
default:
if err != nil {
if err == errServerTerminatedConnection {
grpclog.Info(err)
logger.Info(err)
} else {
grpclog.Warning(err)
logger.Warning(err)
}
}
}
Expand Down
45 changes: 18 additions & 27 deletions balancer/grpclb/grpclb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,8 +396,7 @@ func newLoadBalancer(numberOfBackends int, statsChan chan *lbpb.ClientStats) (ts
var grpclbConfig = `{"loadBalancingConfig": [{"grpclb": {}}]}`

func (s) TestGRPCLB(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := newLoadBalancer(1, nil)
if err != nil {
Expand All @@ -419,7 +418,7 @@ func (s) TestGRPCLB(t *testing.T) {
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand All @@ -444,8 +443,7 @@ func (s) TestGRPCLB(t *testing.T) {

// The remote balancer sends response with duplicates to grpclb client.
func (s) TestGRPCLBWeighted(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := newLoadBalancer(2, nil)
if err != nil {
Expand All @@ -470,7 +468,7 @@ func (s) TestGRPCLBWeighted(t *testing.T) {
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -510,8 +508,7 @@ func (s) TestGRPCLBWeighted(t *testing.T) {
}

func (s) TestDropRequest(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := newLoadBalancer(2, nil)
if err != nil {
Expand All @@ -536,7 +533,7 @@ func (s) TestDropRequest(t *testing.T) {
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -661,8 +658,7 @@ func (s) TestDropRequest(t *testing.T) {

// When the balancer in use disconnects, grpclb should connect to the next address from resolved balancer address list.
func (s) TestBalancerDisconnects(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r := manual.NewBuilderWithScheme("whatever")

var (
tests []*testServers
Expand Down Expand Up @@ -694,7 +690,7 @@ func (s) TestBalancerDisconnects(t *testing.T) {
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -739,8 +735,7 @@ func (s) TestFallback(t *testing.T) {
balancer.Register(newLBBuilderWithFallbackTimeout(100 * time.Millisecond))
defer balancer.Register(newLBBuilder())

r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := newLoadBalancer(1, nil)
if err != nil {
Expand Down Expand Up @@ -771,7 +766,7 @@ func (s) TestFallback(t *testing.T) {
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -867,8 +862,7 @@ func (s) TestFallback(t *testing.T) {
}

func (s) TestExplicitFallback(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := newLoadBalancer(1, nil)
if err != nil {
Expand Down Expand Up @@ -899,7 +893,7 @@ func (s) TestExplicitFallback(t *testing.T) {
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -971,15 +965,14 @@ func (s) TestExplicitFallback(t *testing.T) {

func (s) TestFallBackWithNoServerAddress(t *testing.T) {
resolveNowCh := make(chan struct{}, 1)
r, cleanup := manual.GenerateAndRegisterManualResolver()
r := manual.NewBuilderWithScheme("whatever")
r.ResolveNowCallback = func(resolver.ResolveNowOptions) {
select {
case <-resolveNowCh:
default:
}
resolveNowCh <- struct{}{}
}
defer cleanup()

tss, cleanup, err := newLoadBalancer(1, nil)
if err != nil {
Expand Down Expand Up @@ -1009,7 +1002,7 @@ func (s) TestFallBackWithNoServerAddress(t *testing.T) {
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -1090,8 +1083,7 @@ func (s) TestFallBackWithNoServerAddress(t *testing.T) {
}

func (s) TestGRPCLBPickFirst(t *testing.T) {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := newLoadBalancer(3, nil)
if err != nil {
Expand Down Expand Up @@ -1120,7 +1112,7 @@ func (s) TestGRPCLBPickFirst(t *testing.T) {
creds := serverNameCheckCreds{}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds), grpc.WithContextDialer(fakeNameDialer))
if err != nil {
t.Fatalf("Failed to dial to the backend %v", err)
Expand Down Expand Up @@ -1245,8 +1237,7 @@ func checkStats(stats, expected *rpcStats) error {
}

func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats, runRPCs func(*grpc.ClientConn), statsWant *rpcStats) error {
r, cleanup := manual.GenerateAndRegisterManualResolver()
defer cleanup()
r := manual.NewBuilderWithScheme("whatever")

tss, cleanup, err := newLoadBalancer(1, statsChan)
if err != nil {
Expand All @@ -1270,7 +1261,7 @@ func runAndCheckStats(t *testing.T, drop bool, statsChan chan *lbpb.ClientStats,

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName,
cc, err := grpc.DialContext(ctx, r.Scheme()+":///"+beServerName, grpc.WithResolvers(r),
grpc.WithTransportCredentials(&creds),
grpc.WithPerRPCCredentials(failPreRPCCred{}),
grpc.WithContextDialer(fakeNameDialer))
Expand Down
Loading

0 comments on commit 96232cc

Please sign in to comment.