Skip to content

Commit

Permalink
[vtadmin] Update vtctld dialer to validate connectivity (#9915)
Browse files Browse the repository at this point in the history
* Add WaitForReady to vtctldclient interface

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Implement WaitForReady in grpcvtctldclient

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Call WaitForReady in VTAdmin's vtctld proxy + add a test

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Nit: typo

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Add a grpc-connectivity-timeout config flag

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Use WaitForReady instead of time.Sleep to detect client shutdown in TestRedial

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Fix TestDial by adding ConnectivityTimeout option

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Dedupe test logic with initVtctlServer helper

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Add WaitForReady tests in grpcvtctldclient/client_test.go

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Nits: wording + grammar

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Don't early return from Dial on Close errors

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Import the correct logging framework >:(

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Remove extraneous log statements

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Add defaultConnectivityTimeout var for use in unit tests

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Add WaitForReady to fakevtctldclient

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* Use defaultConnectivityTimeout in proxy_test

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>

* var defaultConnectivityTimeout -> const

Signed-off-by: Sara Bee <855595+doeg@users.noreply.github.com>
  • Loading branch information
doeg authored Mar 22, 2022
1 parent 9dfef96 commit 1b59109
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 21 deletions.
12 changes: 12 additions & 0 deletions go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ func New() *Fake {
}
}

// Clear resets the fake's discovery state by replacing both the gate and the
// vtctld registries with fresh ones whose tag and name lookup maps are empty.
func (d *Fake) Clear() {
	gatesByTag := make(map[string][]*vtadminpb.VTGate)
	gatesByName := make(map[string]*vtadminpb.VTGate)
	d.gates = &gates{byTag: gatesByTag, byName: gatesByName}

	vtctldsByTag := make(map[string][]*vtadminpb.Vtctld)
	vtctldsByName := make(map[string]*vtadminpb.Vtctld)
	d.vtctlds = &vtctlds{byTag: vtctldsByTag, byName: vtctldsByName}
}

// AddTaggedGates adds the given gates to the discovery fake, associating each
// gate with each tag. To tag different gates with multiple tags, call multiple
// times with the same gates but different tag slices. Gates are uniquely
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtadmin/vtctldclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package vtctldclient

import (
"fmt"
"time"

"github.com/spf13/pflag"

Expand All @@ -36,8 +37,12 @@ type Config struct {
CredentialsPath string

Cluster *vtadminpb.Cluster

ConnectivityTimeout time.Duration
}

// defaultConnectivityTimeout is the default value of the
// grpc-connectivity-timeout flag, which (*Config).Parse stores in
// Config.ConnectivityTimeout.
const defaultConnectivityTimeout = 2 * time.Second

// Parse returns a new config with the given cluster and discovery, after
// attempting to parse the command-line pflags into that Config. See
// (*Config).Parse() for more details.
Expand All @@ -61,6 +66,8 @@ func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string)
func (c *Config) Parse(args []string) error {
fs := pflag.NewFlagSet("", pflag.ContinueOnError)

fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", defaultConnectivityTimeout, "The maximum duration to wait for a gRPC connection to be established to the vtctld.")

credentialsTmplStr := fs.String("credentials-path-tmpl", "",
"Go template used to specify a path to a credentials file, which is a json file containing "+
"a Username and Password. Templates are given the context of the vtctldclient.Config, "+
Expand Down
16 changes: 9 additions & 7 deletions go/vt/vtadmin/vtctldclient/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ func TestParse(t *testing.T) {
require.NoError(t, err)

expected := &Config{
Cluster: nil,
Discovery: nil,
Credentials: nil,
CredentialsPath: "",
Cluster: nil,
Discovery: nil,
Credentials: nil,
CredentialsPath: "",
ConnectivityTimeout: defaultConnectivityTimeout,
}
assert.Equal(t, expected, cfg)
})
Expand Down Expand Up @@ -88,9 +89,10 @@ func TestParse(t *testing.T) {
Cluster: &vtadminpb.Cluster{
Name: "testcluster",
},
Discovery: nil,
Credentials: creds,
CredentialsPath: credsfile.Name(),
Discovery: nil,
Credentials: creds,
CredentialsPath: credsfile.Name(),
ConnectivityTimeout: defaultConnectivityTimeout,
}

assert.Equal(t, expected, cfg)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type VtctldClient struct {
// incorrectly.
var _ vtctldclient.VtctldClient = (*VtctldClient)(nil)

// WaitForReady is part of the vtctldclient.VtctldClient interface. The fake
// ignores ctx and always returns nil.
func (fake *VtctldClient) WaitForReady(ctx context.Context) error { return nil }

// CreateKeyspace is part of the vtctldclient.VtctldClient interface.
func (fake *VtctldClient) CreateKeyspace(ctx context.Context, req *vtctldatapb.CreateKeyspaceRequest, opts ...grpc.CallOption) (*vtctldatapb.CreateKeyspaceResponse, error) {
if fake.CreateKeyspaceShouldErr {
Expand Down
44 changes: 37 additions & 7 deletions go/vt/vtadmin/vtctldclient/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtadmin/cluster/discovery"
"vitess.io/vitess/go/vt/vtadmin/debug"
"vitess.io/vitess/go/vt/vtadmin/vtadminproto"
Expand Down Expand Up @@ -104,19 +105,31 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error {

if vtctld.VtctldClient != nil {
if !vtctld.closed {
span.Annotate("is_noop", true)
span.Annotate("vtctld_host", vtctld.host)
waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout)
defer waitCancel()

vtctld.lastPing = time.Now()
if err := vtctld.VtctldClient.WaitForReady(waitCtx); err == nil {
// Our cached connection is still open and ready, so we're good to go.
span.Annotate("is_noop", true)
span.Annotate("vtctld_host", vtctld.host)

return nil
vtctld.lastPing = time.Now()

return nil
}
// If WaitForReady returns an error, that indicates our cached connection
// is no longer valid. We fall through to close the cached connection,
// discover a new vtctld, and establish a new connection.
}

span.Annotate("is_stale", true)

// close before reopen. this is safe to call on an already-closed client.
if err := vtctld.Close(); err != nil {
return fmt.Errorf("error closing possibly-stale connection before re-dialing: %w", err)
// Even if the client connection does not shut down cleanly, we don't want to block
// Dial from discovering a new vtctld. This makes VTAdmin's dialer more resilient,
// but, as a caveat, it _can_ potentially leak improperly-closed gRPC connections.
// The %w verb is only understood by fmt.Errorf; in a log call it renders as
// "%!w(...)", so the error is formatted with %v instead.
log.Errorf("error closing possibly-stale connection before re-dialing: %v", err)
}
}

Expand Down Expand Up @@ -144,6 +157,19 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error {
return err
}

waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout)
defer waitCancel()

if err := client.WaitForReady(waitCtx); err != nil {
// If the gRPC connection does not transition to a READY state within the context timeout,
// then return an error. The onus to redial (or not) is on the caller of the Dial function.
// As an enhancement, we could update this Dial function to try redialing the discovered vtctld
// a few times with a backoff before giving up.
log.Infof("Could not transition to READY state for gRPC connection to %s: %s\n", addr, err.Error())
return err
}

log.Infof("Established gRPC connection to vtctld %s\n", addr)
vtctld.dialedAt = time.Now()
vtctld.host = addr
vtctld.VtctldClient = client
Expand All @@ -161,12 +187,16 @@ func (vtctld *ClientProxy) Close() error {
}

err := vtctld.VtctldClient.Close()

// Mark the vtctld connection as "closed" from the proxy side even if
// the client connection does not shut down cleanly. This makes VTAdmin's dialer more resilient,
// but, as a caveat, it _can_ potentially leak improperly-closed gRPC connections.
vtctld.closed = true

if err != nil {
return err
}

vtctld.closed = true

return nil
}

Expand Down
112 changes: 107 additions & 5 deletions go/vt/vtadmin/vtctldclient/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -34,16 +35,25 @@ type fakeVtctld struct {
vtctlservicepb.VtctlServer
}

func TestDial(t *testing.T) {
func initVtctlServer() (net.Listener, *grpc.Server, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

defer listener.Close()
if err != nil {
return nil, nil, err
}

vtctld := &fakeVtctld{}
server := grpc.NewServer()
vtctlservicepb.RegisterVtctlServer(server, vtctld)

return listener, server, err
}

func TestDial(t *testing.T) {
listener, server, err := initVtctlServer()
require.NoError(t, err)

defer listener.Close()

go server.Serve(listener)
defer server.Stop()

Expand All @@ -57,7 +67,8 @@ func TestDial(t *testing.T) {
Id: "test",
Name: "testcluster",
},
Discovery: disco,
Discovery: disco,
ConnectivityTimeout: defaultConnectivityTimeout,
})

// We don't have a vtctld host until we call Dial
Expand All @@ -67,3 +78,94 @@ func TestDial(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, listener.Addr().String(), proxy.host)
}

// TestRedial tests that vtadmin-api is able to recover from a lost connection to
// a vtctld by rediscovering and redialing a new one.
//
// It starts two fake vtctld servers, dials whichever one discovery returns
// first, stops that server, and then checks that a second Dial connects to the
// remaining server.
func TestRedial(t *testing.T) {
	// Initialize vtctld #1
	listener1, server1, err := initVtctlServer()
	require.NoError(t, err)

	defer listener1.Close()

	go server1.Serve(listener1)
	defer server1.Stop()

	// Initialize vtctld #2
	listener2, server2, err := initVtctlServer()
	require.NoError(t, err)

	defer listener2.Close()

	go server2.Serve(listener2)
	defer server2.Stop()

	// Register both vtctlds with VTAdmin
	disco := fakediscovery.New()
	disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{
		Hostname: listener1.Addr().String(),
	}, &vtadminpb.Vtctld{
		Hostname: listener2.Addr().String(),
	})

	proxy := New(&Config{
		Cluster: &vtadminpb.Cluster{
			Id:   "test",
			Name: "testcluster",
		},
		Discovery:           disco,
		ConnectivityTimeout: defaultConnectivityTimeout,
	})

	// We don't have a vtctld host until we call Dial
	require.Empty(t, proxy.host)

	// Check for a successful connection to whichever vtctld we discover first.
	// Everything below depends on this connection, so stop right away if it
	// could not be established.
	err = proxy.Dial(context.Background())
	require.NoError(t, err)

	// vtadmin's fakediscovery package discovers vtctlds in random order. Rather
	// than force some cumbersome sequential logic, we can just do a switcheroo
	// here in the test to determine our "current" and (expected) "next" vtctlds.
	var currentVtctld *grpc.Server
	var nextAddr string

	switch proxy.host {
	case listener1.Addr().String():
		currentVtctld = server1
		nextAddr = listener2.Addr().String()

	case listener2.Addr().String():
		currentVtctld = server2
		nextAddr = listener1.Addr().String()
	default:
		t.Fatalf("invalid proxy host %s", proxy.host)
	}

	// Remove the shut down vtctld from VTAdmin's service discovery (clumsily).
	// Otherwise, when redialing, we may redial the vtctld that we just shut down.
	disco.Clear()
	disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{
		Hostname: nextAddr,
	})

	// Force an ungraceful shutdown of the gRPC server to which we're connected
	currentVtctld.Stop()

	// Wait for the client connection to shut down. (If we redial too quickly,
	// we get into a race condition with gRPC's internal retry logic.)
	// Using WaitForReady here _does_ expose more function internals than is
	// ideal for a unit test, but it's far less flaky than using time.Sleep.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
		err = proxy.VtctldClient.WaitForReady(ctx)
		// Release the timeout context on every iteration. A defer here would
		// not run until the test function returns, so each pass through the
		// loop would leave another context and timer alive.
		cancel()

		if err != nil {
			break
		}
	}

	// Finally, check that we discover, dial + establish a new connection to the remaining vtctld.
	err = proxy.Dial(context.Background())
	assert.NoError(t, err)
	assert.Equal(t, nextAddr, proxy.host)
}
50 changes: 50 additions & 0 deletions go/vt/vtctl/grpcvtctldclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ limitations under the License.
package grpcvtctldclient

