Skip to content

Commit

Permalink
etcdserver: fix incorrect metrics generated when clients cancel watches
Browse files Browse the repository at this point in the history
Before this patch, a client which cancels the context for a watch results in the
server generating a `rpctypes.ErrGRPCNoLeader` error that leads the recording of
a gRPC `Unavailable` metric in association with the client watch cancellation.
The metric looks like this:

    grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"}

So, the watch server has misidentified the error as a server error and then
propagates the mistake to metrics, leading to a false indicator that the leader
has been lost. This false signal then leads to false alerting.

This patch improves the behavior by:

1. Performing a deeper analysis during stream closure to more conclusively
determine that a leader has actually been lost before propagating a
ErrGRPCNoLeader error.

2. Returning a ErrGRPCWatchCanceled error if no conclusion can be drawn
regarding leader loss.

There remains an assumption that absence of evidence of leader loss means a
client cancelled, but in practice this seems less likely to break down whereas
client cancellations are frequent and expected.

This is a continuation of the work already done in etcd-io#11375.

Fixes etcd-io#10289, etcd-io#9725, etcd-io#9576, etcd-io#9166
  • Loading branch information
ironcladlou committed Aug 3, 2020
1 parent 1af6d61 commit 09281c6
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 3 deletions.
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()

ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()

ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
Expand Down
18 changes: 16 additions & 2 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@ import (
"sync"
"time"

"google.golang.org/grpc/metadata"

"go.etcd.io/etcd/v3/auth"
"go.etcd.io/etcd/v3/etcdserver"
"go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes"
pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb"
"go.etcd.io/etcd/v3/mvcc"
"go.etcd.io/etcd/v3/mvcc/mvccpb"
"go.etcd.io/etcd/v3/pkg/types"
"go.etcd.io/etcd/v3/raft"

"go.uber.org/zap"
)
Expand Down Expand Up @@ -191,9 +195,19 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {

case <-stream.Context().Done():
err = stream.Context().Err()
// the only server-side cancellation is noleader for now.
if err == context.Canceled {
err = rpctypes.ErrGRPCNoLeader
// Try to determine a more specific cancellation error. Use the stream context
// and local leader state to detect leader loss. If the reason is inconclusive,
// assume a client cancellation.
if md, hasMetadata := metadata.FromIncomingContext(stream.Context()); hasMetadata {
if rl := md[rpctypes.MetadataRequireLeaderKey]; len(rl) > 0 && rl[0] == rpctypes.MetadataHasLeader {
if sws.sg.Leader() == types.ID(raft.None) {
err = rpctypes.ErrGRPCNoLeader
}
}
} else {
err = rpctypes.ErrGRPCWatchCanceled
}
}
}

Expand Down
5 changes: 4 additions & 1 deletion tests/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) {
{"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")},
{"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)},
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
{"/health", `{"health":"true","reason":""}`},
} {
i++
Expand All @@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) {
if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil {
cx.t.Fatal(err)
}

if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
cx.t.Fatal(err)
}
if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
Expand Down

0 comments on commit 09281c6

Please sign in to comment.