diff --git a/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go b/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go
index 4e388839212..5f94d7a4769 100644
--- a/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go
+++ b/go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go
@@ -60,6 +60,19 @@ func New() *Fake {
 	}
 }
 
+// Clear resets the fake by replacing its gate and vtctld registries with empty ones.
+func (d *Fake) Clear() {
+	d.gates = &gates{
+		byTag:  map[string][]*vtadminpb.VTGate{},
+		byName: map[string]*vtadminpb.VTGate{},
+	}
+
+	d.vtctlds = &vtctlds{
+		byTag:  map[string][]*vtadminpb.Vtctld{},
+		byName: map[string]*vtadminpb.Vtctld{},
+	}
+}
+
 // AddTaggedGates adds the given gates to the discovery fake, associating each
 // gate with each tag. To tag different gates with multiple tags, call multiple
 // times with the same gates but different tag slices. Gates are uniquely
diff --git a/go/vt/vtadmin/vtctldclient/config.go b/go/vt/vtadmin/vtctldclient/config.go
index 2ff3ad1d34f..63f96f1a606 100644
--- a/go/vt/vtadmin/vtctldclient/config.go
+++ b/go/vt/vtadmin/vtctldclient/config.go
@@ -18,6 +18,7 @@ package vtctldclient
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/spf13/pflag"
 
@@ -36,8 +37,12 @@ type Config struct {
 	CredentialsPath string
 
 	Cluster *vtadminpb.Cluster
+
+	ConnectivityTimeout time.Duration
 }
 
+const defaultConnectivityTimeout = 2 * time.Second
+
 // Parse returns a new config with the given cluster and discovery, after
 // attempting to parse the command-line pflags into that Config. See
 // (*Config).Parse() for more details.
