From 69ed211b3d07827e5308060a5224813c88603169 Mon Sep 17 00:00:00 2001 From: Tim Ross Date: Thu, 5 Dec 2024 17:11:00 -0500 Subject: [PATCH] Attempt to reduce flakiness of integration tests Closes #47156. All of the tests suffering from issues dialing hosts, and failing with a `failed to dial target host` error were incorrectly waiting for nodes to become visible before establishing connections. The main culprit for most of the failures was `waitForNodesToRegister`, though a few tests had a very similar hand rolled variant, which incorrectly returned when the nodes appeard in Auth. However, since the Proxy is the one performing dialing, they should have waited for the nodes to appear in the Proxy. To resolve, `waitForNodesToRegister` and all hand rolled equivalents have been removed in favor of `(TeleInstance) WaitForNodeCount` which correctly uses the `CachingAccessPoint` of the RemoteSite instead of `GetClient`. Additionally, `helpers.WaitForNodeCount` was updated to validate that the node watcher used for routing in the Proxy also contained the expected number of nodes. --- integration/helpers/helpers.go | 5 +- integration/helpers/instance.go | 50 +++++++++ integration/helpers/trustedclusters.go | 33 ------ integration/integration_test.go | 137 +++++++------------------ integration/proxy/proxy_helpers.go | 2 +- integration/proxy/proxy_test.go | 6 +- 6 files changed, 92 insertions(+), 141 deletions(-) diff --git a/integration/helpers/helpers.go b/integration/helpers/helpers.go index 7d1b7e58b819f..fdede24b0209a 100644 --- a/integration/helpers/helpers.go +++ b/integration/helpers/helpers.go @@ -132,10 +132,7 @@ func ExternalSSHCommand(o CommandOptions) (*exec.Cmd, error) { } // Create an exec.Command and tell it where to find the SSH agent. - cmd, err := exec.Command(sshpath, execArgs...), nil - if err != nil { - return nil, trace.Wrap(err) - } + cmd := exec.Command(sshpath, execArgs...) cmd.Env = []string{fmt.Sprintf("SSH_AUTH_SOCK=%v", o.SocketPath)} return cmd, nil diff --git a/integration/helpers/instance.go b/integration/helpers/instance.go index 275837306b9d3..cdff68ef04e2a 100644 --- a/integration/helpers/instance.go +++ b/integration/helpers/instance.go @@ -45,8 +45,10 @@ import ( "github.com/gravitational/teleport/api/breaker" clientproto "github.com/gravitational/teleport/api/client/proto" + apidefaults "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" "github.com/gravitational/teleport/api/utils/keys" + "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/auth/authclient" "github.com/gravitational/teleport/lib/auth/keygen" "github.com/gravitational/teleport/lib/auth/state" @@ -1765,3 +1767,51 @@ func (i *TeleInstance) StopAll() error { i.Log.Infof("Stopped all teleport services for site %q", i.Secrets.SiteName) return trace.NewAggregate(errors...) } + +// WaitForNodeCount waits for a certain number of nodes in the provided cluster +// to be visible to the Proxy. This should be called prior to any client dialing +// of nodes to be sure that the node is registered and routable. +func (i *TeleInstance) WaitForNodeCount(ctx context.Context, cluster string, count int) error { + const ( + deadline = time.Second * 30 + iterWaitTime = time.Second + ) + + err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error { + site, err := i.Tunnel.GetSite(cluster) + if err != nil { + return trace.Wrap(err) + } + + // Validate that the site cache contains the expected count. + accessPoint, err := site.CachingAccessPoint() + if err != nil { + return trace.Wrap(err) + } + + nodes, err := accessPoint.GetNodes(ctx, apidefaults.Namespace) + if err != nil { + return trace.Wrap(err) + } + if len(nodes) != count { + return trace.BadParameter("cache contained %v nodes, but wanted to find %v nodes", len(nodes), count) + } + + // Validate that the site watcher contains the expected count. + watcher, err := site.NodeWatcher() + if err != nil { + return trace.Wrap(err) + } + + if watcher.ResourceCount() != count { + return trace.BadParameter("node watcher contained %v nodes, but wanted to find %v nodes", watcher.ResourceCount(), count) + } + + return nil + }) + if err != nil { + return trace.Wrap(err) + } + + return nil +} diff --git a/integration/helpers/trustedclusters.go b/integration/helpers/trustedclusters.go index a883fb8635a9e..1b3f43b61507c 100644 --- a/integration/helpers/trustedclusters.go +++ b/integration/helpers/trustedclusters.go @@ -30,9 +30,7 @@ import ( "github.com/stretchr/testify/require" "github.com/gravitational/teleport" - "github.com/gravitational/teleport/api/defaults" "github.com/gravitational/teleport/api/types" - "github.com/gravitational/teleport/api/utils/retryutils" "github.com/gravitational/teleport/lib/auth" "github.com/gravitational/teleport/lib/reversetunnelclient" ) @@ -112,37 +110,6 @@ func WaitForClusters(tun reversetunnelclient.Server, expected int) func() bool { } } -// WaitForNodeCount waits for a certain number of nodes to show up in the remote site. -func WaitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string, count int) error { - const ( - deadline = time.Second * 30 - iterWaitTime = time.Second - ) - - err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error { - remoteSite, err := t.Tunnel.GetSite(clusterName) - if err != nil { - return trace.Wrap(err) - } - accessPoint, err := remoteSite.CachingAccessPoint() - if err != nil { - return trace.Wrap(err) - } - nodes, err := accessPoint.GetNodes(ctx, defaults.Namespace) - if err != nil { - return trace.Wrap(err) - } - if len(nodes) == count { - return nil - } - return trace.BadParameter("found %v nodes, but wanted to find %v nodes", len(nodes), count) - }) - if err != nil { - return trace.Wrap(err) - } - return nil -} - // WaitForActiveTunnelConnections waits for remote cluster to report a minimum number of active connections func WaitForActiveTunnelConnections(t *testing.T, tunnel reversetunnelclient.Server, clusterName string, expectedCount int) { require.EventuallyWithT(t, func(t *assert.CollectT) { diff --git a/integration/integration_test.go b/integration/integration_test.go index f49dfb06f5e0c..e1f3e9e07796b 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -442,27 +442,9 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { ctx := context.Background() - // wait 10 seconds for both nodes to show up, otherwise + // wait for both nodes to show up, otherwise // we'll have trouble connecting to the node below. - waitForNodes := func(site authclient.ClientI, count int) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - for { - select { - case <-tickCh: - nodesInSite, err := site.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - if got, want := len(nodesInSite), count; got == want { - return nil - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find %v nodes", count) - } - } - } - err = waitForNodes(site, 2) + err = teleport.WaitForNodeCount(ctx, helpers.Site, 2) require.NoError(t, err) // should have no sessions: @@ -796,8 +778,6 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) { teleportSvr := suite.newTeleport(t, nil, true) defer teleportSvr.StopAll() - site := teleportSvr.GetSiteAPI(helpers.Site) - // addNode adds a node to the teleport instance, returning its uuid. // All nodes added this way have the same hostname. addNode := func() (string, error) { @@ -819,36 +799,11 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) { uuid1, err := addNode() require.NoError(t, err) - uuid2, err := addNode() + _, err = addNode() require.NoError(t, err) - // wait up to 10 seconds for supplied node names to show up. - waitForNodes := func(site authclient.ClientI, nodes ...string) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - Outer: - for _, nodeName := range nodes { - for { - select { - case <-tickCh: - nodesInSite, err := site.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - for _, node := range nodesInSite { - if node.GetName() == nodeName { - continue Outer - } - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find node %s", nodeName) - } - } - } - return nil - } - - err = waitForNodes(site, uuid1, uuid2) + // wait for supplied node names to show up. + err = teleportSvr.WaitForNodeCount(ctx, helpers.Site, 3) require.NoError(t, err) // attempting to run a command by hostname should generate NodeIsAmbiguous error. @@ -2150,7 +2105,8 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT tc.concurrentConns = 1 } - waitForNodesToRegister(t, teleport, helpers.Site) + err = teleport.WaitForNodeCount(ctx, helpers.Site, 1) + require.NoError(t, err) asyncErrors := make(chan error, 1) @@ -2169,7 +2125,11 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT tc.clientConfigOpts(&cc) } cl, err := teleport.NewClient(cc) - require.NoError(t, err) + if err != nil { + asyncErrors <- err + return + } + cl.Stdout = person cl.Stdin = person @@ -3139,6 +3099,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus cmd := []string{"echo", "hello world"} + // Wait for nodes to be visible before attempting connections + err = main.WaitForNodeCount(ctx, clusterAux, 2) + require.NoError(t, err) + // Try and connect to a node in the Aux cluster from the Main cluster using // direct dialing. creds, err := helpers.GenerateUserCreds(helpers.UserCredsRequest{ @@ -3224,6 +3188,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus require.Eventually(t, helpers.WaitForClusters(main.Tunnel, 1), 10*time.Second, 1*time.Second, "Two clusters do not see each other: tunnels are not working.") + // Wait for nodes to be visible before attempting connections + err = main.WaitForNodeCount(ctx, clusterAux, 2) + require.NoError(t, err) + // connection and client should recover and work again output = &bytes.Buffer{} tc.Stdout = output @@ -3630,7 +3598,7 @@ func testTrustedTunnelNode(t *testing.T, suite *integrationTestSuite) { "Two clusters do not see each other: tunnels are not working.") // Wait for both nodes to show up before attempting to dial to them. - err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2) + err = main.WaitForNodeCount(ctx, clusterAux, 2) require.NoError(t, err) cmd := []string{"echo", "hello world"} @@ -4026,7 +3994,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) { helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1) helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1) - waitForNodesToRegister(t, main, "cluster-remote") + err = main.WaitForNodeCount(ctx, "cluster-remote", 1) + require.NoError(t, err) // execute the connection via first proxy cfg := helpers.ClientConfig{ @@ -4077,7 +4046,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) { helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1) helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1) - waitForNodesToRegister(t, main, "cluster-remote") + err = main.WaitForNodeCount(ctx, "cluster-remote", 1) + require.NoError(t, err) // Requests going via main proxy should succeed. output, err = runCommand(t, main, []string{"echo", "hello world"}, cfg, 1) @@ -4859,11 +4829,8 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) { require.NoError(t, err) // Wait for the node to be visible before continuing. - require.EventuallyWithT(t, func(t *assert.CollectT) { - found, err := clt.GetNodes(context.Background(), defaults.Namespace) - assert.NoError(t, err) - assert.Len(t, found, 2) - }, 10*time.Second, 100*time.Millisecond) + err = instance.WaitForNodeCount(context.Background(), helpers.Site, 2) + require.NoError(t, err) _, err = runCommand(t, instance, []string{"echo hello"}, clientConfig, 1) @@ -5956,27 +5923,9 @@ func testList(t *testing.T, suite *integrationTestSuite) { clt := teleport.GetSiteAPI(helpers.Site) require.NotNil(t, clt) - // Wait 10 seconds for both nodes to show up to make sure they both have + // Wait for both nodes to show up to make sure they both have // registered themselves. - waitForNodes := func(clt authclient.ClientI, count int) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - for { - select { - case <-tickCh: - nodesInCluster, err := clt.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - if got, want := len(nodesInCluster), count; got == want { - return nil - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find %v nodes", count) - } - } - } - err = waitForNodes(clt, 2) + err = teleport.WaitForNodeCount(ctx, helpers.Site, 2) require.NoError(t, err) tests := []struct { @@ -6158,22 +6107,6 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) { } } -func waitForNodesToRegister(t *testing.T, teleport *helpers.TeleInstance, site string) { - t.Helper() - require.EventuallyWithT(t, func(t *assert.CollectT) { - // once the tunnel is established we need to wait until we have a - // connection to the remote auth - site := teleport.GetSiteAPI(site) - if !assert.NotNil(t, site) { - return - } - // we need to wait until we know about the node because direct dial to - // unregistered servers is no longer supported - _, err := site.GetNode(context.Background(), defaults.Namespace, teleport.Config.HostUUID) - assert.NoError(t, err) - }, time.Second*30, 250*time.Millisecond) -} - // TestDataTransfer makes sure that a "session.data" event is emitted at the // end of a session that matches the amount of data that was transferred. func testDataTransfer(t *testing.T, suite *integrationTestSuite) { @@ -6187,6 +6120,9 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) { main := suite.newTeleport(t, nil, true) defer main.StopAll() + err := main.WaitForNodeCount(context.Background(), helpers.Site, 1) + require.NoError(t, err) + // Create a client to the above Teleport cluster. clientConfig := helpers.ClientConfig{ Login: suite.Me.Username, @@ -6195,8 +6131,6 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) { Port: helpers.Port(t, main.SSH), } - waitForNodesToRegister(t, main, helpers.Site) - // Write 1 MB to stdout. command := []string{"dd", "if=/dev/zero", "bs=1024", "count=1024"} output, err := runCommand(t, main, command, clientConfig, 1) @@ -7155,6 +7089,7 @@ func (s *integrationTestSuite) defaultServiceConfig() *servicecfg.Config { cfg.Log = s.Log cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig() cfg.InstanceMetadataClient = imds.NewDisabledIMDSClient() + cfg.DebugService.Enabled = false return cfg } @@ -7778,7 +7713,8 @@ func testModeratedSFTP(t *testing.T, suite *integrationTestSuite) { _, err = authServer.CreateUser(ctx, moderatorUser) require.NoError(t, err) - waitForNodesToRegister(t, instance, helpers.Site) + err = instance.WaitForNodeCount(context.Background(), helpers.Site, 1) + require.NoError(t, err) // Start a shell so a moderated session is created peerClient, err := instance.NewClient(helpers.ClientConfig{ @@ -8036,7 +7972,8 @@ func testSFTP(t *testing.T, suite *integrationTestSuite) { teleport.StopAll() }) - waitForNodesToRegister(t, teleport, helpers.Site) + err := teleport.WaitForNodeCount(context.Background(), helpers.Site, 1) + require.NoError(t, err) teleportClient, err := teleport.NewClient(helpers.ClientConfig{ Login: suite.Me.Username, diff --git a/integration/proxy/proxy_helpers.go b/integration/proxy/proxy_helpers.go index 422695a363d45..e0e3c7b587224 100644 --- a/integration/proxy/proxy_helpers.go +++ b/integration/proxy/proxy_helpers.go @@ -216,7 +216,7 @@ func (p *Suite) addNodeToLeafCluster(t *testing.T, tunnelNodeHostname string) { "Two clusters do not see each other: tunnels are not working.") // Wait for both nodes to show up before attempting to dial to them. - err = helpers.WaitForNodeCount(context.Background(), p.root, p.leaf.Secrets.SiteName, 2) + err = p.root.WaitForNodeCount(context.Background(), p.leaf.Secrets.SiteName, 2) require.NoError(t, err) } diff --git a/integration/proxy/proxy_test.go b/integration/proxy/proxy_test.go index 43d6254911a3b..0dcf986d70109 100644 --- a/integration/proxy/proxy_test.go +++ b/integration/proxy/proxy_test.go @@ -1614,7 +1614,7 @@ func TestALPNProxyHTTPProxyNoProxyDial(t *testing.T) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*30)) defer cancel() - err = helpers.WaitForNodeCount(ctx, rc, "root.example.com", 1) + err = rc.WaitForNodeCount(ctx, "root.example.com", 1) require.NoError(t, err) require.Zero(t, ph.Count()) @@ -1624,7 +1624,7 @@ func TestALPNProxyHTTPProxyNoProxyDial(t *testing.T) { require.NoError(t, os.Unsetenv("no_proxy")) _, err = rc.StartNode(makeNodeConfig("second-root-node", rcProxyAddr)) require.NoError(t, err) - err = helpers.WaitForNodeCount(ctx, rc, "root.example.com", 2) + err = rc.WaitForNodeCount(ctx, "root.example.com", 2) require.NoError(t, err) require.NotZero(t, ph.Count()) @@ -1723,7 +1723,7 @@ func TestALPNProxyHTTPProxyBasicAuthDial(t *testing.T) { startErrC <- err }() require.NoError(t, <-startErrC) - require.NoError(t, helpers.WaitForNodeCount(context.Background(), rc, rc.Secrets.SiteName, 1)) + require.NoError(t, rc.WaitForNodeCount(context.Background(), rc.Secrets.SiteName, 1)) require.Greater(t, ph.Count(), 0) }