diff --git a/integration/helpers/trustedclusters.go b/integration/helpers/trustedclusters.go index a883fb8635a9e..a50a1fa2807a5 100644 --- a/integration/helpers/trustedclusters.go +++ b/integration/helpers/trustedclusters.go @@ -124,18 +124,32 @@ func WaitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string, if err != nil { return trace.Wrap(err) } + + // Validate that the site cache contains the expected count. accessPoint, err := remoteSite.CachingAccessPoint() if err != nil { return trace.Wrap(err) } + nodes, err := accessPoint.GetNodes(ctx, defaults.Namespace) if err != nil { return trace.Wrap(err) } - if len(nodes) == count { - return nil + if len(nodes) != count { + return trace.BadParameter("cache contained %v nodes, but wanted to find %v nodes", len(nodes), count) } - return trace.BadParameter("found %v nodes, but wanted to find %v nodes", len(nodes), count) + + // Validate that the site watcher contains the expected count. + watcher, err := remoteSite.NodeWatcher() + if err != nil { + return trace.Wrap(err) + } + + if got := watcher.ResourceCount(); got != count { + return trace.BadParameter("node watcher contained %v nodes, but wanted to find %v nodes", got, count) + } + + return nil }) if err != nil { return trace.Wrap(err) diff --git a/integration/integration_test.go b/integration/integration_test.go index b00e47d7fa404..7fa6cc2254c4f 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -442,27 +442,9 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) { ctx := context.Background() - // wait 10 seconds for both nodes to show up, otherwise + // wait for both nodes to show up, otherwise // we'll have trouble connecting to the node below. 
- waitForNodes := func(site authclient.ClientI, count int) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - for { - select { - case <-tickCh: - nodesInSite, err := site.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - if got, want := len(nodesInSite), count; got == want { - return nil - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find %v nodes", count) - } - } - } - err = waitForNodes(site, 2) + err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 2) require.NoError(t, err) // should have no sessions: @@ -796,8 +778,6 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) { teleportSvr := suite.newTeleport(t, nil, true) defer teleportSvr.StopAll() - site := teleportSvr.GetSiteAPI(helpers.Site) - // addNode adds a node to the teleport instance, returning its uuid. // All nodes added this way have the same hostname. addNode := func() (string, error) { @@ -819,36 +799,11 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) { uuid1, err := addNode() require.NoError(t, err) - uuid2, err := addNode() + _, err = addNode() require.NoError(t, err) - // wait up to 10 seconds for supplied node names to show up. - waitForNodes := func(site authclient.ClientI, nodes ...string) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - Outer: - for _, nodeName := range nodes { - for { - select { - case <-tickCh: - nodesInSite, err := site.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - for _, node := range nodesInSite { - if node.GetName() == nodeName { - continue Outer - } - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find node %s", nodeName) - } - } - } - return nil - } - - err = waitForNodes(site, uuid1, uuid2) + // wait for supplied node names to show up. 
+ err = helpers.WaitForNodeCount(ctx, teleportSvr, helpers.Site, 3) require.NoError(t, err) // attempting to run a command by hostname should generate NodeIsAmbiguous error. @@ -2150,7 +2105,8 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT tc.concurrentConns = 1 } - waitForNodesToRegister(t, teleport, helpers.Site) + err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 1) + require.NoError(t, err) asyncErrors := make(chan error, 1) @@ -2169,7 +2125,11 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT tc.clientConfigOpts(&cc) } cl, err := teleport.NewClient(cc) - require.NoError(t, err) + if err != nil { + asyncErrors <- err + return + } + cl.Stdout = person cl.Stdin = person @@ -3140,6 +3100,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus cmd := []string{"echo", "hello world"} + // Wait for nodes to be visible before attempting connections + err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2) + require.NoError(t, err) + // Try and connect to a node in the Aux cluster from the Main cluster using // direct dialing. 
creds, err := helpers.GenerateUserCreds(helpers.UserCredsRequest{ @@ -3225,6 +3189,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus require.Eventually(t, helpers.WaitForClusters(main.Tunnel, 1), 10*time.Second, 1*time.Second, "Two clusters do not see each other: tunnels are not working.") + // Wait for nodes to be visible before attempting connections + err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2) + require.NoError(t, err) + // connection and client should recover and work again output = &bytes.Buffer{} tc.Stdout = output @@ -4027,7 +3995,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) { helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1) helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1) - waitForNodesToRegister(t, main, "cluster-remote") + err = helpers.WaitForNodeCount(ctx, main, "cluster-remote", 1) + require.NoError(t, err) // execute the connection via first proxy cfg := helpers.ClientConfig{ @@ -4078,7 +4047,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) { helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1) helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1) - waitForNodesToRegister(t, main, "cluster-remote") + err = helpers.WaitForNodeCount(ctx, main, "cluster-remote", 1) + require.NoError(t, err) // Requests going via main proxy should succeed. output, err = runCommand(t, main, []string{"echo", "hello world"}, cfg, 1) @@ -4860,11 +4830,8 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) { require.NoError(t, err) // Wait for the node to be visible before continuing. 
- require.EventuallyWithT(t, func(t *assert.CollectT) { - found, err := clt.GetNodes(context.Background(), defaults.Namespace) - assert.NoError(t, err) - assert.Len(t, found, 2) - }, 10*time.Second, 100*time.Millisecond) + err = helpers.WaitForNodeCount(context.Background(), instance, helpers.Site, 1) + require.NoError(t, err) _, err = runCommand(t, instance, []string{"echo hello"}, clientConfig, 1) @@ -5957,27 +5924,9 @@ func testList(t *testing.T, suite *integrationTestSuite) { clt := teleport.GetSiteAPI(helpers.Site) require.NotNil(t, clt) - // Wait 10 seconds for both nodes to show up to make sure they both have + // Wait for both nodes to show up to make sure they both have // registered themselves. - waitForNodes := func(clt authclient.ClientI, count int) error { - tickCh := time.Tick(500 * time.Millisecond) - stopCh := time.After(10 * time.Second) - for { - select { - case <-tickCh: - nodesInCluster, err := clt.GetNodes(ctx, defaults.Namespace) - if err != nil && !trace.IsNotFound(err) { - return trace.Wrap(err) - } - if got, want := len(nodesInCluster), count; got == want { - return nil - } - case <-stopCh: - return trace.BadParameter("waited 10s, did find %v nodes", count) - } - } - } - err = waitForNodes(clt, 2) + err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 2) require.NoError(t, err) tests := []struct { @@ -6159,22 +6108,6 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) { } } -func waitForNodesToRegister(t *testing.T, teleport *helpers.TeleInstance, site string) { - t.Helper() - require.EventuallyWithT(t, func(t *assert.CollectT) { - // once the tunnel is established we need to wait until we have a - // connection to the remote auth - site := teleport.GetSiteAPI(site) - if !assert.NotNil(t, site) { - return - } - // we need to wait until we know about the node because direct dial to - // unregistered servers is no longer supported - _, err := site.GetNode(context.Background(), defaults.Namespace, teleport.Config.HostUUID) 
- assert.NoError(t, err) - }, time.Second*30, 250*time.Millisecond) -} - // TestDataTransfer makes sure that a "session.data" event is emitted at the // end of a session that matches the amount of data that was transferred. func testDataTransfer(t *testing.T, suite *integrationTestSuite) { @@ -6188,6 +6121,9 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) { main := suite.newTeleport(t, nil, true) defer main.StopAll() + err := helpers.WaitForNodeCount(context.Background(), main, helpers.Site, 1) + require.NoError(t, err) + // Create a client to the above Teleport cluster. clientConfig := helpers.ClientConfig{ Login: suite.Me.Username, @@ -6196,8 +6132,6 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) { Port: helpers.Port(t, main.SSH), } - waitForNodesToRegister(t, main, helpers.Site) - // Write 1 MB to stdout. command := []string{"dd", "if=/dev/zero", "bs=1024", "count=1024"} output, err := runCommand(t, main, command, clientConfig, 1) @@ -7779,7 +7713,8 @@ func testModeratedSFTP(t *testing.T, suite *integrationTestSuite) { _, err = authServer.CreateUser(ctx, moderatorUser) require.NoError(t, err) - waitForNodesToRegister(t, instance, helpers.Site) + err = helpers.WaitForNodeCount(context.Background(), instance, helpers.Site, 1) + require.NoError(t, err) // Start a shell so a moderated session is created peerClient, err := instance.NewClient(helpers.ClientConfig{ @@ -8037,7 +7972,8 @@ func testSFTP(t *testing.T, suite *integrationTestSuite) { teleport.StopAll() }) - waitForNodesToRegister(t, teleport, helpers.Site) + err := helpers.WaitForNodeCount(context.Background(), teleport, helpers.Site, 1) + require.NoError(t, err) teleportClient, err := teleport.NewClient(helpers.ClientConfig{ Login: suite.Me.Username,