Speed up TestAppServersHA #12128

Merged 4 commits on Apr 26, 2022
163 changes: 81 additions & 82 deletions integration/app_integration_test.go
@@ -29,7 +29,6 @@ import (
"net/http"
"net/http/httptest"
"net/url"
"os"
"strings"
"testing"
"time"
@@ -635,58 +634,42 @@ func TestAppAuditEvents(t *testing.T) {
}

func TestAppServersHA(t *testing.T) {

type packInfo struct {
clusterName string
publicHTTPAddr string
publicWSAddr string
appServers []*service.TeleportProcess
}
testCases := map[string]struct {
packInfo func(pack *pack) (cluterName, publicAddr string, appServers []*service.TeleportProcess)
packInfo func(pack *pack) packInfo
startAppServers func(pack *pack, count int) []*service.TeleportProcess
makeRequest func(pack *pack, inCookie string) (status int, err error)
}{
"RootHTTPApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.rootAppClusterName, pack.rootAppPublicAddr, pack.rootAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startRootAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/")
return status, err
},
},
"RootWebSocketApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.rootAppClusterName, pack.rootWSPublicAddr, pack.rootAppServers
"RootServer": {
packInfo: func(pack *pack) packInfo {
return packInfo{
clusterName: pack.rootAppClusterName,
publicHTTPAddr: pack.rootAppPublicAddr,
publicWSAddr: pack.rootWSPublicAddr,
appServers: pack.rootAppServers,
}
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startRootAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
_, err := pack.makeWebsocketRequest(inCookie, "/")
return 0, err
},
},
"LeafHTTPApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.leafAppClusterName, pack.leafAppPublicAddr, pack.leafAppServers
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startLeafAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
status, _, err := pack.makeRequest(inCookie, http.MethodGet, "/")
return status, err
},
},
"LeafWebSocketApp": {
packInfo: func(pack *pack) (string, string, []*service.TeleportProcess) {
return pack.leafAppClusterName, pack.leafWSPublicAddr, pack.leafAppServers
"LeafServer": {
packInfo: func(pack *pack) packInfo {
return packInfo{
clusterName: pack.leafAppClusterName,
publicHTTPAddr: pack.leafAppPublicAddr,
publicWSAddr: pack.leafWSPublicAddr,
appServers: pack.leafAppServers,
}
},
startAppServers: func(pack *pack, count int) []*service.TeleportProcess {
return pack.startLeafAppServers(t, count, []service.App{})
},
makeRequest: func(pack *pack, inCookie string) (int, error) {
_, err := pack.makeWebsocketRequest(inCookie, "/")
return 0, err
},
},
}

@@ -711,46 +694,53 @@ func TestAppServersHA(t *testing.T) {
require.NoError(t, err)
}

makeRequests := func(t *testing.T, pack *pack, httpCookie, wsCookie string, responseAssertion func(*testing.T, int, error)) {
status, _, err := pack.makeRequest(httpCookie, http.MethodGet, "/")
responseAssertion(t, status, err)

_, err = pack.makeWebsocketRequest(wsCookie, "/")
responseAssertion(t, 0, err)
}

pack := setupWithOptions(t, appTestOptions{rootAppServersCount: 3})

for name, test := range testCases {
name, test := name, test
t.Run(name, func(t *testing.T) {
pack := setupWithOptions(t, appTestOptions{rootAppServersCount: 3})
clusterName, publicAddr, appServers := test.packInfo(pack)
t.Parallel()
info := test.packInfo(pack)
httpCookie := pack.createAppSession(t, info.publicHTTPAddr, info.clusterName)
wsCookie := pack.createAppSession(t, info.publicWSAddr, info.clusterName)

inCookie := pack.createAppSession(t, publicAddr, clusterName)
status, err := test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)
makeRequests(t, pack, httpCookie, wsCookie, responseWithoutError)

// Stop all root app servers.
for i, appServer := range appServers {
appServer.Close()
for i, appServer := range info.appServers {
require.NoError(t, appServer.Close())

// issue a request right after a server is gone.
status, err = test.makeRequest(pack, inCookie)
if i == len(appServers)-1 {
if i == len(info.appServers)-1 {
// fails only when the last one is closed.
responseWithError(t, status, err)
makeRequests(t, pack, httpCookie, wsCookie, responseWithError)
} else {
// otherwise the request should be handled by another
// server.
responseWithoutError(t, status, err)
makeRequests(t, pack, httpCookie, wsCookie, responseWithoutError)
}
}

servers := test.startAppServers(pack, 3)
status, err = test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)
makeRequests(t, pack, httpCookie, wsCookie, responseWithoutError)

// Start an additional app server and stop all current running
// ones.
test.startAppServers(pack, 1)
for _, appServer := range servers {
appServer.Close()
require.NoError(t, appServer.Close())

// Everytime a app server stops we issue a request to
// Everytime an app server stops we issue a request to
// guarantee that the requests are going to be resolved by
// the remaining app servers.
status, err = test.makeRequest(pack, inCookie)
responseWithoutError(t, status, err)
makeRequests(t, pack, httpCookie, wsCookie, responseWithoutError)
}
})
}
@@ -890,8 +880,9 @@ type appTestOptions struct {
rootAppServersCount int
leafAppServersCount int

rootConfig func(config *service.Config)
leafConfig func(config *service.Config)
rootConfig func(config *service.Config)
leafConfig func(config *service.Config)
skipSettingTimeouts bool
}

// setup configures all clusters and servers needed for a test.
@@ -910,7 +901,9 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
// self-signed certificate during tests.
lib.SetInsecureDevMode(true)

SetTestTimeouts(time.Millisecond * time.Duration(500))
if !opts.skipSettingTimeouts {
SetTestTimeouts(time.Millisecond * time.Duration(500))
}

p := &pack{
rootAppName: "app-01",
@@ -1067,7 +1060,6 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
rcConf.Console = nil
rcConf.Log = log
rcConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(rcConf.DataDir) })
rcConf.Auth.Enabled = true
rcConf.Auth.Preference.SetSecondFactor("off")
rcConf.Proxy.Enabled = true
@@ -1083,7 +1075,6 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {
lcConf.Console = nil
lcConf.Log = log
lcConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(lcConf.DataDir) })
lcConf.Auth.Enabled = true
lcConf.Auth.Preference.SetSecondFactor("off")
lcConf.Proxy.Enabled = true
@@ -1102,14 +1093,10 @@ func setupWithOptions(t *testing.T, opts appTestOptions) *pack {

err = p.leafCluster.Start()
require.NoError(t, err)
t.Cleanup(func() {
p.leafCluster.StopAll()
})
t.Cleanup(func() { require.NoError(t, p.leafCluster.StopAll()) })
err = p.rootCluster.Start()
require.NoError(t, err)
t.Cleanup(func() {
p.rootCluster.StopAll()
})
t.Cleanup(func() { require.NoError(t, p.rootCluster.StopAll()) })

// At least one rootAppServer should start during the setup
rootAppServersCount := 1
@@ -1514,14 +1501,13 @@ func (p *pack) waitForLogout(appCookie string) (int, error) {
func (p *pack) startRootAppServers(t *testing.T, count int, extraApps []service.App) []*service.TeleportProcess {
log := utils.NewLoggerForTests()

servers := make([]*service.TeleportProcess, count)
configs := make([]*service.Config, count)

for i := 0; i < count; i++ {
raConf := service.MakeDefaultConfig()
raConf.Console = nil
raConf.Log = log
raConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(raConf.DataDir) })
raConf.Token = "static-token-value"
raConf.AuthServers = []utils.NetAddr{
{
@@ -1566,26 +1552,32 @@ func (p *pack) startRootAppServers(t *testing.T, count int, extraApps []service.
},
}, extraApps...)

appServer, err := p.rootCluster.StartApp(raConf)
require.NoError(t, err)
t.Cleanup(func() { appServer.Close() })
configs[i] = raConf
}

servers[i] = appServer
servers, err := p.rootCluster.StartApps(configs)
require.NoError(t, err)
require.Equal(t, len(configs), len(servers))

for _, appServer := range servers {
srv := appServer
t.Cleanup(func() {
require.NoError(t, srv.Close())
})
}

return servers
}

func (p *pack) startLeafAppServers(t *testing.T, count int, extraApps []service.App) []*service.TeleportProcess {
log := utils.NewLoggerForTests()
servers := make([]*service.TeleportProcess, count)
configs := make([]*service.Config, count)

for i := 0; i < count; i++ {
laConf := service.MakeDefaultConfig()
laConf.Console = nil
laConf.Log = log
laConf.DataDir = t.TempDir()
t.Cleanup(func() { os.RemoveAll(laConf.DataDir) })
laConf.Token = "static-token-value"
laConf.AuthServers = []utils.NetAddr{
{
@@ -1615,11 +1607,18 @@ func (p *pack) startLeafAppServers(t *testing.T, count int, extraApps []service.
},
}, extraApps...)

appServer, err := p.leafCluster.StartApp(laConf)
require.NoError(t, err)
t.Cleanup(func() { appServer.Close() })
configs[i] = laConf
}

servers, err := p.leafCluster.StartApps(configs)
require.NoError(t, err)
require.Equal(t, len(configs), len(servers))

servers[i] = appServer
for _, appServer := range servers {
srv := appServer
t.Cleanup(func() {
require.NoError(t, srv.Close())
})
}

return servers
71 changes: 71 additions & 0 deletions integration/helpers.go
@@ -831,6 +831,77 @@ func (i *TeleInstance) StartApp(conf *service.Config) (*service.TeleportProcess,
return process, nil
}

func (i *TeleInstance) StartApps(configs []*service.Config) ([]*service.TeleportProcess, error) {
type result struct {
process *service.TeleportProcess
tmpDir string
err error
}

results := make(chan result, len(configs))
for _, conf := range configs {
go func(cfg *service.Config) {
dataDir, err := os.MkdirTemp("", "cluster-"+i.Secrets.SiteName)
Collaborator: Can this logic reuse StartApp above? Seems like most of the logic here is the same.

Contributor (author): I initially tried to launch StartApp in a goroutine for each app, but it turns out TeleInstance is racy. StartApp, StartNode, StartDatabase, etc. all update i.tempDirs and i.Nodes without using a mutex. I opted to add StartApps and handle updating the TeleInstance fields in a way that isn't racy, instead of adding a mutex to TeleInstance and finding all the possible races.

The downside is, as you mentioned, that StartApp and StartApps are nearly identical. If you think it would be better to remove StartApps and go down the mutex route, I'm happy to make the change.

if err != nil {
results <- result{err: err}
return
}

cfg.DataDir = dataDir
cfg.AuthServers = []utils.NetAddr{
{
AddrNetwork: "tcp",
Addr: net.JoinHostPort(Loopback, i.GetPortWeb()),
},
}
cfg.Token = "token"
cfg.UploadEventsC = i.UploadEventsC
cfg.Auth.Enabled = false
cfg.Proxy.Enabled = false

// Create a new Teleport process and add it to the list of nodes that
// compose this "cluster".
process, err := service.NewTeleport(cfg)
if err != nil {
results <- result{err: err, tmpDir: dataDir}
return
}

// Build a list of expected events to wait for before unblocking based off
// the configuration passed in.
expectedEvents := []string{
service.AppsReady,
}

// Start the process and block until the expected events have arrived.
receivedEvents, err := startAndWait(process, expectedEvents)
if err != nil {
results <- result{err: err, tmpDir: dataDir}
return
}

log.Debugf("Teleport Application Server (in instance %v) started: %v/%v events received.",
i.Secrets.SiteName, len(expectedEvents), len(receivedEvents))

results <- result{err: err, tmpDir: dataDir, process: process}
}(conf)
}

processes := make([]*service.TeleportProcess, 0, len(configs))
for j := 0; j < len(configs); j++ {
result := <-results
if result.tmpDir != "" {
i.tempDirs = append(i.tempDirs, result.tmpDir)
}

if result.err != nil {
return nil, trace.Wrap(result.err)
}

i.Nodes = append(i.Nodes, result.process)
processes = append(processes, result.process)
}

return processes, nil
}

// StartDatabase starts the database access service with the provided config.
func (i *TeleInstance) StartDatabase(conf *service.Config) (*service.TeleportProcess, *auth.Client, error) {
dataDir, err := os.MkdirTemp("", "cluster-"+i.Secrets.SiteName)
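
The review thread on StartApps above describes the design choice: each app server is started in its own goroutine, but the workers only report back over a channel, and the calling goroutine alone appends to the shared TeleInstance slices, so no mutex is needed. A minimal, self-contained sketch of that fan-out/collect pattern is below (hypothetical names; not code from this PR):

```go
package main

import (
	"fmt"
	"os"
)

// proc stands in for *service.TeleportProcess in this sketch.
type proc struct{ name string }

// result mirrors the per-goroutine result used by StartApps: the started
// process, the temp dir to track for cleanup, and any error.
type result struct {
	process *proc
	tmpDir  string
	err     error
}

// startAll launches one worker goroutine per name. Workers only write to
// the buffered channel; the caller alone mutates the returned slices,
// which is what keeps the aggregation race-free without a mutex.
func startAll(names []string) ([]*proc, []string, error) {
	results := make(chan result, len(names)) // buffered: a send never blocks

	for _, n := range names {
		go func(name string) {
			dir, mkErr := os.MkdirTemp("", "sketch-"+name)
			if mkErr != nil {
				results <- result{err: mkErr}
				return
			}
			// The real code starts a Teleport process here; the sketch
			// just fabricates one.
			results <- result{process: &proc{name: name}, tmpDir: dir}
		}(n)
	}

	var procs []*proc
	var tmpDirs []string
	// Collect exactly len(names) results in the calling goroutine.
	for range names {
		r := <-results
		if r.tmpDir != "" {
			tmpDirs = append(tmpDirs, r.tmpDir) // single writer: no data race
		}
		if r.err != nil {
			return nil, tmpDirs, r.err
		}
		procs = append(procs, r.process)
	}
	return procs, tmpDirs, nil
}

func main() {
	procs, dirs, err := startAll([]string{"app-01", "app-02", "app-03"})
	fmt.Println(len(procs), len(dirs), err)
}
```

Sizing the buffered channel to the number of workers matters here: each worker sends exactly once, so even if the collector returns early on an error, no worker is left blocked on its send.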