Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clientv3: fix secure endpoint failover, refactor with gRPC 1.22 upgrade #10911

Merged
merged 12 commits into from
Jul 26, 2019
Merged
118 changes: 70 additions & 48 deletions clientv3/balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,45 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package balancer implements client balancer.
package balancer

import (
"fmt"
"strconv"
"sync"
"time"

"go.etcd.io/etcd/clientv3/balancer/connectivity"
"go.etcd.io/etcd/clientv3/balancer/picker"

"go.uber.org/zap"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
grpcconnectivity "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
_ "google.golang.org/grpc/resolver/dns" // register DNS resolver
_ "google.golang.org/grpc/resolver/passthrough" // register passthrough resolver
)

// Config defines balancer configurations.
type Config struct {
// Policy configures balancer policy.
Policy picker.Policy

// Picker implements gRPC picker.
// Leave empty if "Policy" field is not custom.
// TODO: currently custom policy is not supported.
// Picker picker.Picker
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comments about picker configuration and Custom picker enum in this PR and instead open an issue about it? It's a adds quite a bit of noise in this PR, might be better to just leave it out of the code until we decide to implement something. Same for custom balancers.


// Name defines an additional name for balancer.
// Useful for balancer testing to avoid register conflicts.
// If empty, defaults to policy name.
Name string

// Logger configures balancer logging.
// If nil, logs are discarded.
Logger *zap.Logger
}

// RegisterBuilder creates and registers a builder. Since this function calls balancer.Register, it
// must be invoked at initialization time.
func RegisterBuilder(cfg Config) {
Expand Down Expand Up @@ -59,16 +80,13 @@ func (b *builder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balan

addrToSc: make(map[resolver.Address]balancer.SubConn),
scToAddr: make(map[balancer.SubConn]resolver.Address),
scToSt: make(map[balancer.SubConn]connectivity.State),
scToSt: make(map[balancer.SubConn]grpcconnectivity.State),

currentConn: nil,
csEvltr: &connectivityStateEvaluator{},
currentConn: nil,
connectivityRecorder: connectivity.New(b.cfg.Logger),

// initialize picker always returns "ErrNoSubConnAvailable"
Picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}
if bb.lg == nil {
bb.lg = zap.NewNop()
picker: picker.NewErr(balancer.ErrNoSubConnAvailable),
}

// TODO: support multiple connections
Expand Down Expand Up @@ -112,13 +130,12 @@ type baseBalancer struct {

addrToSc map[resolver.Address]balancer.SubConn
scToAddr map[balancer.SubConn]resolver.Address
scToSt map[balancer.SubConn]connectivity.State
scToSt map[balancer.SubConn]grpcconnectivity.State

currentConn balancer.ClientConn
currentState connectivity.State
csEvltr *connectivityStateEvaluator
currentConn balancer.ClientConn
connectivityRecorder connectivity.Recorder

picker.Picker
picker picker.Picker
}

// HandleResolvedAddrs implements "grpc/balancer.Balancer" interface.
Expand All @@ -128,7 +145,11 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
bb.lg.Warn("HandleResolvedAddrs called with error", zap.String("balancer-id", bb.id), zap.Error(err))
return
}
bb.lg.Info("resolved", zap.String("balancer-id", bb.id), zap.Strings("addresses", addrsToStrings(addrs)))
bb.lg.Info("resolved",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Strings("addresses", addrsToStrings(addrs)),
)

bb.mu.Lock()
defer bb.mu.Unlock()
Expand All @@ -139,12 +160,13 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
if _, ok := bb.addrToSc[addr]; !ok {
sc, err := bb.currentConn.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{})
if err != nil {
bb.lg.Warn("NewSubConn failed", zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
bb.lg.Warn("NewSubConn failed", zap.String("picker", bb.picker.String()), zap.String("balancer-id", bb.id), zap.Error(err), zap.String("address", addr.Addr))
continue
}
bb.lg.Info("created subconn", zap.String("address", addr.Addr))
bb.addrToSc[addr] = sc
bb.scToAddr[sc] = addr
bb.scToSt[sc] = connectivity.Idle
bb.scToSt[sc] = grpcconnectivity.Idle
sc.Connect()
}
}
Expand All @@ -157,6 +179,7 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)

bb.lg.Info(
"removed subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("address", addr.Addr),
zap.String("subconn", scToString(sc)),
Expand All @@ -171,95 +194,94 @@ func (bb *baseBalancer) HandleResolvedAddrs(addrs []resolver.Address, err error)
}

