Skip to content

Commit

Permalink
Merge pull request #13999 from mitake/backport-13308-to-3.4
Browse files Browse the repository at this point in the history
Backport PR 13308 to release 3.4
  • Loading branch information
ptabor authored May 6, 2022
2 parents c50b726 + 757a8e8 commit 76147c9
Show file tree
Hide file tree
Showing 4 changed files with 152 additions and 2 deletions.
26 changes: 24 additions & 2 deletions clientv3/retry_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt
// its the callCtx deadline or cancellation, in which case try again.
continue
}
if callOpts.retryAuth && rpctypes.Error(lastErr) == rpctypes.ErrInvalidAuthToken {
if c.shouldRefreshToken(lastErr, callOpts) {
// clear auth token before refreshing it.
// call c.Auth.Authenticate with an invalid token will always fail the auth check on the server-side,
// if the server has not apply the patch of pr #12165 (https://github.com/etcd-io/etcd/pull/12165)
// and a rpctypes.ErrInvalidAuthToken will recursively call c.getToken until system run out of resource.
c.authTokenBundle.UpdateAuthToken("")

gterr := c.getToken(ctx)
if gterr != nil {
logger.Warn(
Expand Down Expand Up @@ -142,6 +148,19 @@ func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOp
}
}

// shouldRefreshToken checks whether there's a need to refresh the token based on the error and callOptions,
// and returns a boolean value.
func (c *Client) shouldRefreshToken(err error, callOpts *options) bool {
if rpctypes.Error(err) == rpctypes.ErrUserEmpty {
// refresh the token when username, password is present but the server returns ErrUserEmpty
// which is possible when the client token is cleared somehow
return c.authTokenBundle != nil // equal to c.Username != "" && c.Password != ""
}

return callOpts.retryAuth &&
(rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken || rpctypes.Error(err) == rpctypes.ErrAuthOldRevision)
}

// type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
// proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
// a new ClientStream according to the retry policy.
Expand Down Expand Up @@ -239,7 +258,10 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m interface{}
// its the callCtx deadline or cancellation, in which case try again.
return true, err
}
if s.callOpts.retryAuth && rpctypes.Error(err) == rpctypes.ErrInvalidAuthToken {
if s.client.shouldRefreshToken(err, s.callOpts) {
// clear auth token to avoid failure when call getToken
s.client.authTokenBundle.UpdateAuthToken("")

gterr := s.client.getToken(s.ctx)
if gterr != nil {
s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gterr))
Expand Down
124 changes: 124 additions & 0 deletions clientv3/retry_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package clientv3

import (
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
"go.etcd.io/etcd/client/v3/credentials"
grpccredentials "google.golang.org/grpc/credentials"
"testing"
)

type dummyAuthTokenBundle struct{}

func (d dummyAuthTokenBundle) TransportCredentials() grpccredentials.TransportCredentials {
return nil
}

func (d dummyAuthTokenBundle) PerRPCCredentials() grpccredentials.PerRPCCredentials {
return nil
}

func (d dummyAuthTokenBundle) NewWithMode(mode string) (grpccredentials.Bundle, error) {
return nil, nil
}

func (d dummyAuthTokenBundle) UpdateAuthToken(token string) {
}

func TestClientShouldRefreshToken(t *testing.T) {
type fields struct {
authTokenBundle credentials.Bundle
}
type args struct {
err error
callOpts *options
}

optsWithTrue := &options{
retryAuth: true,
}
optsWithFalse := &options{
retryAuth: false,
}

tests := []struct {
name string
fields fields
args args
want bool
}{
{
name: "ErrUserEmpty and non nil authTokenBundle",
fields: fields{
authTokenBundle: &dummyAuthTokenBundle{},
},
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
want: true,
},
{
name: "ErrUserEmpty and nil authTokenBundle",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCUserEmpty, optsWithTrue},
want: false,
},
{
name: "ErrGRPCInvalidAuthToken and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithTrue},
want: true,
},
{
name: "ErrGRPCInvalidAuthToken and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCInvalidAuthToken, optsWithFalse},
want: false,
},
{
name: "ErrGRPCAuthOldRevision and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithTrue},
want: true,
},
{
name: "ErrGRPCAuthOldRevision and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthOldRevision, optsWithFalse},
want: false,
},
{
name: "Other error and retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthFailed, optsWithTrue},
want: false,
},
{
name: "Other error and !retryAuth",
fields: fields{
authTokenBundle: nil,
},
args: args{rpctypes.ErrGRPCAuthFailed, optsWithFalse},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{
authTokenBundle: tt.fields.authTokenBundle,
}
if got := c.shouldRefreshToken(tt.args.err, tt.args.callOpts); got != tt.want {
t.Errorf("shouldRefreshToken() = %v, want %v", got, tt.want)
}
})
}
}
3 changes: 3 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ var (
ErrGRPCAuthNotEnabled = status.New(codes.FailedPrecondition, "etcdserver: authentication is not enabled").Err()
ErrGRPCInvalidAuthToken = status.New(codes.Unauthenticated, "etcdserver: invalid auth token").Err()
ErrGRPCInvalidAuthMgmt = status.New(codes.InvalidArgument, "etcdserver: invalid auth management").Err()
ErrGRPCAuthOldRevision = status.New(codes.InvalidArgument, "etcdserver: revision of auth store is old").Err()

ErrGRPCNoLeader = status.New(codes.Unavailable, "etcdserver: no leader").Err()
ErrGRPCNotLeader = status.New(codes.FailedPrecondition, "etcdserver: not leader").Err()
Expand Down Expand Up @@ -121,6 +122,7 @@ var (
ErrorDesc(ErrGRPCAuthNotEnabled): ErrGRPCAuthNotEnabled,
ErrorDesc(ErrGRPCInvalidAuthToken): ErrGRPCInvalidAuthToken,
ErrorDesc(ErrGRPCInvalidAuthMgmt): ErrGRPCInvalidAuthMgmt,
ErrorDesc(ErrGRPCAuthOldRevision): ErrGRPCAuthOldRevision,

ErrorDesc(ErrGRPCNoLeader): ErrGRPCNoLeader,
ErrorDesc(ErrGRPCNotLeader): ErrGRPCNotLeader,
Expand Down Expand Up @@ -179,6 +181,7 @@ var (
ErrPermissionNotGranted = Error(ErrGRPCPermissionNotGranted)
ErrAuthNotEnabled = Error(ErrGRPCAuthNotEnabled)
ErrInvalidAuthToken = Error(ErrGRPCInvalidAuthToken)
ErrAuthOldRevision = Error(ErrGRPCAuthOldRevision)
ErrInvalidAuthMgmt = Error(ErrGRPCInvalidAuthMgmt)

ErrNoLeader = Error(ErrGRPCNoLeader)
Expand Down
1 change: 1 addition & 0 deletions etcdserver/api/v3rpc/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var toGRPCErrorMap = map[error]error{
auth.ErrAuthNotEnabled: rpctypes.ErrGRPCAuthNotEnabled,
auth.ErrInvalidAuthToken: rpctypes.ErrGRPCInvalidAuthToken,
auth.ErrInvalidAuthMgmt: rpctypes.ErrGRPCInvalidAuthMgmt,
auth.ErrAuthOldRevision: rpctypes.ErrGRPCAuthOldRevision,
}

func togRPCError(err error) error {
Expand Down

0 comments on commit 76147c9

Please sign in to comment.