diff --git a/clientv3/client.go b/clientv3/client.go index 5dc93af200a2..9c590a4dd166 100644 --- a/clientv3/client.go +++ b/clientv3/client.go @@ -32,7 +32,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -269,7 +268,11 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts } return conn, err } - opts = append(opts, grpc.WithDialer(f)) + opts = append(opts, + grpc.WithDialer(f), + grpc.WithUnaryInterceptor(newUnaryClientInterceptor()), + grpc.WithStreamInterceptor(newStreamClientInterceptor()), + ) creds := c.creds if _, _, scheme := parseEndpoint(endpoint); len(scheme) != 0 { @@ -284,6 +287,30 @@ func (c *Client) dialSetupOpts(endpoint string, dopts ...grpc.DialOption) (opts return opts } +func newUnaryClientInterceptor() grpc.UnaryClientInterceptor { + return func(ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption) error { + ctx = withVersion(ctx) + return invoker(ctx, method, req, reply, cc, opts...) + } +} + +func newStreamClientInterceptor() grpc.StreamClientInterceptor { + return func(ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, + opts ...grpc.CallOption) (grpc.ClientStream, error) { + ctx = withVersion(ctx) + return streamer(ctx, desc, cc, method, opts...) + } +} + // Dial connects to a single endpoint using the client's config. func (c *Client) Dial(endpoint string) (*grpc.ClientConn, error) { return c.dial(endpoint) @@ -356,13 +383,6 @@ func (c *Client) dial(endpoint string, dopts ...grpc.DialOption) (*grpc.ClientCo return conn, nil } -// WithRequireLeader requires client requests to only succeed -// when the cluster has a leader. -func WithRequireLeader(ctx context.Context) context.Context { - md := metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) - return metadata.NewOutgoingContext(ctx, md) -} - func newClient(cfg *Config) (*Client, error) { if cfg == nil { cfg = &Config{} diff --git a/clientv3/ctx.go b/clientv3/ctx.go new file mode 100644 index 000000000000..00307fdd70f5 --- /dev/null +++ b/clientv3/ctx.go @@ -0,0 +1,62 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "context" + "strings" + + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/coreos/etcd/version" + "google.golang.org/grpc/metadata" +) + +// WithRequireLeader requires client requests to only succeed +// when the cluster has a leader. +func WithRequireLeader(ctx context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { // no outgoing metadata ctx key, create one + md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + return metadata.NewOutgoingContext(ctx, md) + } + // overwrite/add 'hasleader' key/value + metadataSet(md, rpctypes.MetadataRequireLeaderKey, rpctypes.MetadataHasLeader) + return metadata.NewOutgoingContext(ctx, md) +} + +// embeds client version +func withVersion(ctx context.Context) context.Context { + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { // no outgoing metadata ctx key, create one + md = metadata.Pairs(rpctypes.MetadataClientAPIVersionKey, version.APIVersion) + return metadata.NewOutgoingContext(ctx, md) + } + // overwrite/add version key/value + metadataSet(md, rpctypes.MetadataClientAPIVersionKey, version.APIVersion) + return metadata.NewOutgoingContext(ctx, md) +} + +func metadataGet(md metadata.MD, k string) []string { + k = strings.ToLower(k) + return md[k] +} + +func metadataSet(md metadata.MD, k string, vals ...string) { + if len(vals) == 0 { + return + } + k = strings.ToLower(k) + md[k] = vals +} diff --git a/clientv3/ctx_test.go b/clientv3/ctx_test.go new file mode 100644 index 000000000000..4a357dae6855 --- /dev/null +++ b/clientv3/ctx_test.go @@ -0,0 +1,67 @@ +// Copyright 2020 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package clientv3 + +import ( + "context" + "reflect" + "testing" + + "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" + "github.com/coreos/etcd/version" + "google.golang.org/grpc/metadata" +) + +func TestMetadataWithRequireLeader(t *testing.T) { + ctx := context.TODO() + md, ok := metadata.FromOutgoingContext(ctx) + if ok { + t.Fatal("expected no outgoing metadata ctx key") + } + + // add a conflicting key with some other value + md = metadata.Pairs(rpctypes.MetadataRequireLeaderKey, "invalid") + // add a key, and expect not be overwritten + metadataSet(md, "hello", "1", "2") + ctx = metadata.NewOutgoingContext(ctx, md) + + // expect overwrites but still keep other keys + ctx = WithRequireLeader(ctx) + md, ok = metadata.FromOutgoingContext(ctx) + if !ok { + t.Fatal("expected outgoing metadata ctx key") + } + if ss := metadataGet(md, rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss) + } + if ss := metadataGet(md, "hello"); !reflect.DeepEqual(ss, []string{"1", "2"}) { + t.Fatalf("unexpected metadata for 'hello' %v", ss) + } +} + +func TestMetadataWithClientAPIVersion(t *testing.T) { + ctx := withVersion(WithRequireLeader(context.TODO())) + + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + t.Fatal("expected outgoing metadata ctx key") + } + if ss := metadataGet(md, rpctypes.MetadataRequireLeaderKey); !reflect.DeepEqual(ss, []string{rpctypes.MetadataHasLeader}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataRequireLeaderKey, ss) + } + if ss := metadataGet(md, rpctypes.MetadataClientAPIVersionKey); !reflect.DeepEqual(ss, []string{version.APIVersion}) { + t.Fatalf("unexpected metadata for %q %v", rpctypes.MetadataClientAPIVersionKey, ss) + } +}