diff --git a/.words b/.words index 1f6695c8a3f..7e36e7d257f 100644 --- a/.words +++ b/.words @@ -30,6 +30,7 @@ etcd gRPC goroutine goroutines +hasleader healthcheck hostname iff diff --git a/clientv3/client.go b/clientv3/client.go index ddaed0f6601..3a4341b22ed 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -37,7 +37,6 @@ import ( "google.golang.org/grpc/codes" grpccredentials "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -393,13 +392,6 @@ func (c *Client) dialWithBalancerCreds(ep string) grpccredentials.TransportCrede return creds } -// WithRequireLeader requires client requests to only succeed -// when the cluster has a leader. -func WithRequireLeader(ctx context.Context) context.Context { - md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - return metadata.NewOutgoingContext(ctx, md) -} - func newClient(cfg *Config) (*Client, error) { if cfg == nil { cfg = &Config{} diff --git a/clientv3/ctx.go b/clientv3/ctx.go new file mode 100644 index 00000000000..869b0fa6911 --- /dev/null +++ b/clientv3/ctx.go @@ -0,0 +1,48 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "context" + + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/version" + "google.golang.org/grpc/metadata" +) + +// WithRequireLeader requires client requests to only succeed +// when the cluster has a leader. +func WithRequireLeader(ctx context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { // no outgoing metadata ctx key, create one + md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + return metadata.NewOutgoingContext(ctx, md) + } + // overwrite/add 'hasleader' key/value + md.Set(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + return metadata.NewOutgoingContext(ctx, md) +} + +// embeds client version +func withVersion(ctx context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { // no outgoing metadata ctx key, create one + md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion) + return metadata.NewOutgoingContext(ctx, md) + } + // overwrite/add version key/value + md.Set(rpctypes.MetadataClientAPIVersionKey, version.APIVersion) + return metadata.NewOutgoingContext(ctx, md) +} diff --git a/clientv3/ctx_test.go b/clientv3/ctx_test.go new file mode 100644 index 00000000000..89d966643e9 --- /dev/null +++ b/clientv3/ctx_test.go @@ -0,0 +1,67 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "context" + "reflect" + "testing" + + "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" + "go.etcd.io/etcd/version" + "google.golang.org/grpc/metadata" +) + +func TestMetadataWithRequireLeader(t *testing.T) { + ctx := context.TODO() + md, ok := metadata.FromOutgoingContext(ctx) + if ok { + t.Fatal("expected no outgoing metadata ctx key") + } + + // add a conflicting key with some other value + md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, "invalid") + // add a key, and expect not be overwritten + md.Set("hello", "1", "2") + ctx = metadata.NewOutgoingContext(ctx, md) + + // expect overwrites but still keep other keys + ctx = WithRequireLeader(ctx) + md, ok = metadata.FromOutgoingContext(ctx) + if !ok { + t.Fatal("expected outgoing metadata ctx key") + } + if ss := md.Get(rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss) + } + if ss := md.Get("hello"); !reflect.DeepEqual(ss, []string{"1", "2"}) { + t.Fatalf("unexpected metadata for 'hello' %v", ss) + } +} + +func TestMetadataWithClientAPIVersion(t *testing.T) { + ctx := withVersion(WithRequireLeader(context.TODO())) + + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + t.Fatal("expected outgoing metadata ctx key") + } + if ss := md.Get(rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss) + } + if ss := md.Get(rpctypes.MetadataClientAPIVersionKey); !reflect.DeepEqual(ss, []string{version.APIVersion}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataClientAPIVersionKey, ss) + } +} diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go index 4f2a28a3e62..af8d1cad715 100644 --- a/clientv3/integration/kv_test.go +++ b/clientv3/integration/kv_test.go @@ -17,8 +17,10 @@ package integration import ( "bytes" "context" + "fmt" "os" "reflect" + "strconv" "strings" "testing" "time" @@ -28,6 +30,7 @@ import ( "go.etcd.io/etcd/integration" "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/testutil" + "go.etcd.io/etcd/version" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -208,6 +211,22 @@ func TestKVPutWithRequireLeader(t *testing.T) { t.Fatal(err) } + cnt, err := clus.Members[0].Metric( + "etcd_server_client_requests_total", + `type="unary"`, + fmt.Sprintf(`client_api_version="%v"`, version.APIVersion), + ) + if err != nil { + t.Fatal(err) + } + cv, err := strconv.ParseInt(cnt, 10, 32) + if err != nil { + t.Fatal(err) + } + if cv < 1 { // >1 when retried + t.Fatalf("expected at least 1, got %q", cnt) + } + // clients may give timeout errors since the members are stopped; take // the clients so that terminating the cluster won't complain clus.Client(1).Close() diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index 007cbc38ec6..59137b36096 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -20,6 +20,7 @@ import ( "math/rand" "reflect" "sort" + "strconv" "testing" "time" @@ -29,6 +30,7 @@ import ( "go.etcd.io/etcd/integration" mvccpb "go.etcd.io/etcd/mvcc/mvccpb" "go.etcd.io/etcd/pkg/testutil" + "go.etcd.io/etcd/version" "google.golang.org/grpc/metadata" ) @@ -839,6 +841,22 @@ func TestWatchWithRequireLeader(t *testing.T) { if _, ok := <-chNoLeader; !ok { t.Fatalf("expected response, got closed channel") } + + cnt, err := clus.Members[0].Metric( + "etcd_server_client_requests_total", + `type="stream"`, + fmt.Sprintf(`client_api_version="%v"`, version.APIVersion), + ) + if err != nil { + t.Fatal(err) + } + cv, err := strconv.ParseInt(cnt, 10, 32) + if err != nil { + t.Fatal(err) + } + if cv < 2 { // >2 when retried + t.Fatalf("expected at least 2, got %q", cnt) + } } // TestWatchWithFilter checks that watch filtering works. diff --git a/clientv3/retry_interceptor.go b/clientv3/retry_interceptor.go index aac679ecccb..2c266e55bec 100644 --- a/clientv3/retry_interceptor.go +++ b/clientv3/retry_interceptor.go @@ -38,6 +38,7 @@ import ( func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.UnaryClientInterceptor { intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + ctx = withVersion(ctx) grpcOpts, retryOpts := filterCallOptions(opts) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations. @@ -103,6 +104,7 @@ func (c *Client) unaryClientInterceptor(logger *zap.Logger, optFuncs ...retryOpt func (c *Client) streamClientInterceptor(logger *zap.Logger, optFuncs ...retryOption) grpc.StreamClientInterceptor { intOpts := reuseOrNewWithCallOptions(defaultOptions, optFuncs) return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + ctx = withVersion(ctx) grpcOpts, retryOpts := filterCallOptions(opts) callOpts := reuseOrNewWithCallOptions(intOpts, retryOpts) // short circuit for simplicity, and avoiding allocations. diff --git a/etcdserver/api/v3rpc/interceptor.go b/etcdserver/api/v3rpc/interceptor.go index 00cad41ae52..bd102490e75 100644 --- a/etcdserver/api/v3rpc/interceptor.go +++ b/etcdserver/api/v3rpc/interceptor.go @@ -53,6 +53,12 @@ func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor { md, ok := metadata.FromIncomingContext(ctx) if ok { + ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey) + if len(vs) > 0 { + ver = vs[0] + } + clientRequests.WithLabelValues("unary", ver).Inc() + if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { if s.Leader() == types.ID(raft.None) { return nil, rpctypes.ErrGRPCNoLeader @@ -184,6 +190,12 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor md, ok := metadata.FromIncomingContext(ss.Context()) if ok { + ver, vs := "unknown", md.Get(rpctypes.MetadataClientAPIVersionKey) + if len(vs) > 0 { + ver = vs[0] + } + clientRequests.WithLabelValues("stream", ver).Inc() + if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader { if s.Leader() == types.ID(raft.None) { return rpctypes.ErrGRPCNoLeader @@ -202,7 +214,6 @@ func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor smap.mu.Unlock() cancel() }() - } } diff --git a/etcdserver/api/v3rpc/metrics.go b/etcdserver/api/v3rpc/metrics.go index d633d27c2cb..a4ee723c52f 100644 --- a/etcdserver/api/v3rpc/metrics.go +++ b/etcdserver/api/v3rpc/metrics.go @@ -39,10 +39,20 @@ var ( }, []string{"Type", "API"}, ) + + clientRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "server", + Name: "client_requests_total", + Help: "The total number of client requests per client version.", + }, + []string{"type", "client_api_version"}, + ) ) func init() { prometheus.MustRegister(sentBytes) prometheus.MustRegister(receivedBytes) prometheus.MustRegister(streamFailures) + prometheus.MustRegister(clientRequests) } diff --git a/etcdserver/api/v3rpc/rpctypes/md.go b/etcdserver/api/v3rpc/rpctypes/md.go index 5c590e1aec9..90b8b835b16 100644 --- a/etcdserver/api/v3rpc/rpctypes/md.go +++ b/etcdserver/api/v3rpc/rpctypes/md.go @@ -17,4 +17,6 @@ package rpctypes var ( MetadataRequireLeaderKey = "hasleader" MetadataHasLeader = "true" + + MetadataClientAPIVersionKey = "client-api-version" ) diff --git a/integration/cluster.go b/integration/cluster.go index 6a1a95f7f00..50af3c2b9ec 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -1143,7 +1143,7 @@ func (m *member) Terminate(t testing.TB) { } // Metric gets the metric value for a member -func (m *member) Metric(metricName string) (string, error) { +func (m *member) Metric(metricName string, expectLabels ...string) (string, error) { cfgtls := transport.TLSInfo{} tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second) if err != nil { @@ -1161,9 +1161,20 @@ func (m *member) Metric(metricName string) (string, error) { } lines := strings.Split(string(b), "\n") for _, l := range lines { - if strings.HasPrefix(l, metricName) { - return strings.Split(l, " ")[1], nil + if !strings.HasPrefix(l, metricName) { + continue + } + ok := true + for _, lv := range expectLabels { + if !strings.Contains(l, lv) { + ok = false + break + } + } + if !ok { + continue } + return strings.Split(l, " ")[1], nil } return "", nil }