
[vtadmin] Update vtctld dialer to validate connectivity #9915

Merged (17 commits) on Mar 22, 2022
Changes from 13 commits
12 changes: 12 additions & 0 deletions go/vt/vtadmin/cluster/discovery/fakediscovery/discovery.go
@@ -60,6 +60,18 @@ func New() *Fake {
}
}

// Clear removes all registered gates and vtctlds from the fake discovery,
// resetting it to an empty state.
func (d *Fake) Clear() {
d.gates = &gates{
byTag: map[string][]*vtadminpb.VTGate{},
byName: map[string]*vtadminpb.VTGate{},
}

d.vtctlds = &vtctlds{
byTag: map[string][]*vtadminpb.Vtctld{},
byName: map[string]*vtadminpb.Vtctld{},
}
}

// AddTaggedGates adds the given gates to the discovery fake, associating each
// gate with each tag. To tag different gates with multiple tags, call multiple
// times with the same gates but different tag slices. Gates are uniquely
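As a hypothetical illustration of how Clear pairs with the tagged-add helpers (the hostnames here are invented; TestRedial further down does the same thing with real listeners):

	disco := fakediscovery.New()
	disco.AddTaggedGates(nil, &vtadminpb.VTGate{Hostname: "gate-1"})
	disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{Hostname: "vtctld-1"})

	// Reset the fake to an empty state, then register a different vtctld,
	// e.g. to simulate a topology change mid-test.
	disco.Clear()
	disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{Hostname: "vtctld-2"})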
5 changes: 5 additions & 0 deletions go/vt/vtadmin/vtctldclient/config.go
@@ -18,6 +18,7 @@ package vtctldclient

import (
"fmt"
"time"

"github.com/spf13/pflag"

@@ -36,6 +37,8 @@ type Config struct {
CredentialsPath string

Cluster *vtadminpb.Cluster

ConnectivityTimeout time.Duration
}

// Parse returns a new config with the given cluster and discovery, after
@@ -61,6 +64,8 @@ func Parse(cluster *vtadminpb.Cluster, disco discovery.Discovery, args []string)
func (c *Config) Parse(args []string) error {
fs := pflag.NewFlagSet("", pflag.ContinueOnError)

fs.DurationVar(&c.ConnectivityTimeout, "grpc-connectivity-timeout", 2*time.Second, "The maximum duration to wait for a gRPC connection to be established to the vtctld.")

credentialsTmplStr := fs.String("credentials-path-tmpl", "",
"Go template used to specify a path to a credentials file, which is a json file containing "+
"a Username and Password. Templates are given the context of the vtctldclient.Config, "+
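A minimal sketch of how the new flag flows through Config.Parse; the cluster and discovery values are placeholders, and Parse returning (*Config, error) is assumed from the signature shown above:

	var disco discovery.Discovery // the cluster's configured discovery implementation

	cfg, err := Parse(
		&vtadminpb.Cluster{Id: "c1", Name: "cluster1"},
		disco,
		[]string{"--grpc-connectivity-timeout=5s"},
	)
	if err != nil {
		// handle the flag-parsing error
	}
	// cfg.ConnectivityTimeout is now 5s rather than the 2s default.
	_ = cfg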
44 changes: 37 additions & 7 deletions go/vt/vtadmin/vtctldclient/proxy.go
@@ -26,6 +26,7 @@ import (

"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vtadmin/cluster/discovery"
"vitess.io/vitess/go/vt/vtadmin/debug"
"vitess.io/vitess/go/vt/vtadmin/vtadminproto"
@@ -104,19 +105,31 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error {

if vtctld.VtctldClient != nil {
if !vtctld.closed {
span.Annotate("is_noop", true)
span.Annotate("vtctld_host", vtctld.host)
waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout)
defer waitCancel()

vtctld.lastPing = time.Now()
if err := vtctld.VtctldClient.WaitForReady(waitCtx); err == nil {
// Our cached connection is still open and ready, so we're good to go.
span.Annotate("is_noop", true)
span.Annotate("vtctld_host", vtctld.host)

return nil
vtctld.lastPing = time.Now()

return nil
}
// If WaitForReady returns an error, that indicates our cached connection
// is no longer valid. We fall through to close the cached connection,
// discover a new vtctld, and establish a new connection.
}

span.Annotate("is_stale", true)

// close before reopen. this is safe to call on an already-closed client.
if err := vtctld.Close(); err != nil {
return fmt.Errorf("error closing possibly-stale connection before re-dialing: %w", err)
// Even if the client connection does not shut down cleanly, we don't want to block
// Dial from discovering a new vtctld. This makes VTAdmin's dialer more resilient,
// but, as a caveat, it _can_ potentially leak improperly-closed gRPC connections.
log.Errorf("error closing possibly-stale connection before re-dialing: %w", err)
Inline comment (Contributor, Author):
I added more context about the leaked connections thing in the PR description.

}
}

@@ -144,6 +157,19 @@ func (vtctld *ClientProxy) Dial(ctx context.Context) error {
return err
}

waitCtx, waitCancel := context.WithTimeout(ctx, vtctld.cfg.ConnectivityTimeout)
defer waitCancel()

if err := client.WaitForReady(waitCtx); err != nil {
// If the gRPC connection does not transition to a READY state within the context timeout,
// then return an error. The onus to redial (or not) is on the caller of the Dial function.
// As an enhancement, we could update this Dial function to try redialing the discovered vtctld
// a few times with a backoff before giving up.
log.Infof("Could not transition to READY state for gRPC connection to %s: %s\n", addr, err.Error())
return err
}

log.Infof("Established gRPC connection to vtctld %s\n", addr)
vtctld.dialedAt = time.Now()
vtctld.host = addr
vtctld.VtctldClient = client
@@ -161,12 +187,16 @@ func (vtctld *ClientProxy) Close() error {
}

err := vtctld.VtctldClient.Close()

// Mark the vtctld connection as "closed" from the proxy side even if
// the client connection does not shut down cleanly. This makes VTAdmin's dialer more resilient,
// but, as a caveat, it _can_ potentially leak improperly-closed gRPC connections.
vtctld.closed = true

if err != nil {
return err
}

vtctld.closed = true

return nil
}

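The comment in Dial above notes that retrying with a backoff would be a possible enhancement. A rough, hypothetical sketch of what a caller-side wrapper could look like (the attempt count and delays are arbitrary, and this is not part of the PR):

	func dialWithBackoff(ctx context.Context, vtctld *ClientProxy) error {
		var err error
		backoff := 100 * time.Millisecond
		for attempt := 0; attempt < 3; attempt++ {
			if err = vtctld.Dial(ctx); err == nil {
				return nil
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(backoff):
				backoff *= 2
			}
		}
		return err
	}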
112 changes: 107 additions & 5 deletions go/vt/vtadmin/vtctldclient/proxy_test.go
@@ -20,6 +20,7 @@ import (
"context"
"net"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -34,16 +35,25 @@ type fakeVtctld struct {
vtctlservicepb.VtctlServer
}

func TestDial(t *testing.T) {
func initVtctlServer() (net.Listener, *grpc.Server, error) {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

defer listener.Close()
if err != nil {
return nil, nil, err
}

vtctld := &fakeVtctld{}
server := grpc.NewServer()
vtctlservicepb.RegisterVtctlServer(server, vtctld)

return listener, server, err
}

func TestDial(t *testing.T) {
Inline comment (Contributor):

what you're seeing running just this test in isolation is super interesting, do you mind filing an issue and i can dig into it? i'm not really sure what's going on, but it could be a "macs suck at local networking" issue, or something not completely correct in the code, but it's hard to say (and i don't think we should block this PR, which I still maintain is strictly an improvement over the current, uh, Situation)

Inline comment (Contributor, Author):

Done: #9943

Thanks for taking a look. I'm super interested in what you find! I spent way too long looking at gRPC internals for this PR 😭
listener, server, err := initVtctlServer()
require.NoError(t, err)

defer listener.Close()

go server.Serve(listener)
defer server.Stop()

@@ -57,7 +67,8 @@ func TestDial(t *testing.T) {
Id: "test",
Name: "testcluster",
},
Discovery: disco,
Discovery: disco,
ConnectivityTimeout: 2 * time.Second,
})

// We don't have a vtctld host until we call Dial
@@ -67,3 +78,94 @@ func TestDial(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, listener.Addr().String(), proxy.host)
}

// TestRedial tests that vtadmin-api is able to recover from a lost connection to
// a vtctld by rediscovering and redialing a new one.
func TestRedial(t *testing.T) {
// Initialize vtctld #1
listener1, server1, err := initVtctlServer()
require.NoError(t, err)

defer listener1.Close()

go server1.Serve(listener1)
defer server1.Stop()

// Initialize vtctld #2
listener2, server2, err := initVtctlServer()
require.NoError(t, err)

defer listener2.Close()

go server2.Serve(listener2)
defer server2.Stop()

// Register both vtctlds with VTAdmin
disco := fakediscovery.New()
disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{
Hostname: listener1.Addr().String(),
}, &vtadminpb.Vtctld{
Hostname: listener2.Addr().String(),
})

proxy := New(&Config{
Cluster: &vtadminpb.Cluster{
Id: "test",
Name: "testcluster",
},
Discovery: disco,
ConnectivityTimeout: 2 * time.Second,
})

// We don't have a vtctld host until we call Dial
require.Empty(t, proxy.host)

// Check for a successful connection to whichever vtctld we discover first.
err = proxy.Dial(context.Background())
assert.NoError(t, err)

// vtadmin's fakediscovery package discovers vtctlds in random order. Rather
// than force some cumbersome sequential logic, we can just do a switcheroo
// here in the test to determine our "current" and (expected) "next" vtctlds.
var currentVtctld *grpc.Server
var nextAddr string

switch proxy.host {
case listener1.Addr().String():
currentVtctld = server1
nextAddr = listener2.Addr().String()

case listener2.Addr().String():
currentVtctld = server2
nextAddr = listener1.Addr().String()
default:
t.Fatalf("invalid proxy host %s", proxy.host)
}

// Remove the vtctld we're about to shut down from VTAdmin's service discovery
// (clumsily, by clearing the fake and re-adding only the other vtctld).
// Otherwise, when redialing, we might rediscover the vtctld we just shut down.
disco.Clear()
disco.AddTaggedVtctlds(nil, &vtadminpb.Vtctld{
Hostname: nextAddr,
})

// Force an ungraceful shutdown of the gRPC server to which we're connected
currentVtctld.Stop()

// Wait for the client connection to shut down. (If we redial too quickly, we
// get into a race condition with gRPC's internal retry logic.) Using
// WaitForReady here _does_ expose more function internals than is ideal for a
// unit test, but it's far less flaky than using time.Sleep.
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if err = proxy.VtctldClient.WaitForReady(ctx); err != nil {
break
}
}

// Finally, check that we discover, dial + establish a new connection to the remaining vtctld.
err = proxy.Dial(context.Background())
assert.NoError(t, err)
assert.Equal(t, nextAddr, proxy.host)
}
50 changes: 50 additions & 0 deletions go/vt/vtctl/grpcvtctldclient/client.go
Original file line number Diff line number Diff line change
@@ -19,7 +19,12 @@ limitations under the License.
package grpcvtctldclient

import (
"context"
"errors"
"fmt"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/vtctl/grpcclientcommon"
@@ -28,6 +33,11 @@ import (
vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
)

var (
ErrConnectionShutdown = errors.New("gRPCVtctldClient in a SHUTDOWN state")
ErrConnectionTimeout = errors.New("gRPC connection wait time exceeded")
)

const connClosedMsg = "grpc: the client connection is closed"

type gRPCVtctldClient struct {
@@ -69,6 +79,7 @@ func NewWithDialOpts(addr string, failFast grpcclient.FailFast, opts ...grpc.Dia
}, nil
}

// Close is part of the vtctldclient.VtctldClient interface.
func (client *gRPCVtctldClient) Close() error {
err := client.cc.Close()
if err == nil {
@@ -78,6 +89,45 @@ func (client *gRPCVtctldClient) Close() error {
return err
}

// WaitForReady is part of the vtctldclient.VtctldClient interface.
func (client *gRPCVtctldClient) WaitForReady(ctx context.Context) error {
// The gRPC implementation of WaitForReady uses the gRPC Connectivity API
// See https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
for {
select {
// A READY connection to the vtctld could not be established
// within the context timeout. The caller should close their
// existing connection and establish a new one.
case <-ctx.Done():
return ErrConnectionTimeout

// Wait to transition to READY state
default:
connState := client.cc.GetState()

switch connState {
case connectivity.Ready:
return nil

// Per https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md,
// a client that enters the SHUTDOWN state never leaves this state, and all new RPCs should
// fail immediately. Further polling is futile, in other words, and so we
// return an error immediately to indicate that the caller can close the connection.
case connectivity.Shutdown:
return ErrConnectionShutdown

// If the connection is IDLE, CONNECTING, or in a TRANSIENT_FAILURE mode,
// then we wait to see if it will transition to a READY state.
default:
if !client.cc.WaitForStateChange(ctx, connState) {
// If the client has failed to transition, fail so that the caller can close the connection.
return fmt.Errorf("failed to transition from state %s", connState)
}
}
}
}
}

func init() {
vtctldclient.Register("grpc", gRPCVtctldClientFactory)
}
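For reference, a minimal sketch of the intended call pattern for WaitForReady, mirroring what the vtadmin proxy above does; the 2-second budget and the helper name are arbitrary assumptions, not part of this PR:

	func ensureReady(ctx context.Context, client vtctldclient.VtctldClient) error {
		waitCtx, cancel := context.WithTimeout(ctx, 2*time.Second)
		defer cancel()

		if err := client.WaitForReady(waitCtx); err != nil {
			// The connection never reached READY (or is SHUTDOWN); close it so
			// the caller can discover and dial a fresh vtctld.
			_ = client.Close()
			return err
		}

		// The connection is READY; RPCs can proceed.
		return nil
	}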
30 changes: 29 additions & 1 deletion go/vt/vtctl/grpcvtctldclient/client_test.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package grpcvtctldclient_test
package grpcvtctldclient

import (
"context"
@@ -35,6 +34,34 @@ import (
vtctlservicepb "vitess.io/vitess/go/vt/proto/vtctlservice"
)

func TestWaitForReady(t *testing.T) {
ts := memorytopo.NewServer("cell1")
vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, nil, func(ts *topo.Server) vtctlservicepb.VtctldServer {
return grpcvtctldserver.NewVtctldServer(ts)
})

testutil.WithTestServer(t, vtctld, func(t *testing.T, client vtctldclient.VtctldClient) {
ctx := context.Background()
err := client.WaitForReady(ctx)
assert.NoError(t, err)
})
}

func TestWaitForReadyShutdown(t *testing.T) {
ts := memorytopo.NewServer("cell1")
vtctld := testutil.NewVtctldServerWithTabletManagerClient(t, ts, nil, func(ts *topo.Server) vtctlservicepb.VtctldServer {
return grpcvtctldserver.NewVtctldServer(ts)
})

testutil.WithTestServer(t, vtctld, func(t *testing.T, client vtctldclient.VtctldClient) {
client.Close()
ctx := context.Background()
err := client.WaitForReady(ctx)

assert.ErrorIs(t, err, ErrConnectionShutdown)
})
}

func TestFindAllShardsInKeyspace(t *testing.T) {
ctx := context.Background()
ts := memorytopo.NewServer("cell1")