@@ -61,6 +66,8 @@ func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string)
 func (c *Config) Parse(args []string) error {
 	fs := pflag.NewFlagSet("", pflag.ContinueOnError)
 
+	fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", defaultConnectivityTimeout, "The maximum duration to wait for a gRPC connection to be established to the vtctld.")
+
 	credentialsTmplStr := fs.String("credentials-path-tmpl", "",
 		"Go template used to specify a path to a credentials file, which is a json file containing "+
 			"a Username and Password. Templates are given the context of the vtctldclient.Config, "+
diff --git a/go/vt/vtadmin/vtctldclient/config_test.go b/go/vt/vtadmin/vtctldclient/config_test.go
index 6ec50bc98dd..e90634b0072 100644
--- a/go/vt/vtadmin/vtctldclient/config_test.go
+++ b/go/vt/vtadmin/vtctldclient/config_test.go
@@ -50,10 +50,11 @@ func TestParse(t *testing.T) {
 		require.NoError(t, err)
 
 		expected := &Config{
-			Cluster:         nil,
-			Discovery:       nil,
-			Credentials:     nil,
-			CredentialsPath: "",
+			Cluster:             nil,
+			Discovery:           nil,
+			Credentials:         nil,
+			CredentialsPath:     "",
+			ConnectivityTimeout: defaultConnectivityTimeout,
 		}
 		assert.Equal(t, expected, cfg)
 	})
@@ -88,9 +89,10 @@ func TestParse(t *testing.T) {
 			Cluster: &vtadminpb.Cluster{
 				Name: "testcluster",
 			},
-			Discovery:       nil,
-			Credentials:     creds,
-			CredentialsPath: credsfile.Name(),
+			Discovery:           nil,
+			Credentials:         creds,
+			CredentialsPath:     credsfile.Name(),
+			ConnectivityTimeout: defaultConnectivityTimeout,
 		}
 
 		assert.Equal(t, expected, cfg)
diff --git a/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go b/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go
index c767cf797ff..d1471a4c5c5 100644
--- a/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go
+++ b/go/vt/vtadmin/vtctldclient/fakevtctldclient/vtctldclient.go
@@ -71,6 +71,8 @@ type VtctldClient struct {
 // incorrectly.
var _ vtctldclient.VtctldClient = (*VtctldClient)(nil) +func (fake *VtctldClient) WaitForReady(ctx context.Context) error { return nil } + // CreateKeyspace is part of the vtctldclient.VtctldClient interface. func (fake *VtctldClient) CreateKeyspace(ctx context.Context, req *vtctldatapb.CreateKeyspaceRequest, opts ...grpc.CallOption) (*vtctldatapb.CreateKeyspaceResponse, error) { if fake.CreateKeyspaceShouldErr { diff --git a/go/vt/vtadmin/vtctldclient/proxy.go b/go/vt/vtadmin/vtctldclient/proxy.go index 46f12d2da73..10798255a17 100644 --- a/go/vt/vtadmin/vtctldclient/proxy.go +++ b/go/vt/vtadmin/vtctldclient/proxy.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/grpcclient" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vtadmin/cluster/discovery" "vitess.io/vitess/go/vt/vtadmin/debug" "vitess.io/vitess/go/vt/vtadmin/vtadminproto" @@ -104,19 +105,31 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { if vtctld.VtctldClient != nil { if !vtctld.closed { - span.Annotate("is_noop", true) - span.Annotate("vtctld_host", vtctld.host) + waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout) + defer waitCancel() - vtctld.lastPing = time.Now() + if err := vtctld.VtctldClient.WaitForReady(waitCtx); err == nil { + // Our cached connection is still open and ready, so we're good to go. + span.Annotate("is_noop", true) + span.Annotate("vtctld_host", vtctld.host) - return nil + vtctld.lastPing = time.Now() + + return nil + } + // If WaitForReady returns an error, that indicates our cached connection + // is no longer valid. We fall through to close the cached connection, + // discover a new vtctld, and establish a new connection. } span.Annotate("is_stale", true) // close before reopen. this is safe to call on an already-closed client. 
if err := vtctld.Close(); err != nil { - return fmt.Errorf("error closing possibly-stale connection before re-dialing: %w", err) + // Even if the client connection does not shut down cleanly, we don't want to block + // Dial from discovering a new vtctld. This makes VTAdmin's dialer more resilient, + // but, as a caveat, it _can_ potentially leak improperly-closed gRPC connections. + log.Errorf("error closing possibly-stale connection before re-dialing: %w", err) } } @@ -144,6 +157,19 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error { return err } + waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout) + defer waitCancel() + + if err := client.WaitForReady(waitCtx); err != nil { + // If the gRPC connection does not transition to a READY state within the context timeout, + // then return an error. The onus to redial (or not) is on the caller of the Dial function. + // As an enhancement, we could update this Dial function to try redialing the discovered vtctld + // a few times with a backoff before giving up. + log.Infof("Could not transition to READY state for gRPC connection to %s: %s\n", addr, err.Error()) + return err + } + + log.Infof("Established gRPC connection to vtctld %s\n", addr) vtctld.dialedAt = time.Now() vtctld.host = addr vtctld.VtctldClient = client @@ -161,12 +187,16 @@ func (vtctld *ClientProxy) Close() error { } err := vtctld.VtctldClient.Close() + + // Mark the vtctld connection as "closed" from the proxy side even if + // the client connection does not shut down cleanly. This makes VTAdmin's dialer more resilient, + // but, as a caveat, it _can_ potentially leak improperly-closed gRPC connections. 
+	vtctld.closed = true
+
 	if err != nil {
 		return err
 	}
 
-	vtctld.closed = true
-
 	return nil
 }
 
diff --git a/go/vt/vtadmin/vtctldclient/proxy_test.go b/go/vt/vtadmin/vtctldclient/proxy_test.go
index d256a030a54..3ad94e09d21 100644
--- a/go/vt/vtadmin/vtctldclient/proxy_test.go
+++ b/go/vt/vtadmin/vtctldclient/proxy_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"net"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -34,16 +35,25 @@ type fakeVtctld struct {
 	vtctlservicepb.VtctlServer
 }
 
-func TestDial(t *testing.T) {
+func initVtctlServer() (net.Listener, *grpc.Server, error) {
 	listener, err := net.Listen("tcp", "127.0.0.1:0")
-	require.NoError(t, err)
-
-	defer listener.Close()
+	if err != nil {
+		return nil, nil, err
+	}
 
 	vtctld := &fakeVtctld{}
 	server := grpc.NewServer()
 	vtctlservicepb.RegisterVtctlServer(server, vtctld)
 
+	return listener, server, nil
+}
+
+func TestDial(t *testing.T) {
+	listener, server, err := initVtctlServer()
+	require.NoError(t, err)
+
+	defer listener.Close()
+
 	go server.Serve(listener)
 	defer server.Stop()
 
@@ -57,7 +67,8 @@ func TestDial(t *testing.T) {
 			Id:   "test",
 			Name: "testcluster",
 		},
-		Discovery: disco,
+		Discovery:           disco,
+		ConnectivityTimeout: defaultConnectivityTimeout,
 	})
 
 	// We don't have a vtctld host until we call Dial
@@ -67,3 +78,94 @@ func TestDial(t *testing.T) {
 	assert.NoError(t, err)
 	assert.Equal(t, listener.Addr().String(), proxy.host)
 }
+
+// TestRedial tests that vtadmin-api is able to recover from a lost connection to
+// a vtctld by rediscovering and redialing a new one.
+func TestRedial(t *testing.T) {
+	// Initialize vtctld #1
+	listener1, server1, err := initVtctlServer()
+	require.NoError(t, err)
+
+	defer listener1.Close()
+
+	go server1.Serve(listener1)
+	defer server1.Stop()
+
+	// Initialize vtctld #2
+	listener2, server2, err := initVtctlServer()
+	require.NoError(t, err)
+
+	defer listener2.Close()
+
+	go server2.Serve(listener2)
+	defer server2.Stop()
+
+	// Register both vtctlds with VTAdmin
+	disco := fakediscovery.New()
+	disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{
+		Hostname: listener1.Addr().String(),
+	}, &vtadminpb.Vtctld{
+		Hostname: listener2.Addr().String(),
+	})
+
+	proxy := New(&Config{
+		Cluster: &vtadminpb.Cluster{
+			Id:   "test",
+			Name: "testcluster",
+		},
+		Discovery:           disco,
+		ConnectivityTimeout: defaultConnectivityTimeout,
+	})
+
+	// We don't have a vtctld host until we call Dial
+	require.Empty(t, proxy.host)
+
+	// Check for a successful connection to whichever vtctld we discover first.
+	err = proxy.Dial(context.Background())
+	assert.NoError(t, err)
+
+	// vtadmin's fakediscovery package discovers vtctlds in random order. Rather
+	// than force some cumbersome sequential logic, we can just do a switcheroo
+	// here in the test to determine our "current" and (expected) "next" vtctlds.
+	var currentVtctld *grpc.Server
+	var nextAddr string
+
+	switch proxy.host {
+	case listener1.Addr().String():
+		currentVtctld = server1
+		nextAddr = listener2.Addr().String()
+
+	case listener2.Addr().String():
+		currentVtctld = server2
+		nextAddr = listener1.Addr().String()
+	default:
+		t.Fatalf("invalid proxy host %s", proxy.host)
+	}
+
+	// Remove the shut down vtctld from VTAdmin's service discovery (clumsily).
+	// Otherwise, when redialing, we may redial the vtctld that we just shut down.
+	disco.Clear()
+	disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{
+		Hostname: nextAddr,
+	})
+
+	// Force an ungraceful shutdown of the gRPC server to which we're connected
+	currentVtctld.Stop()
+
+	// Wait for the client connection to shut down. If we redial too quickly, we race
+	// with gRPC's internal retry logic. Using WaitForReady here exposes more internals
+	// than is ideal for a unit test, but it is far less flaky than using time.Sleep.
+	for {
+		ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+		err = proxy.VtctldClient.WaitForReady(ctx)
+		cancel() // release the timer now; a defer here would pile up until the test returns
+		if err != nil {
+			break
+		}
+	}
+
+	// Finally, check that we discover, dial + establish a new connection to the remaining vtctld.
+	err = proxy.Dial(context.Background())
+	assert.NoError(t, err)
+	assert.Equal(t, nextAddr, proxy.host)
+}
diff --git a/go/vt/vtctl/grpcvtctldclient/client.go b/go/vt/vtctl/grpcvtctldclient/client.go
index e688e998da6..2dc57a546fd 100644
--- a/go/vt/vtctl/grpcvtctldclient/client.go
+++ b/go/vt/vtctl/grpcvtctldclient/client.go
@@ -19,7 +19,12 @@ limitations under the License.
 package grpcvtctldclient
 
 import (
+	"context"
+	"errors"
+	"fmt"
+
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/connectivity"
 
 	"vitess.io/vitess/go/vt/grpcclient"
 	"vitess.io/vitess/go/vt/vtctl/grpcclientcommon"
@@ -28,6 +33,11 @@ import (
 	vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
 )
 
+var (
+	ErrConnectionShutdown = errors.New("gRPCVtctldClient in a SHUTDOWN state")
+	ErrConnectionTimeout  = errors.New("gRPC connection wait time exceeded")
+)
+
 const connClosedMsg = "grpc: the client connection is closed"
 
 type gRPCVtctldClient struct {
@@ -69,6 +79,7 @@ func NewWithDialOpts(addr string, failFast grpcclient.FailFast, opts ...grpc.Dia
 	}, nil
 }
 
+// Close is part of the vtctldclient.VtctldClient interface.
func (client *gRPCVtctldClient) Close() error { err := client.cc.Close() if err == nil { @@ -78,6 +89,45 @@ func (client *gRPCVtctldClient) Close() error { return err } +// WaitForReady is part of the vtctldclient.VtctldClient interface. +func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error { + // The gRPC implementation of WaitForReady uses the gRPC Connectivity API + // See https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md + for { + select { + // A READY connection to the vtctld could not be established + // within the context timeout. The caller should close their + // existing connection and establish a new one. + case <-ctx.Done(): + return ErrConnectionTimeout + + // Wait to transition to READY state + default: + connState := client.cc.GetState() + + switch connState { + case connectivity.Ready: + return nil + + // Per https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md, + // a client that enters the SHUTDOWN state never leaves this state, and all new RPCs should + // fail immediately. Further polling is futile, in other words, and so we + // return an error immediately to indicate that the caller can close the connection. + case connectivity.Shutdown: + return ErrConnectionShutdown + + // If the connection is IDLE, CONNECTING, or in a TRANSIENT_FAILURE mode, + // then we wait to see if it will transition to a READY state. + default: + if !client.cc.WaitForStateChange(ctx, connState) { + // If the client has failed to transition, fail so that the caller can close the connection. 
+					return fmt.Errorf("%w: failed to transition from state %s", ErrConnectionTimeout, connState)
+				}
+			}
+		}
+	}
+}
+
 func init() {
 	vtctldclient.Register("grpc", gRPCVtctldClientFactory)
 }
