diff --git a/functional/tester/stresser_key.go b/functional/tester/stresser_key.go
index 54efddb28a0..b3e46cc0e75 100644
--- a/functional/tester/stresser_key.go
+++ b/functional/tester/stresser_key.go
@@ -32,7 +32,8 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 type keyStresser struct {
@@ -128,44 +129,7 @@ func (s *keyStresser) run() {
 			continue
 		}
 
-		switch rpctypes.ErrorDesc(err) {
-		case context.DeadlineExceeded.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure. When we terminate the leader, the request to
-			// that leader cannot be processed, and times out. Also requests
-			// to followers cannot be forwarded to the old leader, so timing out
-			// as well. We want to keep stressing until the cluster elects a
-			// new leader and start processing requests again.
-		case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure and follower nodes receive time out errors
-			// from losing their leader. Followers should retry to connect
-			// to the new leader.
-		case etcdserver.ErrStopped.Error():
-			// one of the etcd nodes stopped from failure injection
-		case transport.ErrConnClosing.Desc:
-			// server closed the transport (failure injected node)
-		case rpctypes.ErrNotCapable.Error():
-			// capability check has not been done (in the beginning)
-		case rpctypes.ErrTooManyRequests.Error():
-			// hitting the recovering member.
-		case raft.ErrProposalDropped.Error():
-			// removed member, or leadership has changed (old leader got raftpb.MsgProp)
-		case context.Canceled.Error():
-			// from stresser.Cancel method:
-			return
-		case grpc.ErrClientConnClosing.Error():
-			// from stresser.Cancel method:
-			return
-		default:
-			s.lg.Warn(
-				"stress run exiting",
-				zap.String("stress-type", "KV"),
-				zap.String("endpoint", s.m.EtcdClientEndpoint),
-				zap.String("error-type", reflect.TypeOf(err).String()),
-				zap.String("error-desc", rpctypes.ErrorDesc(err)),
-				zap.Error(err),
-			)
+		if !s.isRetryableError(err) {
 			return
 		}
 
@@ -178,6 +142,61 @@ func (s *keyStresser) run() {
 	}
 }
 
+func (s *keyStresser) isRetryableError(err error) bool {
+	switch rpctypes.ErrorDesc(err) {
+	// retryable
+	case context.DeadlineExceeded.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure. When we terminate the leader, the request to
+		// that leader cannot be processed, and times out. Also requests
+		// to followers cannot be forwarded to the old leader, so timing out
+		// as well. We want to keep stressing until the cluster elects a
+		// new leader and start processing requests again.
+		return true
+	case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure and follower nodes receive time out errors
+		// from losing their leader. Followers should retry to connect
+		// to the new leader.
+		return true
+	case etcdserver.ErrStopped.Error():
+		// one of the etcd nodes stopped from failure injection
+		return true
+	case rpctypes.ErrNotCapable.Error():
+		// capability check has not been done (in the beginning)
+		return true
+	case rpctypes.ErrTooManyRequests.Error():
+		// hitting the recovering member.
+		return true
+	case raft.ErrProposalDropped.Error():
+		// removed member, or leadership has changed (old leader got raftpb.MsgProp)
+		return true
+
+	// not retryable.
+	case context.Canceled.Error():
+		// from stresser.Cancel method:
+		return false
+	case grpc.ErrClientConnClosing.Error():
+		// from stresser.Cancel method:
+		return false
+	}
+
+	if status.Convert(err).Code() == codes.Unavailable {
+		// gRPC connection errors are translated to status.Unavailable
+		return true
+	}
+
+	s.lg.Warn(
+		"stress run exiting",
+		zap.String("stress-type", "KV"),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
+		zap.String("error-type", reflect.TypeOf(err).String()),
+		zap.String("error-desc", rpctypes.ErrorDesc(err)),
+		zap.Error(err),
+	)
+	return false
+}
+
 func (s *keyStresser) Pause() map[string]int {
 	return s.Close()
 }
diff --git a/integration/v3_grpc_inflight_test.go b/integration/v3_grpc_inflight_test.go
index 08c1a1f2369..7b4d12a12fd 100644
--- a/integration/v3_grpc_inflight_test.go
+++ b/integration/v3_grpc_inflight_test.go
@@ -25,7 +25,8 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
@@ -82,9 +83,10 @@ func TestV3KVInflightRangeRequests(t *testing.T) {
 		defer wg.Done()
 		_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
 		if err != nil {
+			errCode := status.Convert(err).Code()
 			errDesc := rpctypes.ErrorDesc(err)
-			if err != nil && !(errDesc == context.Canceled.Error() || errDesc == transport.ErrConnClosing.Desc) {
-				t.Fatalf("inflight request should be canceled with '%v' or '%v', got '%v'", context.Canceled.Error(), transport.ErrConnClosing.Desc, errDesc)
+			if err != nil && !(errDesc == context.Canceled.Error() || errCode == codes.Unavailable) {
+				t.Fatalf("inflight request should be canceled with '%v' or code Unavailable, got '%v' with code '%s'", context.Canceled.Error(), errDesc, errCode)
 			}
 		}
 	}()
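For reference, a minimal self-contained sketch (not part of the patch; the package layout and helper name are illustrative) of the classification the new fallback relies on: status.Convert extracts a gRPC status from any error, reporting codes.Unavailable for connection-level failures and codes.Unknown for plain non-gRPC errors, which is why the stresser and the test now treat Unavailable as the retryable/cancellation case instead of matching transport.ErrConnClosing by description.

package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isRetryable mirrors the Unavailable fallback added to isRetryableError above.
func isRetryable(err error) bool {
	return status.Convert(err).Code() == codes.Unavailable
}

func main() {
	// A gRPC error carrying codes.Unavailable (e.g. a closing transport) is retried.
	fmt.Println(isRetryable(status.Error(codes.Unavailable, "transport is closing"))) // true

	// A plain error converts to codes.Unknown and is not retried by this check.
	fmt.Println(isRetryable(errors.New("some application error"))) // false
}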