Skip to content

Commit

Permalink
Fix integration tests
Browse files Browse the repository at this point in the history
- switched to configuration layout mandated by v2
- added `mage test` target that will run both unit and integration
  tests
- switched CI runs from just running unit tests to running all tests
- Skipped integration tests that require docker to run logstash or
  kafka, support for doing that is finalized in magefile.
- updated example config file

Closes elastic#244
  • Loading branch information
leehinman committed Feb 27, 2023
1 parent fff35d3 commit bfa9709
Show file tree
Hide file tree
Showing 13 changed files with 407 additions and 294 deletions.
2 changes: 1 addition & 1 deletion .ci/Jenkinsfile
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ pipeline {
withGithubNotify(context: "Test-${PLATFORM}") {
dir("${BASE_DIR}"){
withMageEnv(){
cmd(label: 'Go unitTest', script: 'mage unitTest')
cmd(label: 'Go unitTest', script: 'mage test')
}
}
}
Expand Down
22 changes: 5 additions & 17 deletions controller/controller_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func TestRemoveOutput(t *testing.T) {
for _, unit := range observed.Units {
t.Logf("current state for %s is: %s", unit.Id, unit.Message)
}

}
}
return nil
Expand All @@ -100,7 +99,6 @@ func TestRemoveOutput(t *testing.T) {
assert.True(t, restartedOutput, "gRPC input has stopped")
assert.True(t, outputHealthy, "input restarted")
assert.True(t, testStopped, "test has shut down")

}

func TestStopAllInputs(t *testing.T) {
Expand Down Expand Up @@ -190,7 +188,6 @@ func TestStopAllInputs(t *testing.T) {
for _, unit := range observed.Units {
t.Logf("current state for %s is: %s", unit.Id, unit.Message)
}

}
}
return nil
Expand All @@ -203,7 +200,6 @@ func TestStopAllInputs(t *testing.T) {
assert.True(t, newInput, "input restarted")
assert.True(t, testStopped, "test has shut down")
assert.True(t, inputRemoved, "input unit removed")

}

func TestChangeOutputType(t *testing.T) {
Expand All @@ -219,7 +215,8 @@ func TestChangeOutputType(t *testing.T) {
"type": "elasticsearch",
"enabled": "true",
"hosts": []interface{}{"localhost:9200"},
})}
}),
}

doneWaiter := &sync.WaitGroup{}
srvFunc := func(observed *proto.CheckinObserved) *proto.CheckinExpected {
Expand Down Expand Up @@ -284,7 +281,6 @@ func TestChangeOutputType(t *testing.T) {
for _, unit := range observed.Units {
t.Logf("current state for %s is: %s", unit.Id, unit.Message)
}

}
}
return nil
Expand Down Expand Up @@ -393,7 +389,6 @@ func TestAddingInputs(t *testing.T) {
for _, unit := range observed.Units {
t.Logf("current state for %s is: %s", unit.Id, unit.Message)
}

}
}
return nil
Expand All @@ -407,7 +402,6 @@ func TestAddingInputs(t *testing.T) {
assert.True(t, stoppedFirst, "stopped first unit")
assert.True(t, gotStopped, "units stopped")
assert.False(t, gotRestarted, "units restarted, they should not")

}

