Skip to content

Commit

Permalink
*: implement Endpoint Watch and new Resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
limeng01 committed Feb 8, 2021
1 parent dae29bb commit 8feb55f
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 96 deletions.
4 changes: 2 additions & 2 deletions Documentation/dev-guide/grpc_naming.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

cli, cerr := clientv3.NewFromURL("http://localhost:2379")
etcdResolver, err := resolver.NewBuilder(clus.RandClient());
conn, gerr := grpc.Dial("etcd://foo/bar/my-service", grpc.WithResolvers(etcdResolver))
conn, gerr := grpc.Dial("etcd:///foo/bar/my-service", grpc.WithResolvers(etcdResolver))
```

## Managing service endpoints
Expand Down Expand Up @@ -86,4 +86,4 @@ em := endpoints.NewManager(c, "foo")
err := em.Update(context.TODO(), []*endpoints.UpdateWithOpts{
endpoints.NewDeleteUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.4"}),
endpoints.NewAddUpdateOpts("foo/bar/my-service/e1", endpoints.Endpoint{Addr: "1.2.3.14"})})
```
```
17 changes: 15 additions & 2 deletions client/v3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ type Client struct {

callOpts []grpc.CallOption

lg *zap.Logger
lgMu *sync.RWMutex
lg *zap.Logger
}

// New creates a new etcdv3 client from a given configuration.
Expand All @@ -112,7 +113,7 @@ func New(cfg Config) (*Client, error) {
// service interface implementations and do not need connection management.
func NewCtxClient(ctx context.Context) *Client {
cctx, cancel := context.WithCancel(ctx)
return &Client{ctx: cctx, cancel: cancel, lg: zap.NewNop()}
return &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), lg: zap.NewNop()}
}

// NewFromURL creates a new etcdv3 client from a URL.
Expand All @@ -127,10 +128,21 @@ func NewFromURLs(urls []string) (*Client, error) {

// WithLogger sets a logger
func (c *Client) WithLogger(lg *zap.Logger) *Client {
c.lgMu.Lock()
c.lg = lg
c.lgMu.Unlock()
return c
}

// GetLogger gets the logger.
// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
func (c *Client) GetLogger() *zap.Logger {
c.lgMu.RLock()
l := c.lg
c.lgMu.RUnlock()
return l
}

// Close shuts down the client's etcd connections.
func (c *Client) Close() error {
c.cancel()
Expand Down Expand Up @@ -382,6 +394,7 @@ func newClient(cfg *Config) (*Client, error) {
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
lgMu: new(sync.RWMutex),
}

lcfg := logutil.DefaultZapLoggerConfig
Expand Down
2 changes: 1 addition & 1 deletion client/v3/naming/endpoints/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type Update struct {
}

// WatchChannel is used to deliver notifications about endpoints updates.
type WatchChannel chan []*Update
type WatchChannel <-chan []*Update

// Key2EndpointMap maps etcd key into struct describing the endpoint.
type Key2EndpointMap map[string]Endpoint
Expand Down
145 changes: 77 additions & 68 deletions client/v3/naming/endpoints/endpoints_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"

clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/naming/endpoints/internal"

"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
Expand Down Expand Up @@ -78,73 +78,82 @@ func (m *endpointManager) DeleteEndpoint(ctx context.Context, key string, opts .
}

func (m *endpointManager) NewWatchChannel(ctx context.Context) (WatchChannel, error) {
return nil, fmt.Errorf("Not implemented yet")

// TODO: Implementation to be inspired by:
// Next gets the next set of updates from the etcd resolver.
//// Calls to Next should be serialized; concurrent calls are not safe since
//// there is no way to reconcile the update ordering.
//func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
// if gw.wch == nil {
// // first Next() returns all addresses
// return gw.firstNext()
// }
// if gw.err != nil {
// return nil, gw.err
// }
//
// // process new events on target/*
// wr, ok := <-gw.wch
// if !ok {
// gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
// return nil, gw.err
// }
// if gw.err = wr.Err(); gw.err != nil {
// return nil, gw.err
// }
//
// updates := make([]*naming.Update, 0, len(wr.Events))
// for _, e := range wr.Events {
// var jupdate naming.Update
// var err error
// switch e.Type {
// case etcd.EventTypePut:
// err = json.Unmarshal(e.Kv.Value, &jupdate)
// jupdate.Op = naming.Add
// case etcd.EventTypeDelete:
// err = json.Unmarshal(e.PrevKv.Value, &jupdate)
// jupdate.Op = naming.Delete
// default:
// continue
// }
// if err == nil {
// updates = append(updates, &jupdate)
// }
// }
// return updates, nil
//}
//
//func (gw *gRPCWatcher) firstNext() ([]*naming.Update, error) {
// // Use serialized request so resolution still works if the target etcd
// // server is partitioned away from the quorum.
// resp, err := gw.c.Get(gw.ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
// if gw.err = err; err != nil {
// return nil, err
// }
//
// updates := make([]*naming.Update, 0, len(resp.Kvs))
// for _, kv := range resp.Kvs {
// var jupdate naming.Update
// if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
// continue
// }
// updates = append(updates, &jupdate)
// }
//
// opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
// gw.wch = gw.c.Watch(gw.ctx, gw.target, opts...)
// return updates, nil
//}
resp, err := m.client.Get(ctx, m.target, clientv3.WithPrefix(), clientv3.WithSerializable())
if err != nil {
return nil, err
}

lg := m.client.GetLogger()
initUpdates := make([]*Update, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
var iup internal.Update
if err := json.Unmarshal(kv.Value, &iup); err != nil {
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(kv.Key)), zap.Error(err))
continue
}
up := &Update{
Op: Add,
Key: string(kv.Key),
Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata},
}
initUpdates = append(initUpdates, up)
}

upch := make(chan []*Update, 1)
if len(initUpdates) > 0 {
upch <- initUpdates
}
go m.watch(ctx, resp.Header.Revision+1, upch)
return upch, nil
}

func (m *endpointManager) watch(ctx context.Context, rev int64, upch chan []*Update) {
defer close(upch)

lg := m.client.GetLogger()
opts := []clientv3.OpOption{clientv3.WithRev(rev), clientv3.WithPrefix()}
wch := m.client.Watch(ctx, m.target, opts...)
for {
select {
case <-ctx.Done():
return
case wresp, ok := <-wch:
if !ok {
lg.Warn("watch closed", zap.String("target", m.target))
return
}
if wresp.Err() != nil {
lg.Warn("watch failed", zap.String("target", m.target), zap.Error(wresp.Err()))
return
}

deltaUps := make([]*Update, 0, len(wresp.Events))
for _, e := range wresp.Events {
var iup internal.Update
var err error
var op Operation
switch e.Type {
case clientv3.EventTypePut:
err = json.Unmarshal(e.Kv.Value, &iup)
op = Add
if err != nil {
lg.Warn("unmarshal endpoint update failed", zap.String("key", string(e.Kv.Key)), zap.Error(err))
continue
}
case clientv3.EventTypeDelete:
iup = internal.Update{Op: internal.Delete}
op = Delete
default:
continue
}
up := &Update{Op: op, Key: string(e.Kv.Key), Endpoint: Endpoint{Addr: iup.Addr, Metadata: iup.Metadata}}
deltaUps = append(deltaUps, up)
}
if len(deltaUps) > 0 {
upch <- deltaUps
}
}
}
}

func (m *endpointManager) List(ctx context.Context) (Key2EndpointMap, error) {
Expand Down
99 changes: 91 additions & 8 deletions client/v3/naming/resolver/resolver.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,107 @@
package resolver

import (
"context"
"sync"

clientv3 "go.etcd.io/etcd/client/v3"
"google.golang.org/grpc/resolver"
"go.etcd.io/etcd/client/v3/naming/endpoints"

"google.golang.org/grpc/codes"
gresolver "google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)

type builder struct {
// ...
c *clientv3.Client
}

func (b builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
// To be implemented...
// Using endpoints.NewWatcher() to subscribe for endpoints changes.
return nil, nil
func (b builder) Build(target gresolver.Target, cc gresolver.ClientConn, opts gresolver.BuildOptions) (gresolver.Resolver, error) {
r := &resolver{
c: b.c,
target: target.Endpoint,
cc: cc,
}
r.ctx, r.cancel = context.WithCancel(context.Background())

em, err := endpoints.NewManager(r.c, r.target)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "resolver: failed to new endpoint manager: %s", err)
}
r.wch, err = em.NewWatchChannel(r.ctx)
if err != nil {
return nil, status.Errorf(codes.Internal, "resolver: failed to new watch channer: %s", err)
}

r.wg.Add(1)
go r.watch()
return r, nil
}

func (b builder) Scheme() string {
return "etcd"
}

func NewBuilder(client *clientv3.Client) (resolver.Builder, error) {
return builder{}, nil
// NewBuilder creates a resolver builder.
func NewBuilder(client *clientv3.Client) (gresolver.Builder, error) {
return builder{c: client}, nil
}

type resolver struct {
c *clientv3.Client
target string
cc gresolver.ClientConn
wch endpoints.WatchChannel
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
}

func (r *resolver) watch() {
defer r.wg.Done()

allUps := make(map[string]*endpoints.Update)
for {
select {
case <-r.ctx.Done():
return
case ups, ok := <-r.wch:
if !ok {
return
}

for _, up := range ups {
switch up.Op {
case endpoints.Add:
allUps[up.Key] = up
case endpoints.Delete:
delete(allUps, up.Key)
}
}

addrs := convertToGRPCAddress(allUps)
r.cc.UpdateState(gresolver.State{Addresses: addrs})
}
}
}

func convertToGRPCAddress(ups map[string]*endpoints.Update) []gresolver.Address {
var addrs []gresolver.Address
for _, up := range ups {
addr := gresolver.Address{
Addr: up.Endpoint.Addr,
Metadata: up.Endpoint.Metadata,
}
addrs = append(addrs, addr)
}
return addrs
}

// ResolveNow is a no-op here.
// It's just a hint, resolver can ignore this if it's not necessary.
func (r *resolver) ResolveNow(gresolver.ResolveNowOptions) {}

func (r *resolver) Close() {
r.cancel()
r.wg.Wait()
}
Loading

0 comments on commit 8feb55f

Please sign in to comment.