From 79bbc8fdb7270c05fb2f387d6f99af42377c2d94 Mon Sep 17 00:00:00 2001 From: Ashish Ranjan Date: Sat, 31 Jul 2021 07:17:51 +0800 Subject: [PATCH 1/2] client/v3: refresh the token when ErrUserEmpty is received while retrying To fix a bug in the retry logic caused when the auth token is cleared after receiving `ErrInvalidAuthToken` from the server and the subsequent call to `getToken` also fails due to some reason (eg. context deadline exceeded). This leaves the client without a token and the retry will continue to fail with `ErrUserEmpty` unless the token is refreshed. --- client/v3/retry_interceptor.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/client/v3/retry_interceptor.go b/client/v3/retry_interceptor.go index 9586c334a3d..7198f1450e4 100644 --- a/client/v3/retry_interceptor.go +++ b/client/v3/retry_interceptor.go @@ -73,7 +73,7 @@ func (c *Client) unaryClientInterceptor(optFuncs ...retryOption) grpc.UnaryClien // its the callCtx deadline or cancellation, in which case try again. continue } - if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken { + if c.shouldRefreshToken(lastErr, callOpts) { // clear auth token before refreshing it. // call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side, // if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165) @@ -148,6 +148,17 @@ func (c *Client) streamClientInterceptor(optFuncs ...retryOption) grpc.StreamCli } } +// shouldRefreshToken checks whether there's a need to refresh the token based on the error and callOptions, +// and returns a boolean value. +func (c *Client) shouldRefreshToken(err error, callOpts *options) bool { + if rpctypes.Error(err) == rpctypes.ErrUserEmpty { + // refresh the token when username, password is present but the server returns ErrUserEmpty + // which is possible when the client token is cleared somehow + return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != "" + } + return callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken +} + // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a // proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish // a new ClientStream according to the retry policy. @@ -245,7 +256,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{} // its the callCtx deadline or cancellation, in which case try again. return true, err } - if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken { + if s.client.shouldRefreshToken(err, s.callOpts) { // clear auth token to avoid failure when call getToken s.client.authTokenBundle.UpdateAuthToken("") From dec6f72d6857c24e4c8acc096fd8a651f9571096 Mon Sep 17 00:00:00 2001 From: Hitoshi Mitake Date: Sat, 28 Aug 2021 23:47:14 +0900 Subject: [PATCH 2/2] *: implement a retry logic for auth old revision in the client --- api/v3rpc/rpctypes/error.go | 3 + client/v3/retry_interceptor.go | 4 +- client/v3/retry_interceptor_test.go | 124 ++++++++++++++++++++++++++++ server/etcdserver/api/v3rpc/util.go | 1 + 4 files changed, 131 insertions(+), 1 deletion(-) create mode 100644 client/v3/retry_interceptor_test.go diff --git a/api/v3rpc/rpctypes/error.go b/api/v3rpc/rpctypes/error.go index 5ea2cf88dd7..b9f4842da25 100644 --- a/api/v3rpc/rpctypes/error.go +++ b/api/v3rpc/rpctypes/error.go @@ -65,6 +65,7 @@ var ( ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err() ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err() ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err() + ErrGRPCAuthOldRevision = status.New(codes.InvalidArgument, "etcdserver: revision of auth store is old").Err() ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err() ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err() @@ -131,6 +132,7 @@ var ( ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled, ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken, ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt, + ErrorDesc(ErrGRPCAuthOldRevision): ErrGRPCAuthOldRevision, ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader, ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader, @@ -195,6 +197,7 @@ var ( ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted) ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled) ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken) + ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision) ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt) ErrNoLeader = Error(ErrGRPCNoLeader) diff --git a/client/v3/retry_interceptor.go b/client/v3/retry_interceptor.go index 7198f1450e4..04f157a1dcb 100644 --- a/client/v3/retry_interceptor.go +++ b/client/v3/retry_interceptor.go @@ -156,7 +156,9 @@ func (c *Client) shouldRefreshToken(err error, callOpts *options) bool { // which is possible when the client token is cleared somehow return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != "" } - return callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken + + return callOpts.retryAuth && + (rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision) } // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a diff --git a/client/v3/retry_interceptor_test.go b/client/v3/retry_interceptor_test.go new file mode 100644 index 00000000000..b850b56e0f5 --- /dev/null +++ b/client/v3/retry_interceptor_test.go @@ -0,0 +1,124 @@ +package clientv3 + +import ( + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" + "go.etcd.io/etcd/client/v3/credentials" + grpccredentials "google.golang.org/grpc/credentials" + "testing" +) + +type dummyAuthTokenBundle struct{} + +func (d dummyAuthTokenBundle) TransportCredentials() grpccredentials.TransportCredentials { + return nil +} + +func (d dummyAuthTokenBundle) PerRPCCredentials() grpccredentials.PerRPCCredentials { + return nil +} + +func (d dummyAuthTokenBundle) NewWithMode(mode string) (grpccredentials.Bundle, error) { + return nil, nil +} + +func (d dummyAuthTokenBundle) UpdateAuthToken(token string) { +} + +func TestClientShouldRefreshToken(t *testing.T) { + type fields struct { + authTokenBundle credentials.Bundle + } + type args struct { + err error + callOpts *options + } + + optsWithTrue := &options{ + retryAuth: true, + } + optsWithFalse := &options{ + retryAuth: false, + } + + tests := []struct { + name string + fields fields + args args + want bool + }{ + { + name: "ErrUserEmpty and non nil authTokenBundle", + fields: fields{ + authTokenBundle: &dummyAuthTokenBundle{}, + }, + args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue}, + want: true, + }, + { + name: "ErrUserEmpty and nil authTokenBundle", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue}, + want: false, + }, + { + name: "ErrGRPCInvalidAuthToken and retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithTrue}, + want: true, + }, + { + name: "ErrGRPCInvalidAuthToken and !retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithFalse}, + want: false, + }, + { + name: "ErrGRPCAuthOldRevision and retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithTrue}, + want: true, + }, + { + name: "ErrGRPCAuthOldRevision and !retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithFalse}, + want: false, + }, + { + name: "Other error and retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCAuthFailed, optsWithTrue}, + want: false, + }, + { + name: "Other error and !retryAuth", + fields: fields{ + authTokenBundle: nil, + }, + args: args{rpctypes.ErrGRPCAuthFailed, optsWithFalse}, + want: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Client{ + authTokenBundle: tt.fields.authTokenBundle, + } + if got := c.shouldRefreshToken(tt.args.err, tt.args.callOpts); got != tt.want { + t.Errorf("shouldRefreshToken() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/server/etcdserver/api/v3rpc/util.go b/server/etcdserver/api/v3rpc/util.go index f61fae03b96..91072b70358 100644 --- a/server/etcdserver/api/v3rpc/util.go +++ b/server/etcdserver/api/v3rpc/util.go @@ -84,6 +84,7 @@ var toGRPCErrorMap = map[error]error{ auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled, auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken, auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt, + auth.ErrAuthOldRevision: rpctypes.ErrGRPCAuthOldRevision, // In sync with status.FromContextError context.Canceled: rpctypes.ErrGRPCCanceled,