// HandleSubConnStateChange implements "grpc/balancer.Balancer" interface.
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
func (bb *baseBalancer) HandleSubConnStateChange(sc balancer.SubConn, s grpcconnectivity.State) {
bb.mu.Lock()
defer bb.mu.Unlock()

old, ok := bb.scToSt[sc]
if !ok {
bb.lg.Warn(
"state change for an unknown subconn",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("state", s.String()),
)
return
}

bb.lg.Info(
"state changed",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.Bool("connected", s == connectivity.Ready),
zap.Bool("connected", s == grpcconnectivity.Ready),
zap.String("subconn", scToString(sc)),
zap.Int("subconn-size", len(bb.scToAddr)),
zap.String("address", bb.scToAddr[sc].Addr),
zap.String("old-state", old.String()),
zap.String("new-state", s.String()),
)

bb.scToSt[sc] = s
switch s {
case connectivity.Idle:
case grpcconnectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
case grpcconnectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scToSt. Remove state for this sc here.
delete(bb.scToAddr, sc)
delete(bb.scToSt, sc)
}

oldAggrState := bb.currentState
bb.currentState = bb.csEvltr.recordTransition(old, s)
oldAggrState := bb.connectivityRecorder.GetCurrentState()
bb.connectivityRecorder.RecordTransition(old, s)

// Regenerate picker when one of the following happens:
// Update balancer picker when one of the following happens:
// - this sc became ready from not-ready
// - this sc became not-ready from ready
// - the aggregated state of balancer became TransientFailure from non-TransientFailure
// - the aggregated state of balancer became non-TransientFailure from TransientFailure
if (s == connectivity.Ready) != (old == connectivity.Ready) ||
(bb.currentState == connectivity.TransientFailure) != (oldAggrState == connectivity.TransientFailure) {
bb.regeneratePicker()
if (s == grpcconnectivity.Ready) != (old == grpcconnectivity.Ready) ||
(bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure) != (oldAggrState == grpcconnectivity.TransientFailure) {
bb.updatePicker()
}

bb.currentConn.UpdateBalancerState(bb.currentState, bb.Picker)
bb.currentConn.UpdateBalancerState(bb.connectivityRecorder.GetCurrentState(), bb.picker)
}

func (bb *baseBalancer) regeneratePicker() {
if bb.currentState == connectivity.TransientFailure {
func (bb *baseBalancer) updatePicker() {
if bb.connectivityRecorder.GetCurrentState() == grpcconnectivity.TransientFailure {
bb.picker = picker.NewErr(balancer.ErrTransientFailure)
bb.lg.Info(
"generated transient error picker",
"updated picker to transient error picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
)
bb.Picker = picker.NewErr(balancer.ErrTransientFailure)
return
}

// only pass ready subconns to picker
scs := make([]balancer.SubConn, 0)
addrToSc := make(map[resolver.Address]balancer.SubConn)
scToAddr := make(map[balancer.SubConn]resolver.Address)
for addr, sc := range bb.addrToSc {
if st, ok := bb.scToSt[sc]; ok && st == connectivity.Ready {
scs = append(scs, sc)
addrToSc[addr] = sc
if st, ok := bb.scToSt[sc]; ok && st == grpcconnectivity.Ready {
scToAddr[sc] = addr
}
}

switch bb.policy {
case picker.RoundrobinBalanced:
bb.Picker = picker.NewRoundrobinBalanced(bb.lg, scs, addrToSc, scToAddr)

default:
panic(fmt.Errorf("invalid balancer picker policy (%d)", bb.policy))
}

bb.picker = picker.New(picker.Config{
Policy: bb.policy,
Logger: bb.lg,
SubConnToResolverAddress: scToAddr,
})
bb.lg.Info(
"generated picker",
"updated picker",
zap.String("picker", bb.picker.String()),
zap.String("balancer-id", bb.id),
zap.String("policy", bb.policy.String()),
zap.Strings("subconn-ready", scsToStrings(addrToSc)),
zap.Int("subconn-size", len(addrToSc)),
zap.Strings("subconn-ready", scsToStrings(scToAddr)),
zap.Int("subconn-size", len(scToAddr)),
)
}

Expand Down
36 changes: 0 additions & 36 deletions clientv3/balancer/config.go

This file was deleted.

58 changes: 0 additions & 58 deletions clientv3/balancer/connectivity.go

This file was deleted.

Loading