Introduce Kubernetes interface to etcd client
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
serathius committed Jun 19, 2024
1 parent a043da5 commit d198d8b
Showing 3 changed files with 226 additions and 14 deletions.
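
For orientation, here is a minimal usage sketch of the new Kubernetes interface added by this commit. It is not part of the diff; the endpoint, key paths, and printed fields are illustrative placeholders, and it assumes the types land in package clientv3 as shown below.

package main

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
)

func main() {
	// Endpoint and keys are placeholders for illustration only.
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx := context.Background()

	// Point read at the latest revision (Revision 0 means "current").
	getResp, err := cli.Kubernetes.Get(ctx, "/registry/pods/default/foo", clientv3.GetOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Println("revision:", getResp.Revision, "found:", getResp.KV != nil)

	// Paginated list under a prefix, served consistently at one revision.
	listResp, err := cli.Kubernetes.List(ctx, "/registry/pods/", clientv3.ListOptions{Limit: 500})
	if err != nil {
		panic(err)
	}
	fmt.Println("items:", len(listResp.KVs), "total:", listResp.Count, "revision:", listResp.Revision)
}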
2 changes: 2 additions & 0 deletions client/v3/client.go
@@ -52,6 +52,7 @@ type Client struct {
Watcher
Auth
Maintenance
Kubernetes Kubernetes

conn *grpc.ClientConn

@@ -447,6 +448,7 @@ func newClient(cfg *Config) (*Client, error) {
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Kubernetes = NewKubernetes(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)

202 changes: 202 additions & 0 deletions client/v3/kubernetes.go
@@ -0,0 +1,202 @@
package clientv3

import (
"context"

pb "go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
)

func NewKubernetes(c *Client) Kubernetes {
return &kubernetes{kv: RetryKVClient(c), watcher: newWatchFromWatchClient(pb.NewWatchClient(c.conn), c)}
}

type Kubernetes interface {
Get(ctx context.Context, key string, opts GetOptions) (KubernetesGetResponse, error)
List(ctx context.Context, prefix string, opts ListOptions) (KubernetesListResponse, error)
Count(ctx context.Context, prefix string) (int64, error)
OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (KubernetesPutResponse, error)
OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (KubernetesDeleteResponse, error)
Watch(ctx context.Context, key string, opts WatchOptions) WatchChan
RequestProgress(ctx context.Context, opts RequestProgressOptions) error
}

type WatchID string

type GetOptions struct {
Revision int64
}

type ListOptions struct {
Revision int64
Limit int64
Continue string
}

type WatchOptions struct {
StreamKey string
Revision int64
Prefix bool
}

type PutOptions struct {
ExpectedRevision int64
LeaseID LeaseID
}

type DeleteOptions struct {
ExpectedRevision int64
}

type RequestProgressOptions struct {
StreamKey string
}

type KubernetesGetResponse struct {
KV *mvccpb.KeyValue
Revision int64
}

type KubernetesListResponse struct {
KVs []*mvccpb.KeyValue
Count int64
Revision int64
}

type KubernetesPutResponse struct {
KV *mvccpb.KeyValue
Succeeded bool
Revision int64
}

type KubernetesDeleteResponse struct {
KV *mvccpb.KeyValue
Succeeded bool
Revision int64
}

type kubernetes struct {
kv pb.KVClient
watcher *watcher
}

func (k kubernetes) Get(ctx context.Context, key string, opts GetOptions) (resp KubernetesGetResponse, err error) {
rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(key),
Revision: opts.Revision,
Limit: 1,
})
if err != nil {
return resp, toErr(ctx, err)
}
resp.Revision = rangeResp.Header.Revision
if len(rangeResp.Kvs) == 1 {
resp.KV = rangeResp.Kvs[0]
}
return resp, nil
}

func (k kubernetes) List(ctx context.Context, prefix string, opts ListOptions) (resp KubernetesListResponse, err error) {
rangeStart := prefix + opts.Continue
rangeEnd := GetPrefixRangeEnd(prefix)

rangeResp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(rangeStart),
RangeEnd: []byte(rangeEnd),
Limit: opts.Limit,
Revision: opts.Revision,
})
if err != nil {
return resp, toErr(ctx, err)
}
resp.KVs = rangeResp.Kvs
resp.Count = rangeResp.Count
resp.Revision = rangeResp.Header.Revision
return resp, nil
}

func (k kubernetes) Count(ctx context.Context, prefix string) (int64, error) {
resp, err := k.kv.Range(ctx, &pb.RangeRequest{
Key: []byte(prefix),
RangeEnd: []byte(GetPrefixRangeEnd(prefix)),
CountOnly: true,
})
if err != nil {
return 0, toErr(ctx, err)
}
return resp.Count, nil
}

func (k kubernetes) OptimisticPut(ctx context.Context, key string, value []byte, opts PutOptions) (resp KubernetesPutResponse, err error) {
put := &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: &pb.PutRequest{Key: []byte(key), Value: value, Lease: int64(opts.LeaseID)}}}

txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, put)
if err != nil {
return resp, toErr(ctx, err)
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}

func (k kubernetes) OptimisticDelete(ctx context.Context, key string, opts DeleteOptions) (resp KubernetesDeleteResponse, err error) {
del := &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: &pb.DeleteRangeRequest{Key: []byte(key)}}}

txnResp, err := k.optimisticTxn(ctx, key, opts.ExpectedRevision, del)
if err != nil {
return resp, toErr(ctx, err)
}
resp.Succeeded = txnResp.Succeeded
resp.Revision = txnResp.Header.Revision
if !txnResp.Succeeded {
resp.KV = kvFromTxnResponse(txnResp.Responses[0])
}
return resp, nil
}

func (k kubernetes) optimisticTxn(ctx context.Context, key string, expectRevision int64, onSuccess *pb.RequestOp) (*pb.TxnResponse, error) {
txn := &pb.TxnRequest{
Compare: []*pb.Compare{&pb.Compare{
Result: pb.Compare_EQUAL,
Target: pb.Compare_MOD,
Key: []byte(key),
TargetUnion: &pb.Compare_ModRevision{ModRevision: expectRevision},
}},
Success: []*pb.RequestOp{onSuccess},
Failure: []*pb.RequestOp{{Request: &pb.RequestOp_RequestRange{RequestRange: &pb.RangeRequest{Key: []byte(key), Limit: 1}}}},
}
return k.kv.Txn(ctx, txn)
}

func kvFromTxnResponse(resp *pb.ResponseOp) *mvccpb.KeyValue {
getResponse := resp.GetResponseRange()
if len(getResponse.Kvs) == 1 {
return getResponse.Kvs[0]
}
return nil
}

func (k kubernetes) Watch(ctx context.Context, key string, opts WatchOptions) WatchChan {
ctx = WithRequireLeader(ctx)
if opts.StreamKey == "" {
opts.StreamKey = streamKeyFromCtx(ctx)
}

wr := &watchRequest{
ctx: ctx,
key: key,
end: GetPrefixRangeEnd(key),
rev: opts.Revision,
progressNotify: true,
prevKV: true,
retc: make(chan chan WatchResponse, 1),
}
return k.watcher.watch(ctx, opts.StreamKey, wr)
}

func (k kubernetes) RequestProgress(ctx context.Context, opts RequestProgressOptions) error {
return k.watcher.requestProgress(ctx, opts.StreamKey)
}
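
OptimisticPut and OptimisticDelete above wrap a single-compare transaction on the key's ModRevision: the mutation runs only if the revision matches, and on failure the current KeyValue is returned via the failure-branch range. A sketch (not part of this commit) of a read-modify-write loop built on that contract; the helper name casUpdate is hypothetical.

package example

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// casUpdate re-reads the key and retries the OptimisticPut whenever the
// ModRevision compare fails.
func casUpdate(ctx context.Context, k clientv3.Kubernetes, key string, update func(old []byte) []byte) error {
	for {
		get, err := k.Get(ctx, key, clientv3.GetOptions{})
		if err != nil {
			return err
		}
		var old []byte
		var expectedRev int64 // 0 means the key must not exist yet
		if get.KV != nil {
			old = get.KV.Value
			expectedRev = get.KV.ModRevision
		}
		put, err := k.OptimisticPut(ctx, key, update(old), clientv3.PutOptions{ExpectedRevision: expectedRev})
		if err != nil {
			return err
		}
		if put.Succeeded {
			return nil // ModRevision matched, the write was applied
		}
		// Another writer won the race; put.KV carries the current KeyValue.
		// Loop and try again against the fresh state.
	}
}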
36 changes: 22 additions & 14 deletions client/v3/watch.go
@@ -162,8 +162,8 @@ type watchGRPCStream struct {
// ctx controls internal remote.Watch requests
ctx context.Context
// ctxKey is the key used when looking up this stream's context
ctxKey string
cancel context.CancelFunc
streamKey string
cancel context.CancelFunc

// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
@@ -249,6 +249,10 @@ func NewWatcher(c *Client) Watcher {
}

func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
return newWatchFromWatchClient(wc, c)
}

func newWatchFromWatchClient(wc pb.WatchClient, c *Client) *watcher {
w := &watcher{
remote: wc,
streams: make(map[string]*watchGRPCStream),
@@ -271,14 +275,14 @@ func (vc *valCtx) Deadline() (time.Time, bool) { return zeroTime, false }
func (vc *valCtx) Done() <-chan struct{} { return valCtxCh }
func (vc *valCtx) Err() error { return nil }

func (w *watcher) newWatcherGRPCStream(inctx context.Context) *watchGRPCStream {
func (w *watcher) newWatcherGrpcStream(inctx context.Context, streamKey string) *watchGRPCStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGRPCStream{
owner: w,
remote: w.remote,
callOpts: w.callOpts,
ctx: ctx,
ctxKey: streamKeyFromCtx(inctx),
streamKey: streamKey,
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
@@ -296,14 +300,14 @@ func (w *watcher) newWatcherGRPCStream(inctx context.Context) *watchGRPCStream {
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
ow := opWatch(key, opts...)

var filters []pb.WatchCreateRequest_FilterType
if ow.filterPut {
filters = append(filters, pb.WatchCreateRequest_NOPUT)
}
if ow.filterDelete {
filters = append(filters, pb.WatchCreateRequest_NODELETE)
}
streamKey := streamKeyFromCtx(ctx)

wr := &watchRequest{
ctx: ctx,
@@ -317,10 +321,11 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
return w.watch(ctx, streamKey, wr)
}

func (w *watcher) watch(ctx context.Context, streamKey string, wr *watchRequest) WatchChan {
ok := false
ctxKey := streamKeyFromCtx(ctx)

var closeCh chan WatchResponse
for {
// find or allocate appropriate grpc watch stream
@@ -332,10 +337,10 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch
close(ch)
return ch
}
wgs := w.streams[ctxKey]
wgs := w.streams[streamKey]
if wgs == nil {
wgs = w.newWatcherGRPCStream(ctx)
w.streams[ctxKey] = wgs
wgs = w.newWatcherGrpcStream(ctx, streamKey)
w.streams[streamKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
@@ -404,16 +409,19 @@ func (w *watcher) Close() (err error) {
// RequestProgress requests a progress notify response be sent in all watch channels.
func (w *watcher) RequestProgress(ctx context.Context) (err error) {
ctxKey := streamKeyFromCtx(ctx)
return w.requestProgress(ctx, ctxKey)
}

func (w *watcher) requestProgress(ctx context.Context, streamKey string) (err error) {
w.mu.Lock()
if w.streams == nil {
w.mu.Unlock()
return errors.New("no stream found for context")
}
wgs := w.streams[ctxKey]
wgs := w.streams[streamKey]
if wgs == nil {
wgs = w.newWatcherGRPCStream(ctx)
w.streams[ctxKey] = wgs
wgs = w.newWatcherGrpcStream(ctx, streamKey)
w.streams[streamKey] = wgs
}
donec := wgs.donec
reqc := wgs.reqc
@@ -450,7 +458,7 @@ func (w *watcher) closeStream(wgs *watchGRPCStream) {
close(wgs.donec)
wgs.cancel()
if w.streams != nil {
delete(w.streams, wgs.ctxKey)
delete(w.streams, wgs.streamKey)
}
w.mu.Unlock()
}
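
The watch.go changes route watches onto gRPC streams by an explicit StreamKey rather than deriving the key from the request context, which is what lets Kubernetes.Watch and Kubernetes.RequestProgress target a named stream. A rough sketch of how a caller might use that (not part of this commit; the "pods" stream key, prefix, and watchPods helper are illustrative assumptions).

package example

import (
	"context"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// watchPods resumes a prefix watch on a named gRPC stream after a known
// revision and asks that stream for a progress notification.
func watchPods(ctx context.Context, cli *clientv3.Client, fromRevision int64) error {
	wch := cli.Kubernetes.Watch(ctx, "/registry/pods/", clientv3.WatchOptions{
		StreamKey: "pods",           // watches sharing this key share one gRPC watch stream
		Revision:  fromRevision + 1, // start just after the already-observed revision
		Prefix:    true,
	})

	// Progress notifications let a consumer advance its bookmark even while
	// the watched keys are idle; they are requested per stream key.
	if err := cli.Kubernetes.RequestProgress(ctx, clientv3.RequestProgressOptions{StreamKey: "pods"}); err != nil {
		return err
	}

	for resp := range wch {
		for _, ev := range resp.Events {
			_ = ev // handle PUT/DELETE events; PrevKv is populated by this interface
		}
	}
	return nil
}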