Skip to content

Commit

Permalink
Attempt to reduce flakiness of integration tests
Browse files Browse the repository at this point in the history
Closes #47156.

All of the tests suffering from issues dialing hosts and failing
with a `failed to dial target host` error were incorrectly waiting
for nodes to become visible before establishing connections. The
main culprit for most of the failures was `waitForNodesToRegister`,
though a few tests had a very similar hand-rolled variant, which
incorrectly returned when the nodes appeared in Auth. However, since
the Proxy is the one performing dialing, they should have waited
for the nodes to appear in the Proxy.

To resolve this, `waitForNodesToRegister` and all hand-rolled equivalents
have been removed in favor of `helpers.WaitForNodeCount`, which
correctly uses the `CachingAccessPoint` of the RemoteSite instead
of `GetClient`. Additionally, `helpers.WaitForNodeCount` was updated
to validate that the node watcher used for routing in the Proxy
also contains the expected number of nodes.
  • Loading branch information
rosstimothy committed Dec 5, 2024
1 parent 8e723cc commit 8779d70
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 102 deletions.
20 changes: 17 additions & 3 deletions integration/helpers/trustedclusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,32 @@ func WaitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string,
if err != nil {
return trace.Wrap(err)
}

// Validate that the site cache contains the expected count.
accessPoint, err := remoteSite.CachingAccessPoint()
if err != nil {
return trace.Wrap(err)
}

nodes, err := accessPoint.GetNodes(ctx, defaults.Namespace)
if err != nil {
return trace.Wrap(err)
}
if len(nodes) == count {
return nil
if len(nodes) != count {
return trace.BadParameter("cache contained %v nodes, but wanted to find %v nodes", len(nodes), count)
}
return trace.BadParameter("found %v nodes, but wanted to find %v nodes", len(nodes), count)

// Validate that the site watcher contains the expected count.
watcher, err := remoteSite.NodeWatcher()
if err != nil {
return trace.Wrap(err)
}

if watcher.ResourceCount() != count {
return trace.BadParameter("node watcher contained %v nodes, but wanted to find %v nodes", len(nodes), count)
}

return nil
})
if err != nil {
return trace.Wrap(err)
Expand Down
134 changes: 35 additions & 99 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,27 +442,9 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {

ctx := context.Background()

// wait 10 seconds for both nodes to show up, otherwise
// wait for both nodes to show up, otherwise
// we'll have trouble connecting to the node below.
waitForNodes := func(site authclient.ClientI, count int) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
for {
select {
case <-tickCh:
nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
if got, want := len(nodesInSite), count; got == want {
return nil
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find %v nodes", count)
}
}
}
err = waitForNodes(site, 2)
err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 2)
require.NoError(t, err)

// should have no sessions:
Expand Down Expand Up @@ -796,8 +778,6 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
teleportSvr := suite.newTeleport(t, nil, true)
defer teleportSvr.StopAll()

site := teleportSvr.GetSiteAPI(helpers.Site)

// addNode adds a node to the teleport instance, returning its uuid.
// All nodes added this way have the same hostname.
addNode := func() (string, error) {
Expand All @@ -819,36 +799,11 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
uuid1, err := addNode()
require.NoError(t, err)

uuid2, err := addNode()
_, err = addNode()
require.NoError(t, err)

// wait up to 10 seconds for supplied node names to show up.
waitForNodes := func(site authclient.ClientI, nodes ...string) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
Outer:
for _, nodeName := range nodes {
for {
select {
case <-tickCh:
nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
for _, node := range nodesInSite {
if node.GetName() == nodeName {
continue Outer
}
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find node %s", nodeName)
}
}
}
return nil
}

err = waitForNodes(site, uuid1, uuid2)
// wait for supplied node names to show up.
err = helpers.WaitForNodeCount(ctx, teleportSvr, helpers.Site, 3)
require.NoError(t, err)

// attempting to run a command by hostname should generate NodeIsAmbiguous error.
Expand Down Expand Up @@ -2150,7 +2105,8 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
tc.concurrentConns = 1
}

waitForNodesToRegister(t, teleport, helpers.Site)
err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 1)
require.NoError(t, err)

asyncErrors := make(chan error, 1)

Expand All @@ -2169,7 +2125,11 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
tc.clientConfigOpts(&cc)
}
cl, err := teleport.NewClient(cc)
require.NoError(t, err)
if err != nil {
asyncErrors <- err
return
}

cl.Stdout = person
cl.Stdin = person

Expand Down Expand Up @@ -3140,6 +3100,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus

cmd := []string{"echo", "hello world"}

// Wait for nodes to be visible before attempting connections
err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2)
require.NoError(t, err)

// Try and connect to a node in the Aux cluster from the Main cluster using
// direct dialing.
creds, err := helpers.GenerateUserCreds(helpers.UserCredsRequest{
Expand Down Expand Up @@ -3225,6 +3189,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus
require.Eventually(t, helpers.WaitForClusters(main.Tunnel, 1), 10*time.Second, 1*time.Second,
"Two clusters do not see each other: tunnels are not working.")

// Wait for nodes to be visible before attempting connections
err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2)
require.NoError(t, err)

// connection and client should recover and work again
output = &bytes.Buffer{}
tc.Stdout = output
Expand Down Expand Up @@ -4027,7 +3995,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)

