Skip to content

Commit

Permalink
Attempt to reduce flakiness of integration tests (#49888)
Browse files Browse the repository at this point in the history
Closes #47156.

All of the tests suffering from issues dialing hosts, and failing
with a `failed to dial target host` error were incorrectly waiting
for nodes to become visible before establishing connections. The
main culprit for most of the failures was `waitForNodesToRegister`,
though a few tests had a very similar hand rolled variant, which
incorrectly returned when the nodes appeard in Auth. However, since
the Proxy is the one performing dialing, they should have waited
for the nodes to appear in the Proxy.

To resolve, `waitForNodesToRegister` and all hand rolled equivalents
have been removed in favor of `(TeleInstance) WaitForNodeCount` which
correctly uses the `CachingAccessPoint` of the RemoteSite instead
of `GetClient`. Additionally, `helpers.WaitForNodeCount` was updated
to validate that the node watcher used for routing in the Proxy
also contained the expected number of nodes.
  • Loading branch information
rosstimothy authored Dec 6, 2024
1 parent fc7c2be commit a56bc4e
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 141 deletions.
5 changes: 1 addition & 4 deletions integration/helpers/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,7 @@ func ExternalSSHCommand(o CommandOptions) (*exec.Cmd, error) {
}

// Create an exec.Command and tell it where to find the SSH agent.
cmd, err := exec.Command(sshpath, execArgs...), nil
if err != nil {
return nil, trace.Wrap(err)
}
cmd := exec.Command(sshpath, execArgs...)
cmd.Env = []string{fmt.Sprintf("SSH_AUTH_SOCK=%v", o.SocketPath)}

return cmd, nil
Expand Down
50 changes: 50 additions & 0 deletions integration/helpers/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ import (

"github.com/gravitational/teleport/api/breaker"
clientproto "github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/keys"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/auth/keygen"
"github.com/gravitational/teleport/lib/auth/state"
Expand Down Expand Up @@ -1765,3 +1767,51 @@ func (i *TeleInstance) StopAll() error {
i.Log.Infof("Stopped all teleport services for site %q", i.Secrets.SiteName)
return trace.NewAggregate(errors...)
}

// WaitForNodeCount waits for a certain number of nodes in the provided cluster
// to be visible to the Proxy. This should be called prior to any client dialing
// of nodes to be sure that the node is registered and routable.
func (i *TeleInstance) WaitForNodeCount(ctx context.Context, cluster string, count int) error {
const (
deadline = time.Second * 30
iterWaitTime = time.Second
)

err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error {
site, err := i.Tunnel.GetSite(cluster)
if err != nil {
return trace.Wrap(err)
}

// Validate that the site cache contains the expected count.
accessPoint, err := site.CachingAccessPoint()
if err != nil {
return trace.Wrap(err)
}

nodes, err := accessPoint.GetNodes(ctx, apidefaults.Namespace)
if err != nil {
return trace.Wrap(err)
}
if len(nodes) != count {
return trace.BadParameter("cache contained %v nodes, but wanted to find %v nodes", len(nodes), count)
}

// Validate that the site watcher contains the expected count.
watcher, err := site.NodeWatcher()
if err != nil {
return trace.Wrap(err)
}

if watcher.ResourceCount() != count {
return trace.BadParameter("node watcher contained %v nodes, but wanted to find %v nodes", watcher.ResourceCount(), count)
}

return nil
})
if err != nil {
return trace.Wrap(err)
}

return nil
}
33 changes: 0 additions & 33 deletions integration/helpers/trustedclusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/defaults"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/utils/retryutils"
"github.com/gravitational/teleport/lib/auth"
"github.com/gravitational/teleport/lib/reversetunnelclient"
)
Expand Down Expand Up @@ -112,37 +110,6 @@ func WaitForClusters(tun reversetunnelclient.Server, expected int) func() bool {
}
}

