Merge pull request #9934 from mbrannock/master
Eliminate direct use of the gRPC transport package
gyuho authored Jul 19, 2018
2 parents f76cf95 + 520bd50 commit e9f710e
Showing 2 changed files with 63 additions and 42 deletions.
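The core of the change: instead of matching the internal transport package's ErrConnClosing message string, both the functional tester and the integration test now classify connection-level failures through the public google.golang.org/grpc/status and google.golang.org/grpc/codes packages. For context on why the codes.Unavailable check in the diffs below is a safe replacement, here is a minimal standalone sketch of how status.Convert behaves (not code from this commit; the example error messages are made up):

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func main() {
	// A genuine gRPC status error keeps its code, so a closed connection
	// surfaces as codes.Unavailable.
	grpcErr := status.Error(codes.Unavailable, "transport is closing")
	fmt.Println(status.Convert(grpcErr).Code()) // Unavailable

	// A plain error is wrapped as codes.Unknown, so it never looks like a
	// retryable connection failure.
	fmt.Println(status.Convert(errors.New("disk full")).Code()) // Unknown

	// nil converts to an OK status instead of panicking.
	fmt.Println(status.Convert(nil).Code()) // OK
}
```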
97 changes: 58 additions & 39 deletions functional/tester/stresser_key.go
@@ -32,7 +32,8 @@ import (
 	"go.uber.org/zap"
 	"golang.org/x/time/rate"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 type keyStresser struct {
@@ -128,44 +129,7 @@ func (s *keyStresser) run() {
 			continue
 		}
 
-		switch rpctypes.ErrorDesc(err) {
-		case context.DeadlineExceeded.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure. When we terminate the leader, the request to
-			// that leader cannot be processed, and times out. Also requests
-			// to followers cannot be forwarded to the old leader, so timing out
-			// as well. We want to keep stressing until the cluster elects a
-			// new leader and start processing requests again.
-		case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
-			// This retries when request is triggered at the same time as
-			// leader failure and follower nodes receive time out errors
-			// from losing their leader. Followers should retry to connect
-			// to the new leader.
-		case etcdserver.ErrStopped.Error():
-			// one of the etcd nodes stopped from failure injection
-		case transport.ErrConnClosing.Desc:
-			// server closed the transport (failure injected node)
-		case rpctypes.ErrNotCapable.Error():
-			// capability check has not been done (in the beginning)
-		case rpctypes.ErrTooManyRequests.Error():
-			// hitting the recovering member.
-		case raft.ErrProposalDropped.Error():
-			// removed member, or leadership has changed (old leader got raftpb.MsgProp)
-		case context.Canceled.Error():
-			// from stresser.Cancel method:
-			return
-		case grpc.ErrClientConnClosing.Error():
-			// from stresser.Cancel method:
-			return
-		default:
-			s.lg.Warn(
-				"stress run exiting",
-				zap.String("stress-type", "KV"),
-				zap.String("endpoint", s.m.EtcdClientEndpoint),
-				zap.String("error-type", reflect.TypeOf(err).String()),
-				zap.String("error-desc", rpctypes.ErrorDesc(err)),
-				zap.Error(err),
-			)
-			return
-		}
+		if !s.isRetryableError(err) {
+			return
+		}
 
@@ -178,6 +142,61 @@ }
 	}
 }
 
+func (s *keyStresser) isRetryableError(err error) bool {
+	switch rpctypes.ErrorDesc(err) {
+	// retryable
+	case context.DeadlineExceeded.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure. When we terminate the leader, the request to
+		// that leader cannot be processed, and times out. Also requests
+		// to followers cannot be forwarded to the old leader, so timing out
+		// as well. We want to keep stressing until the cluster elects a
+		// new leader and start processing requests again.
+		return true
+	case etcdserver.ErrTimeoutDueToLeaderFail.Error(), etcdserver.ErrTimeout.Error():
+		// This retries when request is triggered at the same time as
+		// leader failure and follower nodes receive time out errors
+		// from losing their leader. Followers should retry to connect
+		// to the new leader.
+		return true
+	case etcdserver.ErrStopped.Error():
+		// one of the etcd nodes stopped from failure injection
+		return true
+	case rpctypes.ErrNotCapable.Error():
+		// capability check has not been done (in the beginning)
+		return true
+	case rpctypes.ErrTooManyRequests.Error():
+		// hitting the recovering member.
+		return true
+	case raft.ErrProposalDropped.Error():
+		// removed member, or leadership has changed (old leader got raftpb.MsgProp)
+		return true
+
+	// not retryable.
+	case context.Canceled.Error():
+		// from stresser.Cancel method:
+		return false
+	case grpc.ErrClientConnClosing.Error():
+		// from stresser.Cancel method:
+		return false
+	}
+
+	if status.Convert(err).Code() == codes.Unavailable {
+		// gRPC connection errors are translated to status.Unavailable
+		return true
+	}
+
+	s.lg.Warn(
+		"stress run exiting",
+		zap.String("stress-type", "KV"),
+		zap.String("endpoint", s.m.EtcdClientEndpoint),
+		zap.String("error-type", reflect.TypeOf(err).String()),
+		zap.String("error-desc", rpctypes.ErrorDesc(err)),
+		zap.Error(err),
+	)
+	return false
+}
+
 func (s *keyStresser) Pause() map[string]int {
 	return s.Close()
 }
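The new isRetryableError helper keeps the old case-by-case comments but folds the decision into a boolean, with the gRPC status check as the fallback for connection-level failures. A trimmed-down sketch of that shape follows (assumptions: the classify name, the reduced case list, and the use of err.Error() in place of rpctypes.ErrorDesc are illustrative only):

```go
package main

import (
	"context"
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// classify mirrors the structure of isRetryableError: known error
// descriptions first, then a status-code fallback so connection errors
// (codes.Unavailable) stay retryable without importing grpc/transport.
// The etcd helper compares rpctypes.ErrorDesc(err) against etcd server
// sentinel errors; this sketch only uses standard-library errors.
func classify(err error) (retryable bool) {
	if err == nil {
		return false
	}
	switch err.Error() {
	case context.DeadlineExceeded.Error():
		return true // keep stressing through leader elections
	case context.Canceled.Error():
		return false // the stresser itself was cancelled
	}
	return status.Convert(err).Code() == codes.Unavailable
}

func main() {
	fmt.Println(classify(context.DeadlineExceeded))                        // true
	fmt.Println(classify(status.Error(codes.Unavailable, "conn closing"))) // true
	fmt.Println(classify(fmt.Errorf("unexpected failure")))                // false
}
```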
8 changes: 5 additions & 3 deletions integration/v3_grpc_inflight_test.go
@@ -25,7 +25,8 @@ import (
 	"github.com/coreos/etcd/pkg/testutil"
 
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/transport"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // TestV3MaintenanceDefragmentInflightRange ensures inflight range requests
@@ -82,9 +83,10 @@ func TestV3KVInflightRangeRequests(t *testing.T) {
 			defer wg.Done()
 			_, err := kvc.Range(ctx, &pb.RangeRequest{Key: []byte("foo"), Serializable: true}, grpc.FailFast(false))
 			if err != nil {
+				errCode := status.Convert(err).Code()
 				errDesc := rpctypes.ErrorDesc(err)
-				if err != nil && !(errDesc == context.Canceled.Error() || errDesc == transport.ErrConnClosing.Desc) {
-					t.Fatalf("inflight request should be canceled with '%v' or '%v', got '%v'", context.Canceled.Error(), transport.ErrConnClosing.Desc, errDesc)
+				if err != nil && !(errDesc == context.Canceled.Error() || errCode == codes.Unavailable) {
+					t.Fatalf("inflight request should be canceled with '%v' or code Unavailable, got '%v' with code '%s'", context.Canceled.Error(), errDesc, errCode)
 				}
 			}
 		}()
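The test change follows the same idea: an inflight Range interrupted by shutdown should fail either with a context-cancellation description or with a gRPC Unavailable code. A sketch of that assertion as a standalone helper (the assertCanceledOrUnavailable name and the surrounding test are illustrative, not part of the etcd test suite; status.Convert(...).Message() stands in for rpctypes.ErrorDesc here):

```go
package example

import (
	"context"
	"testing"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// assertCanceledOrUnavailable fails the test unless err looks like a
// cancelled request or a connection-level (Unavailable) failure.
func assertCanceledOrUnavailable(t *testing.T, err error) {
	t.Helper()
	s := status.Convert(err) // works for both gRPC status errors and plain errors
	if s.Message() != context.Canceled.Error() && s.Code() != codes.Unavailable {
		t.Fatalf("expected cancellation or Unavailable, got code %s, desc %q", s.Code(), s.Message())
	}
}

func TestAssertCanceledOrUnavailable(t *testing.T) {
	assertCanceledOrUnavailable(t, context.Canceled)
	assertCanceledOrUnavailable(t, status.Error(codes.Unavailable, "transport is closing"))
}
```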