waitForNodesToRegister(t, main, "cluster-remote")
err = helpers.WaitForNodeCount(ctx, main, "cluster-remote", 1)
require.NoError(t, err)

// execute the connection via first proxy
cfg := helpers.ClientConfig{
Expand Down Expand Up @@ -4078,7 +4047,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)

waitForNodesToRegister(t, main, "cluster-remote")
err = helpers.WaitForNodeCount(ctx, main, "cluster-remote", 1)
require.NoError(t, err)

// Requests going via main proxy should succeed.
output, err = runCommand(t, main, []string{"echo", "hello world"}, cfg, 1)
Expand Down Expand Up @@ -4860,11 +4830,8 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) {
require.NoError(t, err)

// Wait for the node to be visible before continuing.
require.EventuallyWithT(t, func(t *assert.CollectT) {
found, err := clt.GetNodes(context.Background(), defaults.Namespace)
assert.NoError(t, err)
assert.Len(t, found, 2)
}, 10*time.Second, 100*time.Millisecond)
err = helpers.WaitForNodeCount(context.Background(), instance, helpers.Site, 1)
require.NoError(t, err)

_, err = runCommand(t, instance, []string{"echo hello"}, clientConfig, 1)

Expand Down Expand Up @@ -5957,27 +5924,9 @@ func testList(t *testing.T, suite *integrationTestSuite) {
clt := teleport.GetSiteAPI(helpers.Site)
require.NotNil(t, clt)

// Wait 10 seconds for both nodes to show up to make sure they both have
// Wait for both nodes to show up to make sure they both have
// registered themselves.
waitForNodes := func(clt authclient.ClientI, count int) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
for {
select {
case <-tickCh:
nodesInCluster, err := clt.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
if got, want := len(nodesInCluster), count; got == want {
return nil
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find %v nodes", count)
}
}
}
err = waitForNodes(clt, 2)
err = helpers.WaitForNodeCount(ctx, teleport, helpers.Site, 2)
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -6159,22 +6108,6 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) {
}
}

// waitForNodesToRegister blocks until the node identified by
// teleport.Config.HostUUID can be fetched through the API client of the
// named site, failing the test if that does not happen in time.
//
// The lookup is retried every 250ms for up to 30 seconds.
func waitForNodesToRegister(t *testing.T, teleport *helpers.TeleInstance, site string) {
	t.Helper()

	const (
		waitFor = time.Second * 30
		tick    = 250 * time.Millisecond
	)

	nodeRegistered := func(collect *assert.CollectT) {
		// Once the tunnel is established, a connection to the remote auth
		// is still required; until then there is no site client to query.
		siteClient := teleport.GetSiteAPI(site)
		if !assert.NotNil(collect, siteClient) {
			return
		}

		// Direct dial to unregistered servers is no longer supported, so
		// the node must be known before any connection is attempted.
		_, err := siteClient.GetNode(context.Background(), defaults.Namespace, teleport.Config.HostUUID)
		assert.NoError(collect, err)
	}

	require.EventuallyWithT(t, nodeRegistered, waitFor, tick)
}

// TestDataTransfer makes sure that a "session.data" event is emitted at the
// end of a session that matches the amount of data that was transferred.
func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
Expand All @@ -6188,6 +6121,9 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
main := suite.newTeleport(t, nil, true)
defer main.StopAll()

err := helpers.WaitForNodeCount(context.Background(), main, helpers.Site, 1)
require.NoError(t, err)

// Create a client to the above Teleport cluster.
clientConfig := helpers.ClientConfig{
Login: suite.Me.Username,
Expand All @@ -6196,8 +6132,6 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
Port: helpers.Port(t, main.SSH),
}

waitForNodesToRegister(t, main, helpers.Site)

// Write 1 MB to stdout.
command := []string{"dd", "if=/dev/zero", "bs=1024", "count=1024"}
output, err := runCommand(t, main, command, clientConfig, 1)
Expand Down Expand Up @@ -7779,7 +7713,8 @@ func testModeratedSFTP(t *testing.T, suite *integrationTestSuite) {
_, err = authServer.CreateUser(ctx, moderatorUser)
require.NoError(t, err)

waitForNodesToRegister(t, instance, helpers.Site)
err = helpers.WaitForNodeCount(context.Background(), instance, helpers.Site, 1)
require.NoError(t, err)

// Start a shell so a moderated session is created
peerClient, err := instance.NewClient(helpers.ClientConfig{
Expand Down Expand Up @@ -8037,7 +7972,8 @@ func testSFTP(t *testing.T, suite *integrationTestSuite) {
teleport.StopAll()
})

waitForNodesToRegister(t, teleport, helpers.Site)
err := helpers.WaitForNodeCount(context.Background(), teleport, helpers.Site, 1)
require.NoError(t, err)

teleportClient, err := teleport.NewClient(helpers.ClientConfig{
Login: suite.Me.Username,
Expand Down

0 comments on commit 8779d70

Please sign in to comment.