From 2064828868de725102c67c1e2168f585dd89539d Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Fri, 19 Apr 2019 12:24:41 +0200
Subject: [PATCH 1/4] storage: deflake TestNodeLivenessStatusMap

Prior to this patch, this test would fail `stressrace` after a few dozen
iterations. The root cause of this was the invalid call to
`t.Parallel()`, which this patch removes.

Additionally, this patch adapts TimeUntilStoreDead for each test case to
avoid flakes, and removes a previous hack obviated by this
simplification.

Release note: None

Co-authored-by: Tobias Schottdorf
---
 pkg/storage/node_liveness_test.go | 32 +++++++++++++++----------------
 1 file changed, 15 insertions(+), 17 deletions(-)

diff --git a/pkg/storage/node_liveness_test.go b/pkg/storage/node_liveness_test.go
index 94f221e10093..dbf8ed0f1800 100644
--- a/pkg/storage/node_liveness_test.go
+++ b/pkg/storage/node_liveness_test.go
@@ -855,11 +855,14 @@ func TestNodeLivenessStatusMap(t *testing.T) {
 	// See what comes up in the status.
 	callerNodeLiveness := firstServer.GetNodeLiveness()
-	type expectedStatus struct {
+	type testCase struct {
 		nodeID         roachpb.NodeID
 		expectedStatus storagepb.NodeLivenessStatus
 	}
-	testData := []expectedStatus{
+
+	// Below we're going to check that all statuses converge and stabilize
+	// to a known situation.
+	testData := []testCase{
 		{liveNodeID, storagepb.NodeLivenessStatus_LIVE},
 		{deadNodeID, storagepb.NodeLivenessStatus_DEAD},
 		{decommissioningNodeID, storagepb.NodeLivenessStatus_DECOMMISSIONING},
 	}
@@ -867,9 +870,8 @@ func TestNodeLivenessStatusMap(t *testing.T) {
 
 	for _, test := range testData {
-		t.Run(test.expectedStatus.String(), func(t *testing.T) {
+		t.Run(fmt.Sprintf("n%d->%s", test.nodeID, test.expectedStatus), func(t *testing.T) {
 			nodeID, expectedStatus := test.nodeID, test.expectedStatus
-			t.Parallel()
 
 			testutils.SucceedsSoon(t, func() error {
 				// Ensure that dead nodes are quickly recognized as dead by
@@ -882,21 +884,17 @@ func TestNodeLivenessStatusMap(t *testing.T) {
 				storage.TimeUntilStoreDead.Override(&firstServer.ClusterSettings().SV,
 					storage.TestTimeUntilStoreDead)
 
-				log.Infof(ctx, "checking expected status for node %d", nodeID)
+				log.Infof(ctx, "checking expected status (%s) for node %d", expectedStatus, nodeID)
 				nodeStatuses := callerNodeLiveness.GetLivenessStatusMap()
-				if st, ok := nodeStatuses[nodeID]; !ok {
-					return fmt.Errorf("%s node not in statuses", expectedStatus)
-				} else {
-					if st != expectedStatus {
-						if expectedStatus == storagepb.NodeLivenessStatus_DECOMMISSIONING && st == storagepb.NodeLivenessStatus_DECOMMISSIONED {
-							// Server somehow shut down super-fast. Tolerating the mismatch.
-							return nil
-						}
-						return fmt.Errorf("unexpected status: got %s, expected %s",
-							st, expectedStatus)
-					}
+				st, ok := nodeStatuses[nodeID]
+				if !ok {
+					return errors.Errorf("node %d: not in statuses\n", nodeID)
+				}
+				if st != expectedStatus {
+					return errors.Errorf("node %d: unexpected status: got %s, expected %s\n",
+						nodeID, st, expectedStatus,
+					)
 				}
-				log.Infof(ctx, "node %d status ok", nodeID)
 				return nil
 			})
 		})

From f281cf48159352204d896eb24f1442c4a78086a6 Mon Sep 17 00:00:00 2001
From: Raphael 'kena' Poss
Date: Wed, 24 Apr 2019 12:17:34 +0200
Subject: [PATCH 2/4] tests: avoid t.Parallel() when the top test has defers

Sub-tests that invoke `t.Parallel()` get to run concurrently with their
parent test, and may be delayed arbitrarily past the termination of the
parent test (including past the execution of its `defer` calls).
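For illustration, a minimal sketch of that ordering (hypothetical package
and test names, not code from this patch):

    package foo_test

    import (
        "testing"

        "github.com/cockroachdb/cockroach/pkg/util/leaktest"
    )

    func TestParent(t *testing.T) {
        defer leaktest.AfterTest(t)() // runs as soon as TestParent returns
        t.Run("child", func(t *testing.T) {
            t.Parallel()
            // The child pauses at t.Parallel() and resumes only after
            // TestParent has returned, i.e. after the deferred leaktest
            // check above has already executed; any goroutines started
            // here are not covered by that check.
        })
    }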
This means it's unsafe to call `t.Parallel()` with e.g. `leaktest.AfterTest()`. This patch removes `t.Parallel` from the test in `sql/physical_props_test.go` and modifies `pkg/rpc/TestGRPCKeepaliveFailureFailsInflightRPCs` to use a wait group instead. Release note: None --- pkg/rpc/context_test.go | 383 ++++++++++++++++++--------------- pkg/sql/physical_props_test.go | 6 - 2 files changed, 214 insertions(+), 175 deletions(-) diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 1a3a5d6e296f..59347e7b6c01 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/log/logtags" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -747,11 +748,16 @@ func TestRemoteOffsetUnhealthy(t *testing.T) { func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) { defer leaktest.AfterTest(t)() - testCases := []struct { + sc := log.Scope(t) + defer sc.Close(t) + + type testCase struct { cKeepalive, sKeepalive bool partitionC2S, partitionS2C bool expClose bool - }{ + } + + testCases := []testCase{ // Keepalive doesn't matter if the network is fine. {cKeepalive: false, sKeepalive: false, partitionC2S: false, partitionS2C: false, expClose: false}, @@ -779,194 +785,233 @@ func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) { {cKeepalive: true, sKeepalive: true, partitionC2S: false, partitionS2C: true, expClose: true}, {cKeepalive: true, sKeepalive: true, partitionC2S: true, partitionS2C: true, expClose: true}, } - for _, c := range testCases { - c := c // copy for parallel test - - // For consistent spacing in test names. - fmtBool := func(b bool) string { - s := strconv.FormatBool(b) - if b { - s += " " - } - return s + + // For consistent spacing in test names. + fmtBool := func(b bool) string { + s := strconv.FormatBool(b) + if b { + s += " " } - connIcon := func(partition bool) string { - if partition { - return "-X->" - } - return "--->" + return s + } + connIcon := func(partition bool) string { + if partition { + return "-X->" } + return "--->" + } - kaName := fmt.Sprintf("clientKeepalive=%s,serverKeepalive=%s", fmtBool(c.cKeepalive), fmtBool(c.sKeepalive)) - pName := fmt.Sprintf("client%sserver,server%sclient", connIcon(c.partitionC2S), connIcon(c.partitionS2C)) - t.Run(kaName+"/"+pName, func(t *testing.T) { - t.Parallel() + runTestCase := func(testCtx context.Context, c testCase) error { + var cKeepalive keepalive.ClientParameters + if c.cKeepalive { + cKeepalive = clientTestingKeepalive + } + var sKeepalive keepalive.ServerParameters + if c.sKeepalive { + sKeepalive = serverTestingKeepalive + } - var cKeepalive keepalive.ClientParameters - if c.cKeepalive { - cKeepalive = clientTestingKeepalive - } - var sKeepalive keepalive.ServerParameters - if c.sKeepalive { - sKeepalive = serverTestingKeepalive - } + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + ctx, cancel := stopper.WithCancelOnQuiesce(testCtx) + defer cancel() - stopper := stop.NewStopper() - defer stopper.Stop(context.TODO()) - ctx, cancel := stopper.WithCancelOnQuiesce(context.TODO()) - defer cancel() + // Construct server with server-side keepalive. 
+ log.Infof(ctx, "constructing server") + clock := hlc.NewClock(timeutil.Unix(0, 20).UnixNano, time.Nanosecond) + serverCtx := newTestContext(clock, stopper) + s := newTestServer(t, serverCtx, grpc.KeepaliveParams(sKeepalive)) + + // Create heartbeat service. This service will continuously + // read on its input stream and send on its output stream. + log.Infof(ctx, "creating heartbeat service") + const msgInterval = 10 * time.Millisecond + hss := &HeartbeatStreamService{ + HeartbeatService: HeartbeatService{ + clock: clock, + remoteClockMonitor: serverCtx.RemoteClocks, + clusterID: &serverCtx.ClusterID, + version: serverCtx.version, + }, + interval: msgInterval, + } + RegisterHeartbeatServer(s, hss) + RegisterTestingHeartbeatStreamServer(s, hss) - // Construct server with server-side keepalive. - clock := hlc.NewClock(timeutil.Unix(0, 20).UnixNano, time.Nanosecond) - serverCtx := newTestContext(clock, stopper) - s := newTestServer(t, serverCtx, grpc.KeepaliveParams(sKeepalive)) - - // Create heartbeat service. This service will continuously - // read on its input stream and send on its output stream. - const msgInterval = 10 * time.Millisecond - hss := &HeartbeatStreamService{ - HeartbeatService: HeartbeatService{ - clock: clock, - remoteClockMonitor: serverCtx.RemoteClocks, - clusterID: &serverCtx.ClusterID, - version: serverCtx.version, - }, - interval: msgInterval, - } - RegisterHeartbeatServer(s, hss) - RegisterTestingHeartbeatStreamServer(s, hss) + ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr) + if err != nil { + return err + } + remoteAddr := ln.Addr().String() - ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr) - if err != nil { - t.Fatal(err) - } - remoteAddr := ln.Addr().String() + log.Infof(ctx, "setting up client") + clientCtx := newTestContext(clock, stopper) + // Disable automatic heartbeats. We'll send them by hand. + clientCtx.heartbeatInterval = math.MaxInt64 + + var firstConn int32 = 1 + + // We're going to open RPC transport connections using a dialer that returns + // PartitionableConns. We'll partition the first opened connection. + dialerCh := make(chan *testutils.PartitionableConn, 1) + clientCtx.AddTestingDialOpts( + grpc.WithDialer( + func(addr string, timeout time.Duration) (net.Conn, error) { + if !atomic.CompareAndSwapInt32(&firstConn, 1, 0) { + // If we allow gRPC to open a 2nd transport connection, then our RPCs + // might succeed if they're sent on that one. In the spirit of a + // partition, we'll return errors for the attempt to open a new + // connection (albeit for a TCP connection the error would come after + // a socket connect timeout). + return nil, errors.Errorf("No more connections for you. We're partitioned.") + } - clientCtx := newTestContext(clock, stopper) - // Disable automatic heartbeats. We'll send them by hand. - clientCtx.heartbeatInterval = math.MaxInt64 - - var firstConn int32 = 1 - - // We're going to open RPC transport connections using a dialer that returns - // PartitionableConns. We'll partition the first opened connection. - dialerCh := make(chan *testutils.PartitionableConn, 1) - clientCtx.AddTestingDialOpts( - grpc.WithDialer( - func(addr string, timeout time.Duration) (net.Conn, error) { - if !atomic.CompareAndSwapInt32(&firstConn, 1, 0) { - // If we allow gRPC to open a 2nd transport connection, then our RPCs - // might succeed if they're sent on that one. 
In the spirit of a - // partition, we'll return errors for the attempt to open a new - // connection (albeit for a TCP connection the error would come after - // a socket connect timeout). - return nil, errors.Errorf("No more connections for you. We're partitioned.") - } - - conn, err := net.DialTimeout("tcp", addr, timeout) - if err != nil { - return nil, err - } - transportConn := testutils.NewPartitionableConn(conn) - dialerCh <- transportConn - return transportConn, nil - }), - grpc.WithKeepaliveParams(cKeepalive), - ) - conn, err := clientCtx.GRPCDial(remoteAddr).Connect(ctx) - if err != nil { - t.Fatal(err) - } - defer func() { _ = conn.Close() }() + conn, err := net.DialTimeout("tcp", addr, timeout) + if err != nil { + return nil, err + } + transportConn := testutils.NewPartitionableConn(conn) + dialerCh <- transportConn + return transportConn, nil + }), + grpc.WithKeepaliveParams(cKeepalive), + ) + log.Infof(ctx, "dialing server") + conn, err := clientCtx.GRPCDial(remoteAddr).Connect(ctx) + if err != nil { + return err + } + defer func() { _ = conn.Close() }() - // Create the heartbeat client. - unlockedHeartbeatClient, err := NewTestingHeartbeatStreamClient(conn).PingStream(ctx) - if err != nil { - t.Fatal(err) - } - heartbeatClient := &lockedPingStreamClient{ - TestingHeartbeatStream_PingStreamClient: unlockedHeartbeatClient, - } + // Create the heartbeat client. + log.Infof(ctx, "starting heartbeat client") + unlockedHeartbeatClient, err := NewTestingHeartbeatStreamClient(conn).PingStream(ctx) + if err != nil { + return err + } + heartbeatClient := &lockedPingStreamClient{ + TestingHeartbeatStream_PingStreamClient: unlockedHeartbeatClient, + } - // Perform an initial request-response round trip. - request := PingRequest{ServerVersion: clientCtx.version.ServerVersion} - if err := heartbeatClient.Send(&request); err != nil { - t.Fatal(err) - } - if _, err := heartbeatClient.Recv(); err != nil { - t.Fatal(err) - } + // Perform an initial request-response round trip. + log.Infof(ctx, "first ping") + request := PingRequest{ServerVersion: clientCtx.version.ServerVersion} + if err := heartbeatClient.Send(&request); err != nil { + return err + } + if _, err := heartbeatClient.Recv(); err != nil { + return err + } - // Launch a goroutine to read from the channel continuously and - // a goroutine to write to the channel continuously. Both will - // exit when the channel breaks (either because of a partition - // or because the stopper stops). - go func() { - t := time.NewTicker(msgInterval) - defer t.Stop() - for { - <-t.C - if err := heartbeatClient.Send(&request); err != nil { - return - } + // Launch a goroutine to read from the channel continuously and + // a goroutine to write to the channel continuously. Both will + // exit when the channel breaks (either because of a partition + // or because the stopper stops). + go func() { + t := time.NewTicker(msgInterval) + defer t.Stop() + for { + <-t.C + log.Infof(ctx, "client send") + if err := heartbeatClient.Send(&request); err != nil { + return } - }() - go func() { - for { - if _, err := heartbeatClient.Recv(); err != nil { - return - } + } + }() + go func() { + for { + log.Infof(ctx, "client recv") + if _, err := heartbeatClient.Recv(); err != nil { + return } - }() + } + }() + + // Now partition either client->server, server->client, or both, and attempt + // to perform an RPC. We expect it to fail once the grpc keepalive fails to + // get a response from the server. 
- // Now partition either client->server, server->client, or both, and attempt - // to perform an RPC. We expect it to fail once the grpc keepalive fails to - // get a response from the server. + transportConn := <-dialerCh + defer transportConn.Finish() - transportConn := <-dialerCh - defer transportConn.Finish() + if c.partitionC2S { + log.Infof(ctx, "partition C2S") + transportConn.PartitionC2S() + } + if c.partitionS2C { + log.Infof(ctx, "partition S2C") + transportConn.PartitionS2C() + } - if c.partitionC2S { - transportConn.PartitionC2S() + // Check whether the connection eventually closes. We may need to + // adjust this duration if the test gets flaky. + const retryDur = 3 * time.Second + errNotClosed := errors.New("conn not closed") + closedErr := retry.ForDuration(retryDur, func() error { + err := heartbeatClient.Send(&request) + if err == nil { + log.Infof(ctx, "expected send error, got no error") + return errNotClosed } - if c.partitionS2C { - transportConn.PartitionS2C() + if !grpcutil.IsClosedConnection(err) { + newErr := fmt.Errorf("expected closed connection error, found %v", err) + log.Infof(ctx, "%+v", newErr) + return newErr } - - // Check whether the connection eventually closes. We may need to - // adjust this duration if the test gets flaky. - const retryDur = 3 * time.Second - errNotClosed := errors.New("conn not closed") - closedErr := retry.ForDuration(retryDur, func() error { - err := heartbeatClient.Send(&request) - if err == nil { - return errNotClosed - } - if !grpcutil.IsClosedConnection(err) { - t.Fatalf("expected closed connection error, found %v", err) - } - return nil - }) - if c.expClose { - if closedErr != nil { - t.Fatalf("expected closed connection, found %v", closedErr) - } - } else { - if closedErr != errNotClosed { - t.Fatalf("expected unclosed connection, found %v", closedErr) - } + return nil + }) + if c.expClose { + if closedErr != nil { + newErr := fmt.Errorf("expected closed connection, found %v", closedErr) + log.Infof(ctx, "%+v", newErr) + return newErr } + } else { + if closedErr != errNotClosed { + newErr := fmt.Errorf("expected unclosed connection, found %v", closedErr) + log.Infof(ctx, "%+v", newErr) + return newErr + } + } - // If the DialOptions we passed to gRPC didn't prevent it from opening new - // connections, then next RPCs would succeed since gRPC reconnects the - // transport (and that would succeed here since we've only partitioned one - // connection). We could further test that the status reported by - // Context.ConnHealth() for the remote node moves to UNAVAILABLE because of - // the (application-level) heartbeats performed by rpc.Context, but the - // behavior of our heartbeats in the face of transport failures is - // sufficiently tested in TestHeartbeatHealthTransport. - }) + log.Infof(ctx, "test done") + // If the DialOptions we passed to gRPC didn't prevent it from opening new + // connections, then next RPCs would succeed since gRPC reconnects the + // transport (and that would succeed here since we've only partitioned one + // connection). We could further test that the status reported by + // Context.ConnHealth() for the remote node moves to UNAVAILABLE because of + // the (application-level) heartbeats performed by rpc.Context, but the + // behavior of our heartbeats in the face of transport failures is + // sufficiently tested in TestHeartbeatHealthTransport. + return nil } + + // Run all the tests. 
+ var wg sync.WaitGroup + wg.Add(len(testCases)) + errCh := make(chan error, len(testCases)) + for testNum, c := range testCases { + kaName := fmt.Sprintf("clientKeepalive=%s,serverKeepalive=%s", fmtBool(c.cKeepalive), fmtBool(c.sKeepalive)) + pName := fmt.Sprintf("client%sserver,server%sclient", connIcon(c.partitionC2S), connIcon(c.partitionS2C)) + testName := fmt.Sprintf("%d/%s/%s", testNum, kaName, pName) + ctx := logtags.AddTag(context.Background(), testName, nil) + + log.Infof(ctx, "starting sub-test") + go func(c testCase) { + errCh <- errors.Wrapf(runTestCase(ctx, c), "%+v", c) + wg.Done() + }(c) + } + log.Infof(context.Background(), "waiting for sub-tests to complete") + wg.Wait() + close(errCh) + + for err := range errCh { + if err != nil { + t.Errorf("%+v", err) + } + } + } func TestClusterIDMismatch(t *testing.T) { diff --git a/pkg/sql/physical_props_test.go b/pkg/sql/physical_props_test.go index ab30a7e5105a..d4ba1e334a00 100644 --- a/pkg/sql/physical_props_test.go +++ b/pkg/sql/physical_props_test.go @@ -357,7 +357,6 @@ func TestComputeOrderingMatch(t *testing.T) { for tcIdx := range ts.cases { tc := ts.cases[tcIdx] t.Run(fmt.Sprintf("line%d", tc.line), func(t *testing.T) { - t.Parallel() res := ts.existing.computeMatch(tc.desired) resRev := ts.existing.reverse().computeMatch(tc.desired) if res != tc.expected || resRev != tc.expectedReverse { @@ -383,7 +382,6 @@ func TestTrimOrderingGuarantee(t *testing.T) { for _, isKey := range []bool{false, true} { name := fmt.Sprintf("%d,%d,%d,%t", numConstCols, numEquiv, numOrderCols, isKey) t.Run(name, func(t *testing.T) { - t.Parallel() rng, _ := randutil.NewPseudoRand() for tries := 0; tries < 20; tries++ { if numOrderCols == 0 && isKey { @@ -559,7 +557,6 @@ func TestTrimOrdering(t *testing.T) { for i := range testCases { tc := testCases[i] t.Run(tc.name, func(t *testing.T) { - t.Parallel() tc.props.trim(tc.desired) if !propsEqual(tc.props, tc.expected) { t.Errorf("expected %s, got %s", tc.expected.AsString(nil), tc.props.AsString(nil)) @@ -756,7 +753,6 @@ func TestComputeMergeJoinOrdering(t *testing.T) { for i := range testCases { tc := testCases[i] t.Run(tc.name, func(t *testing.T) { - t.Parallel() result := computeMergeJoinOrdering(tc.a, tc.b, tc.colA, tc.colB) if !reflect.DeepEqual(tc.expected, result) { t.Errorf("expected %v, got %v", tc.expected, result) @@ -827,7 +823,6 @@ func TestProjectOrdering(t *testing.T) { for tIdx := range testCases { tc := testCases[tIdx] t.Run(fmt.Sprintf("%d", tIdx), func(t *testing.T) { - t.Parallel() res := ord.project(tc.columns) if !propsEqual(res, tc.expected) { t.Errorf("expected %s, got %s", tc.expected.AsString(nil), res.AsString(nil)) @@ -843,7 +838,6 @@ func TestRandomProps(t *testing.T) { for _, n := range []int{2, 5, 10} { t.Run(fmt.Sprintf("%d", n), func(t *testing.T) { - t.Parallel() rng, _ := randutil.NewPseudoRand() for it := 0; it < 100; it++ { o := physicalProps{} From efb765ceb06ebd451601ed3169a47a71c59523cc Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 24 Apr 2019 14:58:41 +0200 Subject: [PATCH 3/4] pkg/rpc: extract the test code from TestGRPCKeepaliveFailureFailsInflightRPCs Since this now runs on its own goroutine, it is invalid for it to access the surrounding `testing.T`. This patch moves the code so that `t` is clearly out of scope. 
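For illustration, a hypothetical sketch of the resulting pattern (names
made up, not code from this patch): the spawned goroutine only reports an
error, and `testing.T` is used exclusively from the test goroutine:

    package foo_test

    import (
        "errors"
        "testing"
    )

    // runCase stands in for the extracted test body; it returns an error
    // instead of calling into *testing.T.
    func runCase() error {
        return errors.New("example failure")
    }

    func TestSomething(t *testing.T) {
        errCh := make(chan error, 1)
        go func() {
            // t.Fatal / t.FailNow must not be called from this goroutine:
            // they only stop the goroutine that calls them, not the test.
            errCh <- runCase()
        }()
        if err := <-errCh; err != nil {
            t.Error(err) // reported from the test goroutine, which is safe
        }
    }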
Release note: None --- pkg/rpc/context_test.go | 395 ++++++++++++++++++++-------------------- 1 file changed, 201 insertions(+), 194 deletions(-) diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 59347e7b6c01..8f33880b7139 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -751,13 +751,7 @@ func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) { sc := log.Scope(t) defer sc.Close(t) - type testCase struct { - cKeepalive, sKeepalive bool - partitionC2S, partitionS2C bool - expClose bool - } - - testCases := []testCase{ + testCases := []grpcKeepaliveTestCase{ // Keepalive doesn't matter if the network is fine. {cKeepalive: false, sKeepalive: false, partitionC2S: false, partitionS2C: false, expClose: false}, @@ -801,191 +795,6 @@ func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) { return "--->" } - runTestCase := func(testCtx context.Context, c testCase) error { - var cKeepalive keepalive.ClientParameters - if c.cKeepalive { - cKeepalive = clientTestingKeepalive - } - var sKeepalive keepalive.ServerParameters - if c.sKeepalive { - sKeepalive = serverTestingKeepalive - } - - stopper := stop.NewStopper() - defer stopper.Stop(context.TODO()) - ctx, cancel := stopper.WithCancelOnQuiesce(testCtx) - defer cancel() - - // Construct server with server-side keepalive. - log.Infof(ctx, "constructing server") - clock := hlc.NewClock(timeutil.Unix(0, 20).UnixNano, time.Nanosecond) - serverCtx := newTestContext(clock, stopper) - s := newTestServer(t, serverCtx, grpc.KeepaliveParams(sKeepalive)) - - // Create heartbeat service. This service will continuously - // read on its input stream and send on its output stream. - log.Infof(ctx, "creating heartbeat service") - const msgInterval = 10 * time.Millisecond - hss := &HeartbeatStreamService{ - HeartbeatService: HeartbeatService{ - clock: clock, - remoteClockMonitor: serverCtx.RemoteClocks, - clusterID: &serverCtx.ClusterID, - version: serverCtx.version, - }, - interval: msgInterval, - } - RegisterHeartbeatServer(s, hss) - RegisterTestingHeartbeatStreamServer(s, hss) - - ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr) - if err != nil { - return err - } - remoteAddr := ln.Addr().String() - - log.Infof(ctx, "setting up client") - clientCtx := newTestContext(clock, stopper) - // Disable automatic heartbeats. We'll send them by hand. - clientCtx.heartbeatInterval = math.MaxInt64 - - var firstConn int32 = 1 - - // We're going to open RPC transport connections using a dialer that returns - // PartitionableConns. We'll partition the first opened connection. - dialerCh := make(chan *testutils.PartitionableConn, 1) - clientCtx.AddTestingDialOpts( - grpc.WithDialer( - func(addr string, timeout time.Duration) (net.Conn, error) { - if !atomic.CompareAndSwapInt32(&firstConn, 1, 0) { - // If we allow gRPC to open a 2nd transport connection, then our RPCs - // might succeed if they're sent on that one. In the spirit of a - // partition, we'll return errors for the attempt to open a new - // connection (albeit for a TCP connection the error would come after - // a socket connect timeout). - return nil, errors.Errorf("No more connections for you. 
We're partitioned.") - } - - conn, err := net.DialTimeout("tcp", addr, timeout) - if err != nil { - return nil, err - } - transportConn := testutils.NewPartitionableConn(conn) - dialerCh <- transportConn - return transportConn, nil - }), - grpc.WithKeepaliveParams(cKeepalive), - ) - log.Infof(ctx, "dialing server") - conn, err := clientCtx.GRPCDial(remoteAddr).Connect(ctx) - if err != nil { - return err - } - defer func() { _ = conn.Close() }() - - // Create the heartbeat client. - log.Infof(ctx, "starting heartbeat client") - unlockedHeartbeatClient, err := NewTestingHeartbeatStreamClient(conn).PingStream(ctx) - if err != nil { - return err - } - heartbeatClient := &lockedPingStreamClient{ - TestingHeartbeatStream_PingStreamClient: unlockedHeartbeatClient, - } - - // Perform an initial request-response round trip. - log.Infof(ctx, "first ping") - request := PingRequest{ServerVersion: clientCtx.version.ServerVersion} - if err := heartbeatClient.Send(&request); err != nil { - return err - } - if _, err := heartbeatClient.Recv(); err != nil { - return err - } - - // Launch a goroutine to read from the channel continuously and - // a goroutine to write to the channel continuously. Both will - // exit when the channel breaks (either because of a partition - // or because the stopper stops). - go func() { - t := time.NewTicker(msgInterval) - defer t.Stop() - for { - <-t.C - log.Infof(ctx, "client send") - if err := heartbeatClient.Send(&request); err != nil { - return - } - } - }() - go func() { - for { - log.Infof(ctx, "client recv") - if _, err := heartbeatClient.Recv(); err != nil { - return - } - } - }() - - // Now partition either client->server, server->client, or both, and attempt - // to perform an RPC. We expect it to fail once the grpc keepalive fails to - // get a response from the server. - - transportConn := <-dialerCh - defer transportConn.Finish() - - if c.partitionC2S { - log.Infof(ctx, "partition C2S") - transportConn.PartitionC2S() - } - if c.partitionS2C { - log.Infof(ctx, "partition S2C") - transportConn.PartitionS2C() - } - - // Check whether the connection eventually closes. We may need to - // adjust this duration if the test gets flaky. - const retryDur = 3 * time.Second - errNotClosed := errors.New("conn not closed") - closedErr := retry.ForDuration(retryDur, func() error { - err := heartbeatClient.Send(&request) - if err == nil { - log.Infof(ctx, "expected send error, got no error") - return errNotClosed - } - if !grpcutil.IsClosedConnection(err) { - newErr := fmt.Errorf("expected closed connection error, found %v", err) - log.Infof(ctx, "%+v", newErr) - return newErr - } - return nil - }) - if c.expClose { - if closedErr != nil { - newErr := fmt.Errorf("expected closed connection, found %v", closedErr) - log.Infof(ctx, "%+v", newErr) - return newErr - } - } else { - if closedErr != errNotClosed { - newErr := fmt.Errorf("expected unclosed connection, found %v", closedErr) - log.Infof(ctx, "%+v", newErr) - return newErr - } - } - - log.Infof(ctx, "test done") - // If the DialOptions we passed to gRPC didn't prevent it from opening new - // connections, then next RPCs would succeed since gRPC reconnects the - // transport (and that would succeed here since we've only partitioned one - // connection). 
We could further test that the status reported by - // Context.ConnHealth() for the remote node moves to UNAVAILABLE because of - // the (application-level) heartbeats performed by rpc.Context, but the - // behavior of our heartbeats in the face of transport failures is - // sufficiently tested in TestHeartbeatHealthTransport. - return nil - } - // Run all the tests. var wg sync.WaitGroup wg.Add(len(testCases)) @@ -997,8 +806,8 @@ func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) { ctx := logtags.AddTag(context.Background(), testName, nil) log.Infof(ctx, "starting sub-test") - go func(c testCase) { - errCh <- errors.Wrapf(runTestCase(ctx, c), "%+v", c) + go func(c grpcKeepaliveTestCase) { + errCh <- errors.Wrapf(grpcRunKeepaliveTestCase(ctx, c), "%+v", c) wg.Done() }(c) } @@ -1011,7 +820,205 @@ func TestGRPCKeepaliveFailureFailsInflightRPCs(t *testing.T) { t.Errorf("%+v", err) } } +} + +type grpcKeepaliveTestCase struct { + cKeepalive, sKeepalive bool + partitionC2S, partitionS2C bool + expClose bool +} + +func grpcRunKeepaliveTestCase(testCtx context.Context, c grpcKeepaliveTestCase) error { + var cKeepalive keepalive.ClientParameters + if c.cKeepalive { + cKeepalive = clientTestingKeepalive + } + var sKeepalive keepalive.ServerParameters + if c.sKeepalive { + sKeepalive = serverTestingKeepalive + } + + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + ctx, cancel := stopper.WithCancelOnQuiesce(testCtx) + defer cancel() + + // Construct server with server-side keepalive. + log.Infof(ctx, "constructing server") + clock := hlc.NewClock(timeutil.Unix(0, 20).UnixNano, time.Nanosecond) + serverCtx := newTestContext(clock, stopper) + tlsConfig, err := serverCtx.GetServerTLSConfig() + if err != nil { + return err + } + s := grpc.NewServer( + grpc.Creds(credentials.NewTLS(tlsConfig)), + grpc.StatsHandler(&serverCtx.stats), + grpc.KeepaliveParams(sKeepalive), + ) + + // Create heartbeat service. This service will continuously + // read on its input stream and send on its output stream. + log.Infof(ctx, "creating heartbeat service") + const msgInterval = 10 * time.Millisecond + hss := &HeartbeatStreamService{ + HeartbeatService: HeartbeatService{ + clock: clock, + remoteClockMonitor: serverCtx.RemoteClocks, + clusterID: &serverCtx.ClusterID, + version: serverCtx.version, + }, + interval: msgInterval, + } + RegisterHeartbeatServer(s, hss) + RegisterTestingHeartbeatStreamServer(s, hss) + + ln, err := netutil.ListenAndServeGRPC(serverCtx.Stopper, s, util.TestAddr) + if err != nil { + return err + } + remoteAddr := ln.Addr().String() + + log.Infof(ctx, "setting up client") + clientCtx := newTestContext(clock, stopper) + // Disable automatic heartbeats. We'll send them by hand. + clientCtx.heartbeatInterval = math.MaxInt64 + + var firstConn int32 = 1 + + // We're going to open RPC transport connections using a dialer that returns + // PartitionableConns. We'll partition the first opened connection. + dialerCh := make(chan *testutils.PartitionableConn, 1) + clientCtx.AddTestingDialOpts( + grpc.WithDialer( + func(addr string, timeout time.Duration) (net.Conn, error) { + if !atomic.CompareAndSwapInt32(&firstConn, 1, 0) { + // If we allow gRPC to open a 2nd transport connection, then our RPCs + // might succeed if they're sent on that one. In the spirit of a + // partition, we'll return errors for the attempt to open a new + // connection (albeit for a TCP connection the error would come after + // a socket connect timeout). 
+ return nil, errors.Errorf("No more connections for you. We're partitioned.") + } + + conn, err := net.DialTimeout("tcp", addr, timeout) + if err != nil { + return nil, err + } + transportConn := testutils.NewPartitionableConn(conn) + dialerCh <- transportConn + return transportConn, nil + }), + grpc.WithKeepaliveParams(cKeepalive), + ) + log.Infof(ctx, "dialing server") + conn, err := clientCtx.GRPCDial(remoteAddr).Connect(ctx) + if err != nil { + return err + } + defer func() { _ = conn.Close() }() + + // Create the heartbeat client. + log.Infof(ctx, "starting heartbeat client") + unlockedHeartbeatClient, err := NewTestingHeartbeatStreamClient(conn).PingStream(ctx) + if err != nil { + return err + } + heartbeatClient := &lockedPingStreamClient{ + TestingHeartbeatStream_PingStreamClient: unlockedHeartbeatClient, + } + + // Perform an initial request-response round trip. + log.Infof(ctx, "first ping") + request := PingRequest{ServerVersion: clientCtx.version.ServerVersion} + if err := heartbeatClient.Send(&request); err != nil { + return err + } + if _, err := heartbeatClient.Recv(); err != nil { + return err + } + + // Launch a goroutine to read from the channel continuously and + // a goroutine to write to the channel continuously. Both will + // exit when the channel breaks (either because of a partition + // or because the stopper stops). + go func() { + t := time.NewTicker(msgInterval) + defer t.Stop() + for { + <-t.C + log.Infof(ctx, "client send") + if err := heartbeatClient.Send(&request); err != nil { + return + } + } + }() + go func() { + for { + log.Infof(ctx, "client recv") + if _, err := heartbeatClient.Recv(); err != nil { + return + } + } + }() + + // Now partition either client->server, server->client, or both, and attempt + // to perform an RPC. We expect it to fail once the grpc keepalive fails to + // get a response from the server. + + transportConn := <-dialerCh + defer transportConn.Finish() + + if c.partitionC2S { + log.Infof(ctx, "partition C2S") + transportConn.PartitionC2S() + } + if c.partitionS2C { + log.Infof(ctx, "partition S2C") + transportConn.PartitionS2C() + } + // Check whether the connection eventually closes. We may need to + // adjust this duration if the test gets flaky. + const retryDur = 3 * time.Second + errNotClosed := errors.New("conn not closed") + closedErr := retry.ForDuration(retryDur, func() error { + err := heartbeatClient.Send(&request) + if err == nil { + log.Infof(ctx, "expected send error, got no error") + return errNotClosed + } + if !grpcutil.IsClosedConnection(err) { + newErr := fmt.Errorf("expected closed connection error, found %v", err) + log.Infof(ctx, "%+v", newErr) + return newErr + } + return nil + }) + if c.expClose { + if closedErr != nil { + newErr := fmt.Errorf("expected closed connection, found %v", closedErr) + log.Infof(ctx, "%+v", newErr) + return newErr + } + } else { + if closedErr != errNotClosed { + newErr := fmt.Errorf("expected unclosed connection, found %v", closedErr) + log.Infof(ctx, "%+v", newErr) + return newErr + } + } + + log.Infof(ctx, "test done") + // If the DialOptions we passed to gRPC didn't prevent it from opening new + // connections, then next RPCs would succeed since gRPC reconnects the + // transport (and that would succeed here since we've only partitioned one + // connection). 
We could further test that the status reported by + // Context.ConnHealth() for the remote node moves to UNAVAILABLE because of + // the (application-level) heartbeats performed by rpc.Context, but the + // behavior of our heartbeats in the face of transport failures is + // sufficiently tested in TestHeartbeatHealthTransport. + return nil } func TestClusterIDMismatch(t *testing.T) { From c9b31da92007a121a08d04f18a10df8889755b56 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 24 Apr 2019 15:06:55 +0200 Subject: [PATCH 4/4] lint: add a linter to forbid uses of t.Parallel() This suggests using `sync.WaitGroup` instead. Release note: None --- pkg/sql/logictest/logic.go | 2 +- pkg/testutils/lint/lint_test.go | 36 +++++++++++++++++++++++++++++++++ pkg/util/fast_int_map_test.go | 2 +- pkg/util/fast_int_set_test.go | 2 +- 4 files changed, 39 insertions(+), 3 deletions(-) diff --git a/pkg/sql/logictest/logic.go b/pkg/sql/logictest/logic.go index 42cf44fe84ec..bdec7f2fc935 100644 --- a/pkg/sql/logictest/logic.go +++ b/pkg/sql/logictest/logic.go @@ -2170,7 +2170,7 @@ func RunLogicTest(t *testing.T, globs ...string) { // the batch size is a global variable. // TODO(jordan, radu): make sqlbase.kvBatchSize non-global to fix this. if filepath.Base(path) != "select_index_span_ranges" { - t.Parallel() + t.Parallel() // SAFE FOR TESTING (this comments satisfies the linter) } } lt := logicTest{ diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go index 67c90189d366..be0fa9fa78a0 100644 --- a/pkg/testutils/lint/lint_test.go +++ b/pkg/testutils/lint/lint_test.go @@ -696,6 +696,42 @@ func TestLint(t *testing.T) { } }) + t.Run("TestTParallel", func(t *testing.T) { + t.Parallel() + cmd, stderr, filter, err := dirCmd( + pkgDir, + "git", + "grep", + "-nE", + `\.Parallel\(\)`, + "--", + "*.go", + ":!testutils/lint/*.go", + ) + if err != nil { + t.Fatal(err) + } + + if err := cmd.Start(); err != nil { + t.Fatal(err) + } + + if err := stream.ForEach(stream.Sequence( + filter, + stream.GrepNot(`// SAFE FOR TESTING`), + ), func(s string) { + t.Errorf("\n%s <- forbidden, use a sync.WaitGroup instead (cf https://github.com/golang/go/issues/31651)", s) + }); err != nil { + t.Error(err) + } + + if err := cmd.Wait(); err != nil { + if out := stderr.String(); len(out) > 0 { + t.Fatalf("err=%s, stderr=%s", err, out) + } + } + }) + t.Run("TestProtoMarshal", func(t *testing.T) { t.Parallel() cmd, stderr, filter, err := dirCmd( diff --git a/pkg/util/fast_int_map_test.go b/pkg/util/fast_int_map_test.go index f7df795f62e8..843ad0154e84 100644 --- a/pkg/util/fast_int_map_test.go +++ b/pkg/util/fast_int_map_test.go @@ -33,7 +33,7 @@ func TestFastIntMap(t *testing.T) { } for _, tc := range cases { t.Run(fmt.Sprintf("%d-%d", tc.keyRange, tc.valRange), func(t *testing.T) { - t.Parallel() + t.Parallel() // SAFE FOR TESTING (this comment is for the linter) rng, _ := randutil.NewPseudoRand() var fm FastIntMap m := make(map[int]int) diff --git a/pkg/util/fast_int_set_test.go b/pkg/util/fast_int_set_test.go index e3742ee92ec6..f5a59183e6fe 100644 --- a/pkg/util/fast_int_set_test.go +++ b/pkg/util/fast_int_set_test.go @@ -26,7 +26,7 @@ func TestFastIntSet(t *testing.T) { for _, mVal := range []int{1, 8, 30, smallCutoff, 2 * smallCutoff, 4 * smallCutoff} { m := mVal t.Run(fmt.Sprintf("%d", m), func(t *testing.T) { - t.Parallel() + t.Parallel() // SAFE FOR TESTING (this comment is for the linter) rng, _ := randutil.NewPseudoRand() in := make([]bool, m) forEachRes := make([]bool, m)