From afe831a9d0992a2e908a4e165e706aa835e0c249 Mon Sep 17 00:00:00 2001 From: Andrew Kim Date: Fri, 15 Feb 2019 17:49:02 -0500 Subject: [PATCH] add optional timeout to CSI lib connection utils --- connection/connection.go | 19 +++++++++++++++++-- connection/connection_test.go | 6 +++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/connection/connection.go b/connection/connection.go index 0a9903f40..db43123ca 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -58,7 +58,7 @@ const terminationLogPath = "/dev/termination-log" // For other connections, the default behavior from gRPC is used and // loss of connection is not detected reliably. func Connect(address string, options ...Option) (*grpc.ClientConn, error) { - return connect(address, []grpc.DialOption{}, options) + return connect(address, options) } // Option is the type of all optional parameters for Connect. @@ -74,6 +74,14 @@ func OnConnectionLoss(reconnect func() bool) Option { } } +// WithTimeout sets the connection timeout duration +// If not specified, all connections will retry forever. +func WithTimeout(timeout time.Duration) Option { + return func(o *options) { + o.timeout = timeout + } +} + // ExitOnConnectionLoss returns callback for OnConnectionLoss() that writes // an error to /dev/termination-log and exits. func ExitOnConnectionLoss() func() bool { @@ -89,21 +97,28 @@ func ExitOnConnectionLoss() func() bool { type options struct { reconnect func() bool + timeout time.Duration } // connect is the internal implementation of Connect. It has more options to enable testing. -func connect(address string, dialOptions []grpc.DialOption, connectOptions []Option) (*grpc.ClientConn, error) { +func connect(address string, connectOptions []Option) (*grpc.ClientConn, error) { var o options for _, option := range connectOptions { option(&o) } + var dialOptions []grpc.DialOption dialOptions = append(dialOptions, grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container. grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure. grpc.WithBlock(), // Block until connection succeeds. grpc.WithUnaryInterceptor(LogGRPC), // Log all messages. ) + + if o.timeout != 0 { + dialOptions = append(dialOptions, grpc.WithTimeout(o.timeout)) + } + unixPrefix := "unix://" if strings.HasPrefix(address, "/") { // It looks like filesystem path. diff --git a/connection/connection_test.go b/connection/connection_test.go index c87fb0aa4..54e6f6863 100644 --- a/connection/connection_test.go +++ b/connection/connection_test.go @@ -150,13 +150,13 @@ func TestWaitForServer(t *testing.T) { } } -func TestTimout(t *testing.T) { +func TestConnectWithTimout(t *testing.T) { tmp := tmpDir(t) defer os.RemoveAll(tmp) startTime := time.Now() - timeout := 5 * time.Second - conn, err := connect(path.Join(tmp, "no-such.sock"), []grpc.DialOption{grpc.WithTimeout(timeout)}, nil) + timeout := 1 * time.Second + conn, err := Connect(path.Join(tmp, "no-such.sock"), WithTimeout(timeout)) endTime := time.Now() if assert.Error(t, err, "connection should fail") { assert.InEpsilon(t, timeout, endTime.Sub(startTime), 1, "connection timeout")