From f4f7817e3e1565a82b669972d2e761e1d6cffdd0 Mon Sep 17 00:00:00 2001
From: Gyu-Ho Lee
Date: Wed, 8 Nov 2017 11:01:05 -0800
Subject: [PATCH] clientv3/integration: add more tests on balancer switch,
 inflight range

Test all possible cases of server shutdown with inflight range requests:
linearizable vs. serializable reads, pinning the leader vs. a follower,
and stopping the pinned member first vs. later.
Remove redundant tests in kv_test.go.

Signed-off-by: Gyu-Ho Lee
---
 clientv3/integration/kv_test.go              | 109 +----------------
 clientv3/integration/server_shutdown_test.go | 116 +++++++++++++++++++
 2 files changed, 121 insertions(+), 104 deletions(-)

diff --git a/clientv3/integration/kv_test.go b/clientv3/integration/kv_test.go
index 69948a856bb9..2214a7892c49 100644
--- a/clientv3/integration/kv_test.go
+++ b/clientv3/integration/kv_test.go
@@ -17,7 +17,6 @@ package integration
 import (
 	"bytes"
 	"context"
-	"math/rand"
 	"os"
 	"reflect"
 	"strings"
@@ -716,11 +715,11 @@ func TestKVGetRetry(t *testing.T) {
 	}
 }
 
-// TestKVPutFailGetRetry ensures a get will retry following a failed put.
-func TestKVPutFailGetRetry(t *testing.T) {
+// TestKVPutFail ensures that a put fails on a stopped node.
+func TestKVPutFail(t *testing.T) {
 	defer testutil.AfterTest(t)
 
-	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3})
+	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
 	defer clus.Terminate(t)
 
 	kv := clus.Client(0)
@@ -729,30 +728,8 @@ func TestKVPutFailGetRetry(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.TODO(), time.Second)
 	defer cancel()
 	_, err := kv.Put(ctx, "foo", "bar")
-	if err == nil {
-		t.Fatalf("got success on disconnected put, wanted error")
-	}
-
-	donec := make(chan struct{})
-	go func() {
-		// Get will fail, but reconnect will trigger
-		gresp, gerr := kv.Get(context.TODO(), "foo")
-		if gerr != nil {
-			t.Fatal(gerr)
-		}
-		if len(gresp.Kvs) != 0 {
-			t.Fatalf("bad get kvs: got %+v, want empty", gresp.Kvs)
-		}
-		donec <- struct{}{}
-	}()
-
-	time.Sleep(100 * time.Millisecond)
-	clus.Members[0].Restart(t)
-
-	select {
-	case <-time.After(5 * time.Second):
-		t.Fatalf("timed out waiting for get")
-	case <-donec:
+	if err != context.DeadlineExceeded {
+		t.Fatalf("expected %v, got %v", context.DeadlineExceeded, err)
 	}
 }
 
@@ -826,82 +803,6 @@ func TestKVPutStoppedServerAndClose(t *testing.T) {
 	}
 }
 
-// TestKVGetOneEndpointDown ensures a client can connect and get if one endpoint is down.
-func TestKVGetOneEndpointDown(t *testing.T) {
-	defer testutil.AfterTest(t)
-	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 3, SkipCreatingClient: true})
-	defer clus.Terminate(t)
-
-	// get endpoint list
-	eps := make([]string, 3)
-	for i := range eps {
-		eps[i] = clus.Members[i].GRPCAddr()
-	}
-
-	// make a dead node
-	clus.Members[rand.Intn(len(eps))].Stop(t)
-
-	// try to connect with dead node in the endpoint list
-	cfg := clientv3.Config{Endpoints: eps, DialTimeout: 1 * time.Second}
-	cli, err := clientv3.New(cfg)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer cli.Close()
-	ctx, cancel := context.WithTimeout(context.TODO(), 3*time.Second)
-	if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
-		t.Fatal(err)
-	}
-	cancel()
-}
-
-// TestKVGetResetLoneEndpoint ensures that if an endpoint resets and all other
-// endpoints are down, then it will reconnect.
-func TestKVGetResetLoneEndpoint(t *testing.T) {
-	defer testutil.AfterTest(t)
-	clus := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 2, SkipCreatingClient: true})
-	defer clus.Terminate(t)
-
-	// get endpoint list
-	eps := make([]string, 2)
-	for i := range eps {
-		eps[i] = clus.Members[i].GRPCAddr()
-	}
-
-	cfg := clientv3.Config{Endpoints: eps, DialTimeout: 500 * time.Millisecond}
-	cli, err := clientv3.New(cfg)
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer cli.Close()
-
-	// disconnect everything
-	clus.Members[0].Stop(t)
-	clus.Members[1].Stop(t)
-
-	// have Get try to reconnect
-	donec := make(chan struct{})
-	go func() {
-		// 3-second is the minimum interval between endpoint being marked
-		// as unhealthy and being removed from unhealthy, so possibly
-		// takes >5-second to unpin and repin an endpoint
-		// TODO: decrease timeout when balancer switch rewrite
-		ctx, cancel := context.WithTimeout(context.TODO(), 7*time.Second)
-		if _, err := cli.Get(ctx, "abc", clientv3.WithSerializable()); err != nil {
-			t.Fatal(err)
-		}
-		cancel()
-		close(donec)
-	}()
-	time.Sleep(500 * time.Millisecond)
-	clus.Members[0].Restart(t)
-	select {
-	case <-time.After(10 * time.Second):
-		t.Fatalf("timed out waiting for Get")
-	case <-donec:
-	}
-}
-
 // TestKVPutAtMostOnce ensures that a Put will only occur at most once
 // in the presence of network errors.
 func TestKVPutAtMostOnce(t *testing.T) {
diff --git a/clientv3/integration/server_shutdown_test.go b/clientv3/integration/server_shutdown_test.go
index 74780030f9cf..a6b6f4d5c79d 100644
--- a/clientv3/integration/server_shutdown_test.go
+++ b/clientv3/integration/server_shutdown_test.go
@@ -235,3 +235,119 @@ func testBalancerUnderServerShutdownImmutable(t *testing.T, op func(*clientv3.Cl
 		t.Errorf("failed to finish range request in time %v (timeout %v)", err, timeout)
 	}
 }
+
+func TestBalancerUnderServerStopInflightLinearizableGetOnRestartPinLeaderStopPinFirst(t *testing.T) {
+	testBalancerUnderServerStopInflightRangeOnRestart(t, true, true, true)
+}
+
+func TestBalancerUnderServerStopInflightLinearizableGetOnRestartPinLeaderStopPinLater(t *testing.T) {
+	testBalancerUnderServerStopInflightRangeOnRestart(t, true, true, false)
+}
+
+func TestBalancerUnderServerStopInflightLinearizableGetOnRestartPinFollowerStopPinFirst(t *testing.T) {
+	testBalancerUnderServerStopInflightRangeOnRestart(t, true, false, true)
+}
+
+func TestBalancerUnderServerStopInflightLinearizableGetOnRestartPinFollowerStopPinLater(t *testing.T) {
+	testBalancerUnderServerStopInflightRangeOnRestart(t, true, false, false)
+}
+
+func TestBalancerUnderServerStopInflightSerializableGetOnRestartPinLeaderStopPinFirst(t *testing.T) {
+	testBalancerUnderServerStopInflightRangeOnRestart(t, false, true, true)
+}
+
+func TestBalancerUnderServerStopInflightSerializableGetOnRestartPinLeaderStopPinLater(t *testing.T) {
+	testBalancerUnderServerStopInflightRangeOnRestart(t, false, true, false)
+}
+
+func TestBalancerUnderServerStopInflightSerializableGetOnRestartPinFollowerStopPinFirst(t *testing.T) {
+	testBalancerUnderServerStopInflightRangeOnRestart(t, false, false, true)
+}
+
+func TestBalancerUnderServerStopInflightSerializableGetOnRestartPinFollowerStopPinLater(t *testing.T) {
+	testBalancerUnderServerStopInflightRangeOnRestart(t, false, false, false)
+}
+
+// testBalancerUnderServerStopInflightRangeOnRestart expects that an inflight
+// range request reconnects and succeeds once the stopped server restarts.
+func testBalancerUnderServerStopInflightRangeOnRestart(t *testing.T, linearizable, pinLeader, stopPinFirst bool) {
+	defer testutil.AfterTest(t)
+
+	cfg := &integration.ClusterConfig{
+		Size:               2,
+		SkipCreatingClient: true,
+	}
+	if linearizable {
+		cfg.Size = 3
+	}
+
+	clus := integration.NewClusterV3(t, cfg)
+	defer clus.Terminate(t)
+	eps := []string{clus.Members[0].GRPCAddr(), clus.Members[1].GRPCAddr()}
+	if linearizable {
+		eps = append(eps, clus.Members[2].GRPCAddr())
+	}
+
+	lead := clus.WaitLeader(t)
+
+	target := lead
+	if !pinLeader {
+		target = (target + 1) % 2
+	}
+
+	// pin eps[target]
+	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{eps[target]}})
+	if err != nil {
+		t.Fatalf("failed to create client: %v", err)
+	}
+	defer cli.Close()
+
+	// wait for eps[target] to be pinned
+	mustWaitPinReady(t, cli)
+
+	// add all eps to the list, so that when the originally pinned one fails
+	// the client can switch to another available endpoint
+	cli.SetEndpoints(eps...)
+
+	if stopPinFirst {
+		clus.Members[target].Stop(t)
+		// give the balancer some time to switch before stopping the other member
+		time.Sleep(time.Second)
+		clus.Members[(target+1)%2].Stop(t)
+	} else {
+		clus.Members[(target+1)%2].Stop(t)
+		// the balancer cannot pin the other member, since it is already stopped
+		clus.Members[target].Stop(t)
+	}
+
+	// 3 seconds is the minimum interval between an endpoint being marked
+	// unhealthy and being removed from the unhealthy list, so it can take
+	// more than 5 seconds to unpin and repin an endpoint
+	// TODO: decrease timeout when the balancer switch is rewritten
+	clientTimeout := 7 * time.Second
+
+	var gops []clientv3.OpOption
+	if !linearizable {
+		gops = append(gops, clientv3.WithSerializable())
+	}
+
+	donec := make(chan struct{})
+	go func() {
+		defer close(donec)
+		ctx, cancel := context.WithTimeout(context.TODO(), clientTimeout)
+		_, err := cli.Get(ctx, "abc", gops...)
+		cancel()
+		if err != nil {
+			t.Errorf("unexpected error from Get: %v", err)
+		}
+	}()
+
+	time.Sleep(500 * time.Millisecond)
+	clus.Members[target].Restart(t)
+
+	select {
+	case <-time.After(clientTimeout + 2*time.Second):
+		t.Fatalf("timed out waiting for Get")
+	case <-donec:
+	}
+}
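
Note: the new helper calls mustWaitPinReady, which already lives elsewhere in
the clientv3/integration package and is not touched by this patch. As a rough,
hypothetical sketch (not the package's actual implementation), such a helper
only needs to issue a blocking Get whose success implies the balancer has
pinned a reachable endpoint; the key, timeout, and import path below are
illustrative assumptions:

package integration

import (
	"context"
	"testing"
	"time"

	"github.com/coreos/etcd/clientv3"
)

// mustWaitPinReady blocks until the client has pinned a healthy endpoint.
// A ranged read can only complete after a connection is established, so
// returning without error means the balancer is ready for the test to
// start stopping members.
func mustWaitPinReady(t *testing.T, cli *clientv3.Client) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if _, err := cli.Get(ctx, "foo"); err != nil {
		t.Fatal(err)
	}
}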