
Merge pull request #8335 from heyitsanthony/test-put-atmostonce
clientv3: put at most once
Anthony Romano authored Aug 1, 2017
2 parents 585b1d7 + fdba9e5 commit 9f1bfd9
Showing 6 changed files with 105 additions and 60 deletions.
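The core of the change is that the client now distinguishes read retries from write retries: reads keep retrying while an endpoint is Unavailable, but a write is retried only when the balancer reports "there is no address available", i.e. the request never left the client. The sketch below is not part of this commit; the endpoint address and messages are illustrative, and it only shows the caller-visible effect on Put.

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/coreos/etcd/clientv3"
)

func main() {
	// assumes a reachable local etcd member; the address is illustrative
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	// With this change, a Put interrupted by a dropped connection is not
	// silently re-sent once the balancer reconnects; the error is returned so
	// the write executes at most once. Reads (Get/Range) still retry on
	// Unavailable errors.
	if _, err := cli.Put(context.TODO(), "foo", "bar"); err != nil {
		// the write may or may not have been applied; the caller decides
		fmt.Println("put did not confirm:", err)
	}
}
```

The new tests exercise exactly this: TestSetEndpointAndPut tolerates a "closing" error after an endpoint switch, and TestKVPutAtMostOnce bounds the key's version while connections are repeatedly dropped.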
11 changes: 3 additions & 8 deletions clientv3/client.go
@@ -52,11 +52,9 @@ type Client struct {
conn *grpc.ClientConn
dialerrc chan error

cfg Config
creds *credentials.TransportCredentials
balancer *simpleBalancer
retryWrapper retryRpcFunc
retryAuthWrapper retryRpcFunc
cfg Config
creds *credentials.TransportCredentials
balancer *simpleBalancer

ctx context.Context
cancel context.CancelFunc
@@ -387,8 +385,6 @@ func newClient(cfg *Config) (*Client, error) {
return nil, err
}
client.conn = conn
client.retryWrapper = client.newRetryWrapper()
client.retryAuthWrapper = client.newAuthRetryWrapper()

// wait for a connection
if cfg.DialTimeout > 0 {
@@ -510,7 +506,6 @@ func toErr(ctx context.Context, err error) error {
err = ctx.Err()
}
case codes.Unavailable:
err = ErrNoAvailableEndpoints
case codes.FailedPrecondition:
err = grpc.ErrClientConnClosing
}
15 changes: 15 additions & 0 deletions clientv3/integration/dial_test.go
@@ -16,6 +16,7 @@ package integration

import (
"math/rand"
"strings"
"testing"
"time"

@@ -189,3 +190,17 @@ func TestDialForeignEndpoint(t *testing.T) {
t.Fatal(err)
}
}

// TestSetEndpointAndPut checks that a Put following a SetEndpoint
// to a working endpoint will always succeed.
func TestSetEndpointAndPut(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2})
defer clus.Terminate(t)

clus.Client(1).SetEndpoints(clus.Members[0].GRPCAddr())
_, err := clus.Client(1).Put(context.TODO(), "foo", "bar")
if err != nil && !strings.Contains(err.Error(), "closing") {
t.Fatal(err)
}
}
37 changes: 37 additions & 0 deletions clientv3/integration/kv_test.go
@@ -895,3 +895,40 @@ func TestKVGetResetLoneEndpoint(t *testing.T) {
case <-donec:
}
}

// TestKVPutAtMostOnce ensures that a Put occurs at most once
// in the presence of network errors.
func TestKVPutAtMostOnce(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

if _, err := clus.Client(0).Put(context.TODO(), "k", "1"); err != nil {
t.Fatal(err)
}

for i := 0; i < 10; i++ {
clus.Members[0].DropConnections()
donec := make(chan struct{})
go func() {
defer close(donec)
for i := 0; i < 10; i++ {
clus.Members[0].DropConnections()
time.Sleep(5 * time.Millisecond)
}
}()
_, err := clus.Client(0).Put(context.TODO(), "k", "v")
<-donec
if err != nil {
break
}
}

resp, err := clus.Client(0).Get(context.TODO(), "k")
if err != nil {
t.Fatal(err)
}
// the initial Put gives version 1 and each of the 10 racing attempts may add
// at most one more, so any version above 11 means a Put was applied twice
if resp.Kvs[0].Version > 11 {
t.Fatalf("expected version <= 11, got %+v", resp.Kvs[0])
}
}
19 changes: 5 additions & 14 deletions clientv3/ordering/kv_test.go
@@ -45,15 +45,11 @@ func TestDetectKvOrderViolation(t *testing.T) {
cli, err := clientv3.New(cfg)
ctx := context.TODO()

cli.SetEndpoints(clus.Members[0].GRPCAddr())
_, err = cli.Put(ctx, "foo", "bar")
if err != nil {
if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
t.Fatal(err)
}
// ensure that the second member has the current revision for the key foo
cli.SetEndpoints(clus.Members[1].GRPCAddr())
_, err = cli.Get(ctx, "foo")
if err != nil {
if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
t.Fatal(err)
}

@@ -107,23 +103,18 @@ func TestDetectTxnOrderViolation(t *testing.T) {
cli, err := clientv3.New(cfg)
ctx := context.TODO()

cli.SetEndpoints(clus.Members[0].GRPCAddr())
_, err = cli.Put(ctx, "foo", "bar")
if err != nil {
if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
t.Fatal(err)
}
// ensure that the second member has the current revision for the key foo
cli.SetEndpoints(clus.Members[1].GRPCAddr())
_, err = cli.Get(ctx, "foo")
if err != nil {
if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
t.Fatal(err)
}

// stop third member in order to force the member to have an outdated revision
clus.Members[2].Stop(t)
time.Sleep(1 * time.Second) // give enough time for operation
_, err = cli.Put(ctx, "foo", "buzz")
if err != nil {
if _, err = clus.Client(1).Put(ctx, "foo", "buzz"); err != nil {
t.Fatal(err)
}

24 changes: 9 additions & 15 deletions clientv3/ordering/util_test.go
@@ -28,26 +28,21 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
defer testutil.AfterTest(t)
clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
defer clus.Terminate(t)
cfg := clientv3.Config{
Endpoints: []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
},
eps := []string{
clus.Members[0].GRPCAddr(),
clus.Members[1].GRPCAddr(),
clus.Members[2].GRPCAddr(),
}
cfg := clientv3.Config{Endpoints: []string{clus.Members[0].GRPCAddr()}}
cli, err := clientv3.New(cfg)
eps := cli.Endpoints()

ctx := context.TODO()

cli.SetEndpoints(clus.Members[0].GRPCAddr())
_, err = cli.Put(ctx, "foo", "bar")
if err != nil {
if _, err = clus.Client(0).Put(ctx, "foo", "bar"); err != nil {
t.Fatal(err)
}
// ensure that the second member has current revision for key "foo"
cli.SetEndpoints(clus.Members[1].GRPCAddr())
_, err = cli.Get(ctx, "foo")
if err != nil {
if _, err = clus.Client(1).Get(ctx, "foo"); err != nil {
t.Fatal(err)
}

@@ -58,8 +53,7 @@ func TestEndpointSwitchResolvesViolation(t *testing.T) {
time.Sleep(1 * time.Second) // give enough time for the operation

// update to "foo" will not be replicated to the third member due to the partition
_, err = cli.Put(ctx, "foo", "buzz")
if err != nil {
if _, err = clus.Client(1).Put(ctx, "foo", "buzz"); err != nil {
t.Fatal(err)
}

59 changes: 36 additions & 23 deletions clientv3/retry.go
@@ -24,26 +24,29 @@ import (

type rpcFunc func(ctx context.Context) error
type retryRpcFunc func(context.Context, rpcFunc) error
type retryStopErrFunc func(error) bool

func (c *Client) newRetryWrapper() retryRpcFunc {
return func(rpcCtx context.Context, f rpcFunc) error {
for {
err := f(rpcCtx)
if err == nil {
return nil
}
func isReadStopError(err error) bool {
eErr := rpctypes.Error(err)
// always stop retry on etcd errors
if _, ok := eErr.(rpctypes.EtcdError); ok {
return true
}
// only retry if unavailable
return grpc.Code(err) != codes.Unavailable
}

eErr := rpctypes.Error(err)
// always stop retry on etcd errors
if _, ok := eErr.(rpctypes.EtcdError); ok {
return err
}
// isWriteStopError: a write RPC is retried only when the balancer reported
// "there is no address available", i.e. the request never reached an endpoint;
// any other error may already have been applied server-side, so retrying stops.
func isWriteStopError(err error) bool {
return grpc.Code(err) != codes.Unavailable ||
grpc.ErrorDesc(err) != "there is no address available"
}

// only retry if unavailable
if grpc.Code(err) != codes.Unavailable {
func (c *Client) newRetryWrapper(isStop retryStopErrFunc) retryRpcFunc {
return func(rpcCtx context.Context, f rpcFunc) error {
for {
if err := f(rpcCtx); err == nil || isStop(err) {
return err
}

select {
case <-c.balancer.ConnectNotify():
case <-rpcCtx.Done():
@@ -79,17 +82,24 @@ func (c *Client) newAuthRetryWrapper() retryRpcFunc {

// RetryKVClient implements a KVClient that uses the client's FailFast retry policy.
func RetryKVClient(c *Client) pb.KVClient {
retryWrite := &retryWriteKVClient{pb.NewKVClient(c.conn), c.retryWrapper}
return &retryKVClient{&retryWriteKVClient{retryWrite, c.retryAuthWrapper}}
readRetry := c.newRetryWrapper(isReadStopError)
writeRetry := c.newRetryWrapper(isWriteStopError)
conn := pb.NewKVClient(c.conn)
retryBasic := &retryKVClient{&retryWriteKVClient{conn, writeRetry}, readRetry}
retryAuthWrapper := c.newAuthRetryWrapper()
return &retryKVClient{
&retryWriteKVClient{retryBasic, retryAuthWrapper},
retryAuthWrapper}
}

type retryKVClient struct {
*retryWriteKVClient
readRetry retryRpcFunc
}

func (rkv *retryKVClient) Range(ctx context.Context, in *pb.RangeRequest, opts ...grpc.CallOption) (resp *pb.RangeResponse, err error) {
err = rkv.retryf(ctx, func(rctx context.Context) error {
resp, err = rkv.retryWriteKVClient.Range(rctx, in, opts...)
err = rkv.readRetry(ctx, func(rctx context.Context) error {
resp, err = rkv.KVClient.Range(rctx, in, opts...)
return err
})
return resp, err
@@ -139,8 +149,11 @@ type retryLeaseClient struct {

// RetryLeaseClient implements a LeaseClient that uses the client's FailFast retry policy.
func RetryLeaseClient(c *Client) pb.LeaseClient {
retry := &retryLeaseClient{pb.NewLeaseClient(c.conn), c.retryWrapper}
return &retryLeaseClient{retry, c.retryAuthWrapper}
retry := &retryLeaseClient{
pb.NewLeaseClient(c.conn),
c.newRetryWrapper(isReadStopError),
}
return &retryLeaseClient{retry, c.newAuthRetryWrapper()}
}

func (rlc *retryLeaseClient) LeaseGrant(ctx context.Context, in *pb.LeaseGrantRequest, opts ...grpc.CallOption) (resp *pb.LeaseGrantResponse, err error) {
@@ -167,7 +180,7 @@ type retryClusterClient struct {

// RetryClusterClient implements a ClusterClient that uses the client's FailFast retry policy.
func RetryClusterClient(c *Client) pb.ClusterClient {
return &retryClusterClient{pb.NewClusterClient(c.conn), c.retryWrapper}
return &retryClusterClient{pb.NewClusterClient(c.conn), c.newRetryWrapper(isWriteStopError)}
}

func (rcc *retryClusterClient) MemberAdd(ctx context.Context, in *pb.MemberAddRequest, opts ...grpc.CallOption) (resp *pb.MemberAddResponse, err error) {
@@ -201,7 +214,7 @@ type retryAuthClient struct {

// RetryAuthClient implements an AuthClient that uses the client's FailFast retry policy.
func RetryAuthClient(c *Client) pb.AuthClient {
return &retryAuthClient{pb.NewAuthClient(c.conn), c.retryWrapper}
return &retryAuthClient{pb.NewAuthClient(c.conn), c.newRetryWrapper(isWriteStopError)}
}

func (rac *retryAuthClient) AuthEnable(ctx context.Context, in *pb.AuthEnableRequest, opts ...grpc.CallOption) (resp *pb.AuthEnableResponse, err error) {
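For reference, here is a condensed, standalone sketch of the retry pattern introduced in retry.go above: a wrapper re-invokes an RPC until it succeeds or a policy-specific stop predicate marks the error as final. The names and errors below are illustrative, not the etcd API; the real wrapper waits on the balancer's ConnectNotify channel between attempts instead of sleeping.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type rpcFunc func(ctx context.Context) error
type stopErrFunc func(error) bool

// newRetryWrapper re-invokes f until it succeeds, the stop predicate reports
// the error as final, or the context is cancelled.
func newRetryWrapper(isStop stopErrFunc) func(context.Context, rpcFunc) error {
	return func(ctx context.Context, f rpcFunc) error {
		for {
			if err := f(ctx); err == nil || isStop(err) {
				return err
			}
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(10 * time.Millisecond):
				// stand-in for waiting on the balancer's ConnectNotify()
			}
		}
	}
}

func main() {
	attempts := 0
	// write-style policy: only the connection-level error is retriable
	writeRetry := newRetryWrapper(func(err error) bool {
		return err.Error() != "there is no address available"
	})
	err := writeRetry(context.Background(), func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("there is no address available")
		}
		return fmt.Errorf("etcdserver: permission denied") // final; stops the loop
	})
	fmt.Printf("attempts=%d err=%v\n", attempts, err) // attempts=3, final error returned
}
```

Passing a predicate like isWriteStopError makes writes stop on any error that might have reached the server, while a read predicate like isReadStopError lets reads keep retrying on Unavailable; the KV, Lease, Cluster, and Auth retry clients above are composed from these two policies.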
