Skip to content

Commit

Permalink
Attempt to fix integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
michel-laterman committed Apr 3, 2024
1 parent e46ab6c commit 6c615be
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 16 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Use policy outputs when running in agent-mode

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Fleet-server will retrieve and use the output from the policy when running in agent-mode.
This allows the fleet-server to connect to multiple Elasticsearch hosts if it is successful when
connecting to the host provided at enrolment/installation.
We expect that the host provided during enrollment/installation is never removed as a valid output.
fleet-server does not persist output settings it retrieves locally so it must always be able to connect
with options specified at enrollment/installation.
# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3411

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/2784
37 changes: 24 additions & 13 deletions internal/pkg/server/fleet.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ import (
"time"

"github.com/elastic/elastic-agent-client/v7/pkg/client"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/state"

"github.com/elastic/go-ucfg"
"go.elastic.co/apm/v2"
apmtransport "go.elastic.co/apm/v2/transport"

Expand All @@ -31,10 +29,12 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/gc"
"github.com/elastic/fleet-server/v7/internal/pkg/logger"
"github.com/elastic/fleet-server/v7/internal/pkg/monitor"
"github.com/elastic/fleet-server/v7/internal/pkg/policy"
"github.com/elastic/fleet-server/v7/internal/pkg/profile"
"github.com/elastic/fleet-server/v7/internal/pkg/scheduler"
"github.com/elastic/fleet-server/v7/internal/pkg/state"
"github.com/elastic/fleet-server/v7/internal/pkg/ver"

"github.com/hashicorp/go-version"
Expand Down Expand Up @@ -196,10 +196,9 @@ LOOP:
select {
case cfg := <-f.cfgCh:
log.Info().Msg("Server configuration update")
if cfg.Inputs == nil {
// cfg only contains updated output retrieved from policy
if cfg.Inputs == nil && cfg.RevisionIdx != 0 { // cfg only contains updated output retrieved from policy
rev := cfg.RevisionIdx
esOutput := config.MergeElasticsearchFromPolicy(curCfg.Output.Elasticsearch, cfg.Output.Elasticsearch)
newCfg.RevisionIdx = cfg.RevisionIdx

// test config
cli, err := es.NewClient(ctx,
Expand All @@ -212,25 +211,37 @@ LOOP:
elasticsearchOptions(curCfg.Inputs[0].Server.Instrumentation.Enabled, f.bi)...,
)
if err != nil {
log.Warn().Err(err).Msg("unable to create elasticsearch client from policy output")
log.Warn().Int64(logger.RevisionIdx, rev).Err(err).Msg("unable to create elasticsearch client from policy output")
continue
}
remoteVersion, err := ver.CheckCompatibility(ctx, cli, f.bi.Version)
if err != nil {
// NOTE The error can indicate a bad network connection, bad TLS settings, etc.
// But if the error is an ErrElasticVersionConflict then something is very wrong
if errors.Is(err, es.ErrElasticVersionConflict) {
log.Error().Err(err).Interface("output", esOutput).Interface("bootstrap", curCfg.Output.Elasticsearch).Str("remote_version", remoteVersion).Msg("Elasticsearch version constraint failed for new output")
log.Error().Err(err).Int64(logger.RevisionIdx, rev).Interface("output", esOutput).Interface("bootstrap", curCfg.Output.Elasticsearch).Str("remote_version", remoteVersion).Msg("Elasticsearch version constraint failed for new output")
} else {
log.Warn().Err(err).Msg("Failed version compatibility check using output from policy")
log.Warn().Err(err).Int64(logger.RevisionIdx, rev).Msg("Failed version compatibility check using output from policy")
}
continue
}
log.Info().Int64(logger.RevisionIdx, cfg.RevisionIdx).Msg("Using output from policy")
newCfg.Output.Elasticsearch = esOutput
} else {
newCfg = cfg
// work around to get a new cfg object based off curCfg
// we override the output with esOutput and have a complete config with a new mutex
tmp, err := ucfg.NewFrom(curCfg, config.DefaultOptions...)
if err != nil {
log.Error().Err(err).Int64(logger.RevisionIdx, rev).Msg("Unable to convert config")
continue
}
err = tmp.Unpack(cfg, config.DefaultOptions...)
if err != nil {
log.Error().Err(err).Int64(logger.RevisionIdx, rev).Msg("Unable to unpack config")
continue
}
log.Info().Int64(logger.RevisionIdx, rev).Msg("Using output from policy")
cfg.Output.Elasticsearch = esOutput
cfg.RevisionIdx = rev
}
newCfg = cfg
case err := <-ech:
f.reporter.UpdateState(client.UnitStateFailed, fmt.Sprintf("Error - %s", err), nil) //nolint:errcheck // unclear on what should we do if updating the status fails?
log.Error().Err(err).Msg("Fleet Server failed")
Expand Down
8 changes: 5 additions & 3 deletions internal/pkg/server/fleet_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ func TestServerConfigErrorReload(t *testing.T) {
cancel()
}).Return(nil)
mReporter.On("UpdateState", client.UnitStateStopping, mock.Anything, mock.Anything).Return(nil)
mReporter.On("UpdateState", client.UnitStateFailed, mock.MatchedBy(func(err error) bool { return errors.Is(err, context.Canceled) }), mock.Anything).Return(nil).Maybe()

// set bad config
cfg.Output.Elasticsearch.ServiceToken = "incorrect"
Expand Down Expand Up @@ -459,16 +460,17 @@ func TestServerReloadOutputOnly(t *testing.T) {
},
RevisionIdx: 3,
}

successes := successfulOutputMsg.Load()
err = srv.srv.Reload(ctx, &cfg)
require.NoError(t, err)

for i := 0; i < 5; i++ {
if successfulOutputMsg.Load() > 0 {
if successfulOutputMsg.Load() > successes {
break
}
time.Sleep(time.Second)
}
require.NotZero(t, successfulOutputMsg.Load(), "Did not detect elasticsearch output client success")
require.Greater(t, successfulOutputMsg.Load(), successes, "Did not detect elasticsearch output client success")

cancel()
srv.waitExit() //nolint:errcheck // test case
Expand Down

0 comments on commit 6c615be

Please sign in to comment.