func TestBasicAgentControl(t *testing.T) {
Expand Down Expand Up @@ -446,7 +440,7 @@ func TestBasicAgentControl(t *testing.T) {
} else if unitsAreState(t, proto.State_HEALTHY, observed.Units) {
t.Logf("Got unit state healthy, sending STOPPED")
gotHealthy = true
//shutdown
// shutdown
unitIn.ConfigStateIdx++
unitIn.State = proto.State_STOPPED

Expand All @@ -469,7 +463,6 @@ func TestBasicAgentControl(t *testing.T) {
for _, unit := range observed.Units {
t.Logf("current state for %s is: %s", unit.Id, unit.Message)
}

}
}
return nil
Expand All @@ -480,7 +473,6 @@ func TestBasicAgentControl(t *testing.T) {
assert.True(t, gotConfig, "config state")
assert.True(t, gotHealthy, "healthy state")
assert.True(t, gotStopped, "stopped state")

}

func TestUnitLogChange(t *testing.T) {
Expand Down Expand Up @@ -522,7 +514,7 @@ func TestUnitLogChange(t *testing.T) {
} else if unitsAreState(t, proto.State_HEALTHY, observed.Units) && !gotHealthy {
t.Logf("Got unit state healthy, changing log level")
gotHealthy = true
//shutdown
// shutdown
unitOut.ConfigStateIdx++
unitOut.LogLevel = proto.UnitLogLevel_DEBUG
return &proto.CheckinExpected{
Expand All @@ -540,7 +532,7 @@ func TestUnitLogChange(t *testing.T) {
} else if unitsAreState(t, proto.State_HEALTHY, observed.Units) && gotHealthy {
// shut down
t.Logf("Got unit state healthy, stopping")
//check to see if log level has changed
// check to see if log level has changed
observedLogLevel = logp.GetLevel()
unitOut.ConfigStateIdx++
unitOut.State = proto.State_STOPPED
Expand All @@ -560,7 +552,6 @@ func TestUnitLogChange(t *testing.T) {
for _, unit := range observed.Units {
t.Logf("current state for %s is: %s", unit.Id, unit.Message)
}

}
}
return nil
Expand All @@ -573,7 +564,6 @@ func TestUnitLogChange(t *testing.T) {
assert.True(t, gotStopped, "stopped state")
assert.False(t, gotRestarted, "units restarted")
assert.Equal(t, zapcore.DebugLevel, observedLogLevel, "not expected log level")

}

func runServerTest(t *testing.T, implFunc mock.StubServerCheckinV2, waitUntil *sync.WaitGroup, token string) {
Expand All @@ -582,7 +572,6 @@ func runServerTest(t *testing.T, implFunc mock.StubServerCheckinV2, waitUntil *s
srv := mock.StubServerV2{
CheckinV2Impl: implFunc,
ActionImpl: func(response *proto.ActionResponse) error {

return nil
},
ActionsChan: make(chan *mock.PerformAction, 100),
Expand All @@ -605,7 +594,6 @@ func runServerTest(t *testing.T, implFunc mock.StubServerCheckinV2, waitUntil *s

err := runController(ctx, validClient)
assert.NoError(t, err)

}

func createClient(token string, port int) client.V2 {
Expand Down
32 changes: 13 additions & 19 deletions controller/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ package controller

import (
"context"
"errors"
"io/fs"
"net"
"os"
"path/filepath"
"strings"
"testing"
"time"

Expand All @@ -21,16 +24,12 @@ import (

func TestUnmanaged(t *testing.T) {
_ = logp.DevelopmentSetup()
serverAddr := filepath.Join(os.TempDir(), "test-unmanaged-shipper.sock")
serverAddr := filepath.Join(t.TempDir(), "test.sock")
cfg := config.DefaultConfig()
cfg.Type = "console"
cfg.Shipper.Output.Console = &output.ConsoleConfig{Enabled: true}
cfg.Shipper.Server.Server = serverAddr

defer func() {
_ = os.Remove(serverAddr)
}()

ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

go func() {
Expand All @@ -45,21 +44,16 @@ func TestUnmanaged(t *testing.T) {
default:
}
_, err := os.Stat(serverAddr)
if !os.IsNotExist(err) {
break
if err != nil && errors.Is(err, fs.ErrNotExist) {
continue
}
con, err := net.Dial("unix", serverAddr)
if err != nil && strings.Contains(err.Error(), "connection refused") {
continue
}
require.NoError(t, err)
t.Cleanup(func() { _ = con.Close() })
break
}
// remove the file now that we know it's there
defer func() {
_ = os.Remove(serverAddr)
}()

// basic test, make sure output is running
con, err := net.Dial("unix", serverAddr)
require.NoError(t, err)
defer func() {
_ = con.Close()
}()

cancel()
}
Loading

0 comments on commit bfa9709

Please sign in to comment.