diff --git a/go/vt/vtctl/grpcvtctldclient/client_test.go b/go/vt/vtctl/grpcvtctldclient/client_test.go
index 9f6654cc49d..f1a39d056ed 100644
--- a/go/vt/vtctl/grpcvtctldclient/client_test.go
+++ b/go/vt/vtctl/grpcvtctldclient/client_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package grpcvtctldclient_test
+package grpcvtctldclient
 
 import (
 	"context"
@@ -35,6 +35,34 @@ import (
 	vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
 )
 
+func TestWaitForReady(t *testing.T) {
+	ts := memorytopo.NewServer("cell1")
+	vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, nil, func(ts *topo.Server) vtctlservicepb.VtctldServer {
+		return grpcvtctldserver.NewVtctldServer(ts)
+	})
+
+	testutil.WithTestServer(t, vtctld, func(t *testing.T, client vtctldclient.VtctldClient) {
+		ctx := context.Background()
+		err := client.WaitForReady(ctx)
+		assert.NoError(t, err)
+	})
+}
+
+func TestWaitForReadyShutdown(t *testing.T) {
+	ts := memorytopo.NewServer("cell1")
+	vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, nil, func(ts *topo.Server) vtctlservicepb.VtctldServer {
+		return grpcvtctldserver.NewVtctldServer(ts)
+	})
+
+	testutil.WithTestServer(t, vtctld, func(t *testing.T, client vtctldclient.VtctldClient) {
+		client.Close()
+		ctx := context.Background()
+		err := client.WaitForReady(ctx)
+
+		assert.Equal(t, ErrConnectionShutdown, err)
+	})
+}
+
 func TestFindAllShardsInKeyspace(t *testing.T) {
 	ctx := context.Background()
 	ts := memorytopo.NewServer("cell1")
diff --git a/go/vt/vtctl/localvtctldclient/client.go b/go/vt/vtctl/localvtctldclient/client.go
index b870247fe07..3f91a162abc 100644
--- a/go/vt/vtctl/localvtctldclient/client.go
+++ b/go/vt/vtctl/localvtctldclient/client.go
@@ -17,6 +17,7 @@ limitations under the License.
 package localvtctldclient
 
 import (
+	"context"
 	"errors"
 	"sync"
 
@@ -37,6 +38,9 @@ type localVtctldClient struct {
 // Close is part of the vtctldclient.VtctldClient interface.
 func (client *localVtctldClient) Close() error { return nil }
 
+// WaitForReady is part of the vtctldclient.VtctldClient interface.
+func (client *localVtctldClient) WaitForReady(ctx context.Context) error { return nil }
+
 //go:generate -command localvtctldclient go run ../vtctldclient/codegen
 //go:generate localvtctldclient -targetpkg localvtctldclient -impl localVtctldClient -out client_gen.go -local
 
diff --git a/go/vt/vtctl/vtctldclient/client.go b/go/vt/vtctl/vtctldclient/client.go
index e064b8bd9ae..c5be65bd9a2 100644
--- a/go/vt/vtctl/vtctldclient/client.go
+++ b/go/vt/vtctl/vtctldclient/client.go
@@ -3,16 +3,23 @@
 package vtctldclient
 
 import (
+	"context"
 	"fmt"
 	"log"
 
 	vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
 )
 
-// VtctldClient augments the vtctlservicepb.VtctlClient interface with io.Closer.
+// VtctldClient augments the vtctlservicepb.VtctldClient interface with Close and WaitForReady.
 type VtctldClient interface {
 	vtctlservicepb.VtctldClient
+
+	// Close augments the vtctlservicepb.VtctldClient interface with io.Closer.
 	Close() error
+
+	// WaitForReady waits until the connection to the vtctld is in a ready state,
+	// or until the context times out.
+	WaitForReady(ctx context.Context) error
 }
 
 // Factory is a function that creates new VtctldClients.