Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[3.4] Upgrade grpc-go to 1.29.1 and Backport "Introduce grpc-1.30+ compatible client/v3/naming API." #16795

Merged
merged 1 commit into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions Documentation/dev-guide/grpc_naming.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,54 +6,56 @@ etcd provides a gRPC resolver to support an alternative name system that fetches

## Using etcd discovery with go-grpc

The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client and given a target for resolution:
The etcd client provides a gRPC resolver for resolving gRPC endpoints with an etcd backend. The resolver is initialized with an etcd client:

```go
import (
"go.etcd.io/etcd/clientv3"
etcdnaming "go.etcd.io/etcd/clientv3/naming"
resolver "go.etcd.io/etcd/clientv3/naming/resolver"

"google.golang.org/grpc"
)

...

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
r := &etcdnaming.GRPCResolver{Client: cli}
b := grpc.RoundRobin(r)
conn, gerr := grpc.Dial("my-service", grpc.WithBalancer(b), grpc.WithBlock(), ...)
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
```

## Managing service endpoints

The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "my-service/") with JSON-encoded go-grpc `naming.Update` values as potential service endpoints. Endpoints are added to the service by creating new keys and removed from the service by deleting keys.
The etcd resolver treats all keys under the prefix of the resolution target following a "/" (e.g., "foo/bar/my-service/")
with JSON-encoded (historically go-grpc `naming.Update`) values as potential service endpoints.
Endpoints are added to the service by creating new keys and removed from the service by deleting keys.

### Adding an endpoint

New endpoints can be added to the service through `etcdctl`:

```sh
ETCDCTL_API=3 etcdctl put my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl put foo/bar/my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
```

The etcd client's `GRPCResolver.Update` method can also register new endpoints with a key matching the `Addr`:
The etcd client's `endpoints.Manager` method can also register new endpoints with a key matching the `Addr`:

```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Add, Addr: "1.2.3.4", Metadata: "..."})

em := endpoints.NewManager(client, "foo/bar/my-service")
err := em.AddEndpoint(context.TODO(),"foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
```

### Deleting an endpoint

Hosts can be deleted from the service through `etcdctl`:

```sh
ETCDCTL_API=3 etcdctl del my-service/1.2.3.4
ETCDCTL_API=3 etcdctl del foo/bar/my-service/1.2.3.4
```

The etcd client's `GRPCResolver.Update` method also supports deleting endpoints:
The etcd client's `endpoints.Manager` method also supports deleting endpoints:

```go
r.Update(context.TODO(), "my-service", naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
em := endpoints.NewManager(client, "foo/bar/my-service")
err := em.DeleteEndpoint(context.TODO(), "foo/bar/my-service/e1");
```

### Registering an endpoint with a lease
Expand All @@ -65,3 +67,21 @@ lease=`ETCDCTL_API=3 etcdctl lease grant 5 | cut -f2 -d' '`
ETCDCTL_API=3 etcdctl put --lease=$lease my-service/1.2.3.4 '{"Addr":"1.2.3.4","Metadata":"..."}'
ETCDCTL_API=3 etcdctl lease keep-alive $lease
```
In the golang:

```go
em := endpoints.NewManager(client, "foo/bar/my-service")
err := endpoints.AddEndpoint(context.TODO(), "foo/bar/my-service/e1", endpoints.Endpoint{Addr:"1.2.3.4"});
```

### Atomically updating endpoints

If it's desired to modify multiple endpoints in a single transaction, `endpoints.Manager` can be used directly:

```
em := endpoints.NewManager(c, "foo")

