diff --git a/go/vt/vtgateproxy/discovery.go b/go/vt/vtgateproxy/discovery.go index e71fd14f3c9..a396237bad3 100644 --- a/go/vt/vtgateproxy/discovery.go +++ b/go/vt/vtgateproxy/discovery.go @@ -27,6 +27,7 @@ import ( "sync" "time" + "google.golang.org/grpc/attributes" "google.golang.org/grpc/resolver" "vitess.io/vitess/go/stats" @@ -57,6 +58,8 @@ import ( // type: Only select from hosts of this type (required) // +const PoolTypeAttr = "PoolType" + // Resolver(https://godoc.org/google.golang.org/grpc/resolver#Resolver). type JSONGateResolver struct { target resolver.Target @@ -385,7 +388,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error { var addrs []resolver.Address for _, target := range targets { - addrs = append(addrs, resolver.Address{Addr: target.Addr}) + addrs = append(addrs, resolver.Address{Addr: target.Addr, Attributes: attributes.New(PoolTypeAttr, r.poolType)}) } log.V(100).Infof("updated targets for %s to %v", r.target.URL.String(), targets) diff --git a/go/vt/vtgateproxy/firstready_balancer.go b/go/vt/vtgateproxy/firstready_balancer.go index ed884224c64..d9dd380861b 100644 --- a/go/vt/vtgateproxy/firstready_balancer.go +++ b/go/vt/vtgateproxy/firstready_balancer.go @@ -42,7 +42,7 @@ import ( // newBuilder creates a new first_ready balancer builder. func newBuilder() balancer.Builder { - return base.NewBalancerBuilder("first_ready", &frPickerBuilder{}, base.Config{HealthCheck: true}) + return base.NewBalancerBuilder("first_ready", &frPickerBuilder{currentConns: map[string]balancer.SubConn{}}, base.Config{HealthCheck: true}) } func init() { @@ -54,8 +54,8 @@ func init() { // Once a conn is chosen and is in the ready state, it will remain as the // active subconn even if other connections become available. type frPickerBuilder struct { - mu sync.Mutex - currentConn balancer.SubConn + mu sync.Mutex + currentConns map[string]balancer.SubConn } func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { @@ -68,33 +68,35 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker { f.mu.Lock() defer f.mu.Unlock() - // If we've already chosen a subconn, and it is still in the ready list, then - // no need to change state - if f.currentConn != nil { - log.V(100).Infof("first_ready: currentConn is active, checking if still ready") - for sc := range info.ReadySCs { - if f.currentConn == sc { - log.V(100).Infof("first_ready: currentConn still active - not changing") - return f - } + // First check if there is a previously chosen subconn in the ready list + for sc, scInfo := range info.ReadySCs { + // poolType is a required attribute to be set by discovery + poolType := scInfo.Address.Attributes.Value(PoolTypeAttr).(string) + currentConn := f.currentConns[poolType] + if currentConn == sc { + log.V(100).Infof("first_ready: conn for pool %s still active - keeping addr %s", poolType, scInfo.Address.Addr) + return &frPicker{sc} } } // Otherwise either we don't have an active conn or the conn we were using is // no longer active, so pick an arbitrary new one out of the map. - log.V(100).Infof("first_ready: currentConn is not active, picking a new one") - for sc := range info.ReadySCs { - f.currentConn = sc - break + for sc, scInfo := range info.ReadySCs { + poolType := scInfo.Address.Attributes.Value(PoolTypeAttr).(string) + log.V(100).Infof("first_ready: conn for pool %s not active - picked %s", poolType, scInfo.Address.Addr) + f.currentConns[poolType] = sc + return &frPicker{sc} } - return f + return base.NewErrPicker(errors.New("error should not be reachable")) } -// Pick simply returns the currently chosen conn -func (f *frPickerBuilder) Pick(balancer.PickInfo) (balancer.PickResult, error) { - f.mu.Lock() - defer f.mu.Unlock() +// frPicker is a trivial picker that just wraps the one chosen conn +type frPicker struct { + currentConn balancer.SubConn +} - return balancer.PickResult{SubConn: f.currentConn}, nil +// Pick simply returns the currently chosen conn +func (p *frPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + return balancer.PickResult{SubConn: p.currentConn}, nil }