From 2d3fa2d549bf34c075805725df47c6ec73590704 Mon Sep 17 00:00:00 2001 From: Dan Mace Date: Mon, 3 Aug 2020 17:13:19 -0400 Subject: [PATCH] etcdserver: fix incorrect metrics generated when clients cancel watches Before this patch, a client canceling the context for a watch causes the server to generate a `rpctypes.ErrGRPCNoLeader` error, which leads to the recording of a gRPC `Unavailable` metric in association with the client watch cancellation. The metric looks like this: grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} So, the watch server has misidentified the error as a server error and then propagates the mistake to metrics, leading to a false indicator that the leader has been lost. This false signal then leads to false alerting. The commit 9c103dd0dedfc723cd4f33b6a5e81343d8a6bae7 introduced an interceptor which wraps watch streams requiring a leader, causing those streams to be actively canceled when leader loss is detected. However, the error handling code assumes all stream context cancellations are from the interceptor. This assumption is broken when the context was canceled because of a client stream cancellation. The core challenge is the lack of information conveyed via `context.Context`, which is shared by both the send and receive sides of the stream handling and is subject to cancellation by all paths (including the gRPC library itself). If any piece of the system cancels the shared context, there's no way for a context consumer to understand who canceled the context or why. To solve the ambiguity of the stream interceptor code specifically, this patch introduces a custom context struct which the interceptor uses to expose a custom error through the context when the interceptor decides to actively cancel a stream. 
Now the consuming side can more safely assume a generic context cancellation can be propagated as a cancellation, and the server-generated leader error is preserved and propagated normally without any special inference. When a client cancels the stream, there remains a race in the error handling code between the send and receive goroutines whereby the underlying gRPC error is lost in the case where the send path returns and is handled first, but this issue can be addressed separately as no matter which path wins, we can detect a generic cancellation. This is a replacement of https://github.com/etcd-io/etcd/pull/11375. Fixes #10289, #9725, #9576, #9166 --- etcdserver/api/v3rpc/interceptor.go | 48 +++++++++++++++++++++++--- etcdserver/api/v3rpc/rpctypes/error.go | 2 ++ etcdserver/api/v3rpc/watch.go | 20 ++++++++--- tests/e2e/metrics_test.go | 5 ++- 4 files changed, 65 insertions(+), 10 deletions(-) diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 45ea5d734fdd..77c7c1fd5975 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -25,11 +25,12 @@ import ( "go.etcd.io/etcd/v3/pkg/types" "go.etcd.io/etcd/v3/raft" - pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + + pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" ) const ( @@ -231,8 +232,13 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor return rpctypes.ErrGRPCNoLeader } - cctx, cancel := context.WithCancel(ss.Context()) - ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss} + cancelCtx, cancelFn := context.WithCancel(ss.Context()) + monitorCtx := &leaderMonitoringContext{ + Context: cancelCtx, + cancel: cancelFn, + } + cancelForLeaderLoss := context.CancelFunc(monitorCtx.CancelForLeaderLoss) + ss = serverStreamWithCtx{ctx: monitorCtx, cancel: &cancelForLeaderLoss, ServerStream: ss} 
smap.mu.Lock() smap.streams[ss] = struct{}{} @@ -242,7 +248,7 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor smap.mu.Lock() delete(smap.streams, ss) smap.mu.Unlock() - cancel() + monitorCtx.Cancel() }() } } @@ -251,6 +257,40 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor } } +// leaderMonitoringContext wraps a context and provides a custom error when +// the CancelForLeaderLoss() method is used to cancel the context. This is +// so downstream context users can disambiguate the reason for the cancellation +// which could be from the client (for example) or from this interceptor code. +type leaderMonitoringContext struct { + context.Context + + lock sync.Mutex + cancel context.CancelFunc + cancelReason error +} + +func (c *leaderMonitoringContext) Cancel() { + c.lock.Lock() + defer c.lock.Unlock() + c.cancel() +} + +func (c *leaderMonitoringContext) CancelForLeaderLoss() { + c.lock.Lock() + defer c.lock.Unlock() + c.cancelReason = rpctypes.ErrGRPCNoLeader + c.cancel() +} + +func (c *leaderMonitoringContext) Err() error { + c.lock.Lock() + defer c.lock.Unlock() + if c.cancelReason != nil { + return c.cancelReason + } + return c.Context.Err() +} + type serverStreamWithCtx struct { grpc.ServerStream ctx context.Context diff --git a/etcdserver/api/v3rpc/rpctypes/error.go b/etcdserver/api/v3rpc/rpctypes/error.go index 26e3fd378c14..24fdc85736c4 100644 --- a/etcdserver/api/v3rpc/rpctypes/error.go +++ b/etcdserver/api/v3rpc/rpctypes/error.go @@ -35,6 +35,8 @@ var ( ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err() ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err() + ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err() + ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err() ErrGRPCPeerURLExist = 
status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err() ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err() diff --git a/etcdserver/api/v3rpc/watch.go b/etcdserver/api/v3rpc/watch.go index 2144779fc4da..30b1115597d4 100644 --- a/etcdserver/api/v3rpc/watch.go +++ b/etcdserver/api/v3rpc/watch.go @@ -21,14 +21,14 @@ import ( "sync" "time" + "go.uber.org/zap" + "go.etcd.io/etcd/v3/auth" "go.etcd.io/etcd/v3/etcdserver" "go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" "go.etcd.io/etcd/v3/mvcc" "go.etcd.io/etcd/v3/mvcc/mvccpb" - - "go.uber.org/zap" ) const minWatchProgressInterval = 100 * time.Millisecond @@ -197,15 +197,25 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { } }() + // TODO: There's a race here. When a stream is closed (e.g. due to a cancellation), + // the underlying error (e.g. a gRPC stream error) may be returned and handled + // through errc if the recv goroutine finishes before the send goroutine. + // When the recv goroutine wins, the stream error is retained. When recv loses + // the race, the underlying error is lost (unless the root error is propagated + // through Context.Err() which is not always the case (as callers have to decide + // to implement a custom context to do so). The stdlib context package builtins + // may be insufficient to carry semantically useful errors around and should be + // revisited. select { case err = <-errc: + if err == context.Canceled { + err = rpctypes.ErrGRPCWatchCanceled + } close(sws.ctrlStream) - case <-stream.Context().Done(): err = stream.Context().Err() - // the only server-side cancellation is noleader for now. 
if err == context.Canceled { - err = rpctypes.ErrGRPCNoLeader + err = rpctypes.ErrGRPCWatchCanceled } } diff --git a/tests/e2e/metrics_test.go b/tests/e2e/metrics_test.go index e20b0f88d5c1..9666f169ebb4 100644 --- a/tests/e2e/metrics_test.go +++ b/tests/e2e/metrics_test.go @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) { {"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")}, {"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)}, {"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))}, + {"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)}, {"/health", `{"health":"true","reason":""}`}, } { i++ @@ -58,7 +59,9 @@ func metricsTest(cx ctlCtx) { if err := ctlV3Del(cx, []string{fmt.Sprintf("%d", i)}, 1); err != nil { cx.t.Fatal(err) } - + if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil { + cx.t.Fatal(err) + } if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil { cx.t.Fatalf("failed get with curl (%v)", err) }