// WaitForNodeCount waits for a certain number of nodes to show up in the remote site.
func WaitForNodeCount(ctx context.Context, t *TeleInstance, clusterName string, count int) error {
const (
deadline = time.Second * 30
iterWaitTime = time.Second
)

err := retryutils.RetryStaticFor(deadline, iterWaitTime, func() error {
remoteSite, err := t.Tunnel.GetSite(clusterName)
if err != nil {
return trace.Wrap(err)
}
accessPoint, err := remoteSite.CachingAccessPoint()
if err != nil {
return trace.Wrap(err)
}
nodes, err := accessPoint.GetNodes(ctx, defaults.Namespace)
if err != nil {
return trace.Wrap(err)
}
if len(nodes) == count {
return nil
}
return trace.BadParameter("found %v nodes, but wanted to find %v nodes", len(nodes), count)
})
if err != nil {
return trace.Wrap(err)
}
return nil
}

// WaitForActiveTunnelConnections waits for remote cluster to report a minimum number of active connections
func WaitForActiveTunnelConnections(t *testing.T, tunnel reversetunnelclient.Server, clusterName string, expectedCount int) {
require.EventuallyWithT(t, func(t *assert.CollectT) {
Expand Down
137 changes: 37 additions & 100 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,27 +442,9 @@ func testAuditOn(t *testing.T, suite *integrationTestSuite) {

ctx := context.Background()

// wait 10 seconds for both nodes to show up, otherwise
// wait for both nodes to show up, otherwise
// we'll have trouble connecting to the node below.
waitForNodes := func(site authclient.ClientI, count int) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
for {
select {
case <-tickCh:
nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
if got, want := len(nodesInSite), count; got == want {
return nil
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find %v nodes", count)
}
}
}
err = waitForNodes(site, 2)
err = teleport.WaitForNodeCount(ctx, helpers.Site, 2)
require.NoError(t, err)

// should have no sessions:
Expand Down Expand Up @@ -796,8 +778,6 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
teleportSvr := suite.newTeleport(t, nil, true)
defer teleportSvr.StopAll()

site := teleportSvr.GetSiteAPI(helpers.Site)

// addNode adds a node to the teleport instance, returning its uuid.
// All nodes added this way have the same hostname.
addNode := func() (string, error) {
Expand All @@ -819,36 +799,11 @@ func testUUIDBasedProxy(t *testing.T, suite *integrationTestSuite) {
uuid1, err := addNode()
require.NoError(t, err)

uuid2, err := addNode()
_, err = addNode()
require.NoError(t, err)

// wait up to 10 seconds for supplied node names to show up.
waitForNodes := func(site authclient.ClientI, nodes ...string) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
Outer:
for _, nodeName := range nodes {
for {
select {
case <-tickCh:
nodesInSite, err := site.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
for _, node := range nodesInSite {
if node.GetName() == nodeName {
continue Outer
}
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find node %s", nodeName)
}
}
}
return nil
}

err = waitForNodes(site, uuid1, uuid2)
// wait for supplied node names to show up.
err = teleportSvr.WaitForNodeCount(ctx, helpers.Site, 3)
require.NoError(t, err)

// attempting to run a command by hostname should generate NodeIsAmbiguous error.
Expand Down Expand Up @@ -2150,7 +2105,8 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
tc.concurrentConns = 1
}

waitForNodesToRegister(t, teleport, helpers.Site)
err = teleport.WaitForNodeCount(ctx, helpers.Site, 1)
require.NoError(t, err)

asyncErrors := make(chan error, 1)

Expand All @@ -2169,7 +2125,11 @@ func runDisconnectTest(t *testing.T, suite *integrationTestSuite, tc disconnectT
tc.clientConfigOpts(&cc)
}
cl, err := teleport.NewClient(cc)
require.NoError(t, err)
if err != nil {
asyncErrors <- err
return
}

cl.Stdout = person
cl.Stdin = person

Expand Down Expand Up @@ -3139,6 +3099,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus

cmd := []string{"echo", "hello world"}

// Wait for nodes to be visible before attempting connections
err = main.WaitForNodeCount(ctx, clusterAux, 2)
require.NoError(t, err)

// Try and connect to a node in the Aux cluster from the Main cluster using
// direct dialing.
creds, err := helpers.GenerateUserCreds(helpers.UserCredsRequest{
Expand Down Expand Up @@ -3224,6 +3188,10 @@ func trustedClusters(t *testing.T, suite *integrationTestSuite, test trustedClus
require.Eventually(t, helpers.WaitForClusters(main.Tunnel, 1), 10*time.Second, 1*time.Second,
"Two clusters do not see each other: tunnels are not working.")

// Wait for nodes to be visible before attempting connections
err = main.WaitForNodeCount(ctx, clusterAux, 2)
require.NoError(t, err)

// connection and client should recover and work again
output = &bytes.Buffer{}
tc.Stdout = output
Expand Down Expand Up @@ -3630,7 +3598,7 @@ func testTrustedTunnelNode(t *testing.T, suite *integrationTestSuite) {
"Two clusters do not see each other: tunnels are not working.")

// Wait for both nodes to show up before attempting to dial to them.
err = helpers.WaitForNodeCount(ctx, main, clusterAux, 2)
err = main.WaitForNodeCount(ctx, clusterAux, 2)
require.NoError(t, err)

cmd := []string{"echo", "hello world"}
Expand Down Expand Up @@ -4026,7 +3994,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)

waitForNodesToRegister(t, main, "cluster-remote")
err = main.WaitForNodeCount(ctx, "cluster-remote", 1)
require.NoError(t, err)

// execute the connection via first proxy
cfg := helpers.ClientConfig{
Expand Down Expand Up @@ -4077,7 +4046,8 @@ func testDiscovery(t *testing.T, suite *integrationTestSuite) {
helpers.WaitForActiveTunnelConnections(t, main.Tunnel, "cluster-remote", 1)
helpers.WaitForActiveTunnelConnections(t, secondProxy, "cluster-remote", 1)

waitForNodesToRegister(t, main, "cluster-remote")
err = main.WaitForNodeCount(ctx, "cluster-remote", 1)
require.NoError(t, err)

// Requests going via main proxy should succeed.
output, err = runCommand(t, main, []string{"echo", "hello world"}, cfg, 1)
Expand Down Expand Up @@ -4859,11 +4829,8 @@ func testProxyHostKeyCheck(t *testing.T, suite *integrationTestSuite) {
require.NoError(t, err)

// Wait for the node to be visible before continuing.
require.EventuallyWithT(t, func(t *assert.CollectT) {
found, err := clt.GetNodes(context.Background(), defaults.Namespace)
assert.NoError(t, err)
assert.Len(t, found, 2)
}, 10*time.Second, 100*time.Millisecond)
err = instance.WaitForNodeCount(context.Background(), helpers.Site, 2)
require.NoError(t, err)

_, err = runCommand(t, instance, []string{"echo hello"}, clientConfig, 1)

Expand Down Expand Up @@ -5956,27 +5923,9 @@ func testList(t *testing.T, suite *integrationTestSuite) {
clt := teleport.GetSiteAPI(helpers.Site)
require.NotNil(t, clt)

// Wait 10 seconds for both nodes to show up to make sure they both have
// Wait for both nodes to show up to make sure they both have
// registered themselves.
waitForNodes := func(clt authclient.ClientI, count int) error {
tickCh := time.Tick(500 * time.Millisecond)
stopCh := time.After(10 * time.Second)
for {
select {
case <-tickCh:
nodesInCluster, err := clt.GetNodes(ctx, defaults.Namespace)
if err != nil && !trace.IsNotFound(err) {
return trace.Wrap(err)
}
if got, want := len(nodesInCluster), count; got == want {
return nil
}
case <-stopCh:
return trace.BadParameter("waited 10s, did find %v nodes", count)
}
}
}
err = waitForNodes(clt, 2)
err = teleport.WaitForNodeCount(ctx, helpers.Site, 2)
require.NoError(t, err)

tests := []struct {
Expand Down Expand Up @@ -6158,22 +6107,6 @@ func testCmdLabels(t *testing.T, suite *integrationTestSuite) {
}
}

func waitForNodesToRegister(t *testing.T, teleport *helpers.TeleInstance, site string) {
t.Helper()
require.EventuallyWithT(t, func(t *assert.CollectT) {
// once the tunnel is established we need to wait until we have a
// connection to the remote auth
site := teleport.GetSiteAPI(site)
if !assert.NotNil(t, site) {
return
}
// we need to wait until we know about the node because direct dial to
// unregistered servers is no longer supported
_, err := site.GetNode(context.Background(), defaults.Namespace, teleport.Config.HostUUID)
assert.NoError(t, err)
}, time.Second*30, 250*time.Millisecond)
}

// TestDataTransfer makes sure that a "session.data" event is emitted at the
// end of a session that matches the amount of data that was transferred.
func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
Expand All @@ -6187,6 +6120,9 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
main := suite.newTeleport(t, nil, true)
defer main.StopAll()

err := main.WaitForNodeCount(context.Background(), helpers.Site, 1)
require.NoError(t, err)

// Create a client to the above Teleport cluster.
clientConfig := helpers.ClientConfig{
Login: suite.Me.Username,
Expand All @@ -6195,8 +6131,6 @@ func testDataTransfer(t *testing.T, suite *integrationTestSuite) {
Port: helpers.Port(t, main.SSH),
}

waitForNodesToRegister(t, main, helpers.Site)

// Write 1 MB to stdout.
command := []string{"dd", "if=/dev/zero", "bs=1024", "count=1024"}
output, err := runCommand(t, main, command, clientConfig, 1)
Expand Down Expand Up @@ -7155,6 +7089,7 @@ func (s *integrationTestSuite) defaultServiceConfig() *servicecfg.Config {
cfg.Log = s.Log
cfg.CircuitBreakerConfig = breaker.NoopBreakerConfig()
cfg.InstanceMetadataClient = imds.NewDisabledIMDSClient()
cfg.DebugService.Enabled = false
return cfg
}

Expand Down Expand Up @@ -7778,7 +7713,8 @@ func testModeratedSFTP(t *testing.T, suite *integrationTestSuite) {
_, err = authServer.CreateUser(ctx, moderatorUser)
require.NoError(t, err)

waitForNodesToRegister(t, instance, helpers.Site)
err = instance.WaitForNodeCount(context.Background(), helpers.Site, 1)
require.NoError(t, err)

// Start a shell so a moderated session is created
peerClient, err := instance.NewClient(helpers.ClientConfig{
Expand Down Expand Up @@ -8036,7 +7972,8 @@ func testSFTP(t *testing.T, suite *integrationTestSuite) {
teleport.StopAll()
})

waitForNodesToRegister(t, teleport, helpers.Site)
err := teleport.WaitForNodeCount(context.Background(), helpers.Site, 1)
require.NoError(t, err)

teleportClient, err := teleport.NewClient(helpers.ClientConfig{
Login: suite.Me.Username,
Expand Down
2 changes: 1 addition & 1 deletion integration/proxy/proxy_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ func (p *Suite) addNodeToLeafCluster(t *testing.T, tunnelNodeHostname string) {
"Two clusters do not see each other: tunnels are not working.")

// Wait for both nodes to show up before attempting to dial to them.
err = helpers.WaitForNodeCount(context.Background(), p.root, p.leaf.Secrets.SiteName, 2)
err = p.root.WaitForNodeCount(context.Background(), p.leaf.Secrets.SiteName, 2)
require.NoError(t, err)
}

Expand Down
Loading

0 comments on commit a56bc4e

Please sign in to comment.