Skip to content

Commit

Permalink
etcdserver: fix watch metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Sam Batschelet <sbatsche@redhat.com>
  • Loading branch information
hexfusion committed Nov 22, 2019
1 parent 63dd73c commit 0fb26df
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
2 changes: 2 additions & 0 deletions etcdserver/api/v3rpc/rpctypes/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ var (
ErrGRPCLeaseExist = status.New(codes.FailedPrecondition, "etcdserver: lease already exists").Err()
ErrGRPCLeaseTTLTooLarge = status.New(codes.OutOfRange, "etcdserver: too large lease TTL").Err()

ErrGRPCWatchCanceled = status.New(codes.Canceled, "etcdserver: watch canceled").Err()

ErrGRPCMemberExist = status.New(codes.FailedPrecondition, "etcdserver: member ID already exist").Err()
ErrGRPCPeerURLExist = status.New(codes.FailedPrecondition, "etcdserver: Peer URLs already exists").Err()
ErrGRPCMemberNotEnoughStarted = status.New(codes.FailedPrecondition, "etcdserver: re-configuration failed due to not enough started members").Err()
Expand Down
14 changes: 9 additions & 5 deletions etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package v3rpc

import (
"context"
"io"
"math/rand"
"sync"
Expand All @@ -29,6 +28,7 @@ import (
"go.etcd.io/etcd/mvcc/mvccpb"

"go.uber.org/zap"
"google.golang.org/grpc/metadata"
)

type watchServer struct {
Expand Down Expand Up @@ -189,19 +189,23 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
}
}()

defer sws.close()
select {
case err = <-errc:
close(sws.ctrlStream)

case <-stream.Context().Done():
err = stream.Context().Err()
// the only server-side cancellation is noleader for now.
if err == context.Canceled {
err = rpctypes.ErrGRPCNoLeader
if md, ok := metadata.FromIncomingContext(stream.Context()); ok {
rl := md.Get(rpctypes.MetadataRequireLeaderKey)
if len(rl) > 0 && rl[0] == "true" {
return rpctypes.ErrGRPCNoLeader
}
}
return rpctypes.ErrGRPCWatchCanceled
}
}

sws.close()
return err
}

Expand Down
4 changes: 4 additions & 0 deletions tests/e2e/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func metricsTest(cx ctlCtx) {
{"/metrics", fmt.Sprintf("etcd_mvcc_delete_total 3")},
{"/metrics", fmt.Sprintf(`etcd_server_version{server_version="%s"} 1`, version.Version)},
{"/metrics", fmt.Sprintf(`etcd_cluster_version{cluster_version="%s"} 1`, version.Cluster(version.Version))},
{"/metrics", fmt.Sprintf(`grpc_server_handled_total{grpc_code="Canceled",grpc_method="Watch",grpc_service="etcdserverpb.Watch",grpc_type="bidi_stream"} 6`)},
{"/health", `{"health":"true"}`},
} {
i++
Expand All @@ -59,6 +60,9 @@ func metricsTest(cx ctlCtx) {
cx.t.Fatal(err)
}

if err := ctlV3Watch(cx, []string{"k", "--rev", "1"}, []kvExec{{key: "k", val: "v"}}...); err != nil {
cx.t.Fatal(err)
}
if err := cURLGet(cx.epc, cURLReq{endpoint: test.endpoint, expected: test.expected, metricsURLScheme: cx.cfg.metricsURLScheme}); err != nil {
cx.t.Fatalf("failed get with curl (%v)", err)
}
Expand Down

0 comments on commit 0fb26df

Please sign in to comment.