err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
```
135 changes: 135 additions & 0 deletions clientv3/integration/naming/endpoints_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package naming

import (
"context"
"reflect"
"testing"

etcd "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/naming/endpoints"

"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"
)

func TestEndpointManager(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}
ctx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
w, err := em.NewWatchChannel(ctx)
if err != nil {
t.Fatal("failed to establish watch", err)
}

e1 := endpoints.Endpoint{Addr: "127.0.0.1", Metadata: "metadata"}
err = em.AddEndpoint(context.TODO(), "foo/a1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}

us := <-w

if us == nil {
t.Fatal("failed to get update", err)
}

wu := endpoints.Update{
Op: endpoints.Add,
Key: "foo/a1",
Endpoint: e1,
}

if !reflect.DeepEqual(us[0], wu) {
t.Fatalf("up = %#v, want %#v", us[0], wu)
}

err = em.DeleteEndpoint(context.TODO(), "foo/a1")
if err != nil {
t.Fatalf("failed to udpate %v", err)
}

us = <-w
if err != nil {
t.Fatalf("failed to get udpate %v", err)
}

wu = endpoints.Update{
Op: endpoints.Delete,
Key: "foo/a1",
}

if !reflect.DeepEqual(us, wu) {
t.Fatalf("up = %#v, want %#v", us[1], wu)
}
}

// TestEndpointManagerAtomicity ensures the resolver will initialize
// correctly with multiple hosts and correctly receive multiple
// updates in a single revision.
func TestEndpointManagerAtomicity(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

c := clus.RandClient()
em, err := endpoints.NewManager(c, "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}

err = em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewAddUpdateOpts("foo/host", endpoints.Endpoint{Addr: "127.0.0.1:2000"}),
endpoints.NewAddUpdateOpts("foo/host2", endpoints.Endpoint{Addr: "127.0.0.1:2001"})})
if err != nil {
t.Fatal(err)
}

ctx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
w, err := em.NewWatchChannel(ctx)
if err != nil {
t.Fatal(err)
}

updates := <-w
if len(updates) != 2 {
t.Fatalf("expected two updates, got %+v", updates)
}

_, err = c.Txn(context.TODO()).Then(etcd.OpDelete("foo/host"), etcd.OpDelete("foo/host2")).Commit()
if err != nil {
t.Fatal(err)
}

updates = <-w
if len(updates) != 2 || (updates[0].Op != endpoints.Delete && updates[1].Op != endpoints.Delete) {
t.Fatalf("expected two delete updates, got %+v", updates)
}
}
70 changes: 70 additions & 0 deletions clientv3/integration/naming/resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package naming

import (
"context"
"testing"

"google.golang.org/grpc"

"go.etcd.io/etcd/clientv3/naming/endpoints"
"go.etcd.io/etcd/clientv3/naming/resolver"
"go.etcd.io/etcd/integration"
"go.etcd.io/etcd/pkg/testutil"
)

// This test mimics scenario described in grpc_naming.md doc.

func TestEtcdGrpcResolver(t *testing.T) {
t.Skip("Not implemented yet")

defer testutil.AfterTest(t)

// s1 := // TODO: Dummy GRPC service listening on 127.0.0.1:20000
// s2 := // TODO: Dummy GRPC service listening on 127.0.0.1:20001

clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

em, err := endpoints.NewManager(clus.RandClient(), "foo")
if err != nil {
t.Fatal("failed to create EndpointManager", err)
}

e1 := endpoints.Endpoint{Addr: "127.0.0.1:20000"}
e2 := endpoints.Endpoint{Addr: "127.0.0.1:20001"}

err = em.AddEndpoint(context.TODO(), "foo/e1", e1)
if err != nil {
t.Fatal("failed to add foo", err)
}
etcdResolver, err := resolver.NewBuilder(clus.RandClient())

conn, err := grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
if err != nil {
t.Fatal("failed to connect to foo (e1)", err)
}

// TODO: send requests to conn, ensure s1 received it.

em.DeleteEndpoint(context.TODO(), "foo/e1")
em.AddEndpoint(context.TODO(), "foo/e2", e2)

// TODO: Send requests to conn and make sure s2 receive it.
// Might require restarting s1 to break the existing (open) connection.

conn.GetState() // this line is to avoid compiler warning that conn is unused.
}
31 changes: 17 additions & 14 deletions clientv3/naming/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,44 +12,47 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package naming provides an etcd-backed gRPC resolver for discovering gRPC services.
// Package naming provides:
// - subpackage endpoints: an abstraction layer to store and read endpoints
// information from etcd.
// - subpackage resolver: an etcd-backed gRPC resolver for discovering gRPC
// services based on the endpoints configuration
//
// To use, first import the packages:
//
// import (
// "go.etcd.io/etcd/clientv3"
// etcdnaming "go.etcd.io/etcd/clientv3/naming"
//
// "go.etcd.io/etcd/clientv3/naming/endpoints"
// "go.etcd.io/etcd/clientv3/naming/resolver"
// "google.golang.org/grpc"
// "google.golang.org/grpc/naming"
// )
//
// First, register new endpoint addresses for a service:
//
// func etcdAdd(c *clientv3.Client, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr})
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr})
// }
//
// Dial an RPC service using the etcd gRPC resolver and a gRPC Balancer:
//
// func etcdDial(c *clientv3.Client, service string) (*grpc.ClientConn, error) {
// r := &etcdnaming.GRPCResolver{Client: c}
// b := grpc.RoundRobin(r)
// return grpc.Dial(service, grpc.WithBalancer(b))
// etcdResolver, err := resolver.NewBuilder(c);
// if err { return nil, err }
// return grpc.Dial("etc://foo", grpc.WithResolvers(etcdResolver))
// }
//
// Optionally, force delete an endpoint:
//
// func etcdDelete(c *clientv3, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Delete, Addr: "1.2.3.4"})
// em := endpoints.NewManager(c, service)
// return em.DeleteEndpoint(c.Ctx(), service+"/"+addr)
// }
//
// Or register an expiring endpoint with a lease:
//
// func etcdLeaseAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// r := &etcdnaming.GRPCResolver{Client: c}
// return r.Update(c.Ctx(), service, naming.Update{Op: naming.Add, Addr: addr}, clientv3.WithLease(lid))
// func etcdAdd(c *clientv3.Client, lid clientv3.LeaseID, service, addr string) error {
// em := endpoints.NewManager(c, service)
// return em.AddEndpoint(c.Ctx(), service+"/"+addr, endpoints.Endpoint{Addr:addr}, clientv3.WithLease(lid))
// }
package naming
Loading