import (
"context"
"errors"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/vtctl/grpcclientcommon"
Expand All @@ -28,6 +33,11 @@ import (
vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
)

var (
// ErrConnectionShutdown is returned by WaitForReady when the underlying
// gRPC connection reports the SHUTDOWN connectivity state.
ErrConnectionShutdown = errors.New("gRPCVtctldClient in a SHUTDOWN state")
// ErrConnectionTimeout is returned by WaitForReady when its context is
// done before the connection has been observed in the READY state.
ErrConnectionTimeout = errors.New("gRPC connection wait time exceeded")
)

// connClosedMsg holds the message text "grpc: the client connection is closed".
// NOTE(review): its use is outside this view — presumably it is matched
// against errors from an already-closed connection; confirm.
const connClosedMsg = "grpc: the client connection is closed"

type gRPCVtctldClient struct {
Expand Down Expand Up @@ -69,6 +79,7 @@ func NewWithDialOpts(addr string, failFast grpcclient.FailFast, opts ...grpc.Dia
}, nil
}

// Close is part of the vtctldclient.VtctldClient interface.
func (client *gRPCVtctldClient) Close() error {
err := client.cc.Close()
if err == nil {
Expand All @@ -78,6 +89,45 @@ func (client *gRPCVtctldClient) Close() error {
return err
}

// WaitForReady is part of the vtctldclient.VtctldClient interface.
//
// It polls the connection through the gRPC Connectivity API (see
// https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md)
// and blocks until one of the following happens:
//   - the connection is READY: nil is returned;
//   - the connection is SHUTDOWN: ErrConnectionShutdown is returned, since a
//     client never leaves that state and further polling is futile;
//   - ctx is done before a poll: ErrConnectionTimeout is returned;
//   - WaitForStateChange reports no transition out of the current state: an
//     error naming that state is returned.
//
// On any error the caller should close the connection and establish a new one.
func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error {
	for {
		// Non-blocking check: give up as soon as the caller's context is done.
		select {
		case <-ctx.Done():
			return ErrConnectionTimeout
		default:
		}

		state := client.cc.GetState()

		if state == connectivity.Ready {
			return nil
		}

		if state == connectivity.Shutdown {
			return ErrConnectionShutdown
		}

		// IDLE, CONNECTING, or TRANSIENT_FAILURE: wait to see whether the
		// connection moves on, then re-evaluate the new state.
		if !client.cc.WaitForStateChange(ctx, state) {
			return fmt.Errorf("failed to transition from state %s", state)
		}
	}
}

// init registers gRPCVtctldClientFactory with the vtctldclient package under
// the name "grpc".
func init() {
vtctldclient.Register("grpc", gRPCVtctldClientFactory)
}
Loading

0 comments on commit 1b59109

Please sign in to comment.