Skip to content

Commit

Permalink
agent-mode loads output from policy (#3411)
Browse files Browse the repository at this point in the history
Allow the self-monitor that fleet-server starts when in agent mode to send a config.Config struct through the server's config channel. This struct only has the Output and (new) RevisionIdx attributes set from values retrieved from the policy output.
When fleet receives new config it will handle the output only config as a special case and merge it with the previous
output config in order to get an up to date complete config.
When merging config, non-default values from the policy are preferred.
For an output block to be used, at least one host must be reachable.
  • Loading branch information
michel-laterman authored Apr 15, 2024
1 parent e6250bc commit fe7955b
Show file tree
Hide file tree
Showing 13 changed files with 714 additions and 69 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Use policy outputs when running in agent-mode

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
Fleet-server will retrieve and use the output from the policy when running in agent-mode.
This allows the fleet-server to connect to multiple Elasticsearch hosts if it is successful when
connecting to the host provided at enrollment/installation.
We expect that the host provided during enrollment/installation is never removed as a valid output.
fleet-server does not persist output settings it retrieves locally so it must always be able to connect
with options specified at enrollment/installation.
# Affected component; a word indicating the component this changeset affects.
component:

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: 3411

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/2784
13 changes: 7 additions & 6 deletions internal/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@ const kRedacted = "[redacted]"
// The env vars that `elastic-agent container` command uses are unrelated.
// The agent will do all substitutions before sending fleet-server the complete config.
type Config struct {
Fleet Fleet `config:"fleet"`
Output Output `config:"output"`
Inputs []Input `config:"inputs"`
Logging Logging `config:"logging"`
HTTP HTTP `config:"http"`
m sync.Mutex
Fleet Fleet `config:"fleet"`
Output Output `config:"output"`
Inputs []Input `config:"inputs"`
Logging Logging `config:"logging"`
HTTP HTTP `config:"http"`
RevisionIdx int64 `config:",ignore"`
m sync.Mutex
}

var deprecatedConfigOptions = map[string]string{
Expand Down
121 changes: 116 additions & 5 deletions internal/pkg/config/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package config

import (
"crypto/tls"
"fmt"
"net"
"net/http"
Expand All @@ -25,6 +26,14 @@ import (
const httpTransportLongPollTimeout = 10 * time.Minute
const schemeHTTP = "http"

const (
DefaultElasticsearchHost = "localhost:9200"
DefaultElasticsearchTimeout = 90 * time.Second
DefaultElasticsearchMaxRetries = 3
DefaultElasticsearchMaxConnPerHost = 128
DefaultElasticsearchMaxContentLength = 100 * 1024 * 1024
)

var hasScheme = regexp.MustCompile(`^([a-z][a-z0-9+\-.]*)://`)

// Output is the output configuration to elasticsearch.
Expand Down Expand Up @@ -54,11 +63,11 @@ type Elasticsearch struct {
// InitDefaults initializes the defaults for the configuration.
func (c *Elasticsearch) InitDefaults() {
c.Protocol = schemeHTTP
c.Hosts = []string{"localhost:9200"}
c.Timeout = 90 * time.Second
c.MaxRetries = 3
c.MaxConnPerHost = 128
c.MaxContentLength = 100 * 1024 * 1024
c.Hosts = []string{DefaultElasticsearchHost}
c.Timeout = DefaultElasticsearchTimeout
c.MaxRetries = DefaultElasticsearchMaxRetries
c.MaxConnPerHost = DefaultElasticsearchMaxConnPerHost
c.MaxContentLength = DefaultElasticsearchMaxContentLength
}

// Validate ensures that the configuration is valid.
Expand Down Expand Up @@ -173,6 +182,108 @@ func (c *Elasticsearch) ToESConfig(longPoll bool) (elasticsearch.Config, error)
}, nil
}

// MergeElasticsearchPolicy will merge elasticsearch settings retrieved from the fleet-server's policy into the base configuration and return the resulting config.
// ucfg.Merge and config.Config.Merge will both fail at merging configs because the verification mode is not detect as a string type value
func MergeElasticsearchFromPolicy(cfg, pol Elasticsearch) Elasticsearch {
res := Elasticsearch{
Protocol: cfg.Protocol,
Hosts: cfg.Hosts,
Headers: cfg.Headers,
ServiceToken: cfg.ServiceToken, // ServiceToken will always be specified from the settings and not in the policy.
ServiceTokenPath: cfg.ServiceTokenPath,
ProxyURL: cfg.ProxyURL,
ProxyDisable: cfg.ProxyDisable,
ProxyHeaders: cfg.ProxyHeaders,
TLS: mergeElasticsearchTLS(cfg.TLS, pol.TLS), // tls can be a special case
MaxRetries: cfg.MaxRetries,
MaxConnPerHost: cfg.MaxConnPerHost,
Timeout: cfg.Timeout,
MaxContentLength: cfg.MaxContentLength,
}
// If policy has a non-default Hosts value use it's values for Protocol and hosts
if pol.Hosts != nil && !(len(pol.Hosts) == 1 && pol.Hosts[0] == DefaultElasticsearchHost) {
res.Protocol = pol.Protocol
res.Hosts = pol.Hosts
}
if pol.Headers != nil {
res.Headers = pol.Headers
}
// If the policy ProxyURL is set, use all of the policy's Proxy values.
if pol.ProxyURL != "" {
res.ProxyURL = pol.ProxyURL
res.ProxyDisable = pol.ProxyDisable
res.ProxyHeaders = pol.ProxyHeaders
}
if pol.MaxRetries != DefaultElasticsearchMaxRetries {
res.MaxRetries = pol.MaxRetries
}
if pol.MaxConnPerHost != DefaultElasticsearchMaxConnPerHost {
res.MaxConnPerHost = pol.MaxConnPerHost
}
if pol.Timeout != DefaultElasticsearchTimeout {
res.Timeout = pol.Timeout
}
if pol.MaxContentLength != DefaultElasticsearchMaxContentLength {
res.MaxContentLength = pol.MaxContentLength
}
return res
}

// mergeElasticsearchTLS merges the TLS settings received from the fleet-server's policy into the settings the agent passes
func mergeElasticsearchTLS(cfg, pol *tlscommon.Config) *tlscommon.Config {
if cfg == nil && pol == nil {
return nil
} else if cfg == nil && pol != nil {
return pol
} else if cfg != nil && pol == nil {
return cfg
}
res := &tlscommon.Config{
Enabled: cfg.Enabled,
VerificationMode: cfg.VerificationMode,
Versions: cfg.Versions,
CipherSuites: cfg.CipherSuites,
CAs: cfg.CAs,
Certificate: cfg.Certificate,
CurveTypes: cfg.CurveTypes,
Renegotiation: cfg.Renegotiation,
CASha256: cfg.CASha256,
CATrustedFingerprint: cfg.CATrustedFingerprint,
}
if pol.Enabled != nil {
res.Enabled = pol.Enabled
}
if pol.VerificationMode != tlscommon.VerifyFull {
res.VerificationMode = pol.VerificationMode // VerificationMode defaults to VerifyFull
}
if pol.Versions != nil {
res.Versions = pol.Versions
}
if pol.CipherSuites != nil {
res.CipherSuites = pol.CipherSuites
}
if pol.CAs != nil {
res.CAs = pol.CAs
}
if pol.Certificate.Certificate != "" {
res.Certificate = pol.Certificate
}
if pol.CurveTypes != nil {
res.CurveTypes = pol.CurveTypes
}
if pol.Renegotiation != tlscommon.TLSRenegotiationSupport(tls.RenegotiateNever) {
res.Renegotiation = pol.Renegotiation
}
if pol.CASha256 != nil {
res.CASha256 = pol.CASha256
}
if pol.CATrustedFingerprint != "" {
res.CATrustedFingerprint = pol.CATrustedFingerprint
}

return res
}

// Validate validates that only elasticsearch is defined on the output.
func (c *Output) Validate() error {
if c.Extra == nil {
Expand Down
167 changes: 167 additions & 0 deletions internal/pkg/config/output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,170 @@ func setTestEnv(t *testing.T, env map[string]string) {
t.Setenv(k, v)
}
}

func TestMergeElasticsearchFromPolicy(t *testing.T) {
cfg := Elasticsearch{
Protocol: "http",
Hosts: []string{"elasticsearch:9200"},
ServiceToken: "token",
Timeout: time.Second,
MaxRetries: 1,
MaxConnPerHost: 1,
MaxContentLength: 1,
}
tests := []struct {
name string
pol Elasticsearch
res Elasticsearch
}{{
name: "default policy",
pol: Elasticsearch{
Hosts: []string{"localhost:9200"},
Timeout: DefaultElasticsearchTimeout,
MaxRetries: DefaultElasticsearchMaxRetries,
MaxConnPerHost: DefaultElasticsearchMaxConnPerHost,
MaxContentLength: DefaultElasticsearchMaxContentLength,
},
res: Elasticsearch{
Protocol: "http",
Hosts: []string{"elasticsearch:9200"},
ServiceToken: "token",
Timeout: time.Second,
MaxRetries: 1,
MaxConnPerHost: 1,
MaxContentLength: 1,
},
}, {
name: "hosts differ",
pol: Elasticsearch{
Protocol: "https",
Hosts: []string{"elasticsearch:9200", "other:9200"},
Timeout: DefaultElasticsearchTimeout,
MaxRetries: DefaultElasticsearchMaxRetries,
MaxConnPerHost: DefaultElasticsearchMaxConnPerHost,
MaxContentLength: DefaultElasticsearchMaxContentLength,
},
res: Elasticsearch{
Protocol: "https",
Hosts: []string{"elasticsearch:9200", "other:9200"},
ServiceToken: "token",
Timeout: time.Second,
MaxRetries: 1,
MaxConnPerHost: 1,
MaxContentLength: 1,
},
}, {
name: "all non tls attributes differ",
pol: Elasticsearch{
Protocol: "https",
Hosts: []string{"elasticsearch:9200", "other:9200"},
Headers: map[string]string{"custom": "value"},
ProxyURL: "http://proxy:8080",
ProxyDisable: false,
ProxyHeaders: map[string]string{"proxyhead": "proxyval"},
Timeout: time.Second * 2,
MaxRetries: 2,
MaxConnPerHost: 3,
MaxContentLength: 4,
},
res: Elasticsearch{
Protocol: "https",
Hosts: []string{"elasticsearch:9200", "other:9200"},
Headers: map[string]string{"custom": "value"},
ProxyURL: "http://proxy:8080",
ProxyDisable: false,
ProxyHeaders: map[string]string{"proxyhead": "proxyval"},
ServiceToken: "token",
Timeout: 2 * time.Second,
MaxRetries: 2,
MaxConnPerHost: 3,
MaxContentLength: 4,
},
}}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
res := MergeElasticsearchFromPolicy(cfg, tc.pol)
assert.Equal(t, tc.res.Protocol, res.Protocol)
require.Len(t, res.Hosts, len(tc.res.Hosts))
for i, host := range tc.res.Hosts {
assert.Equalf(t, host, res.Hosts[i], "host %d does not match", i)
}
require.Len(t, res.Headers, len(tc.res.Headers))
for k, v := range tc.res.Headers {
assert.Equal(t, v, res.Headers[k])
}
assert.Equal(t, tc.res.ServiceToken, res.ServiceToken)
assert.Equal(t, tc.res.ServiceTokenPath, res.ServiceTokenPath)
assert.Equal(t, tc.res.ProxyURL, res.ProxyURL)
assert.Equal(t, tc.res.ProxyDisable, res.ProxyDisable)
require.Len(t, res.ProxyHeaders, len(tc.res.ProxyHeaders))
for k, v := range tc.res.ProxyHeaders {
assert.Equal(t, v, res.ProxyHeaders[k])
}
assert.Nil(t, res.TLS)
assert.Equal(t, tc.res.MaxRetries, res.MaxRetries)
assert.Equal(t, tc.res.MaxConnPerHost, res.MaxConnPerHost)
assert.Equal(t, tc.res.Timeout, res.Timeout)
assert.Equal(t, tc.res.MaxContentLength, res.MaxContentLength)
})
}
}

func TestMergeElasticsearchTLS(t *testing.T) {
enabled := true
disabled := false
t.Run("both nil", func(t *testing.T) {
res := mergeElasticsearchTLS(nil, nil)
assert.Nil(t, res)
})
t.Run("cfg not nil", func(t *testing.T) {
res := mergeElasticsearchTLS(&tlscommon.Config{
Enabled: &enabled,
VerificationMode: tlscommon.VerifyFull,
}, nil)
require.NotNil(t, res)
assert.True(t, *res.Enabled)
assert.Equal(t, tlscommon.VerifyFull, res.VerificationMode)
})
t.Run("pol not nil", func(t *testing.T) {
res := mergeElasticsearchTLS(nil, &tlscommon.Config{
Enabled: &enabled,
VerificationMode: tlscommon.VerifyFull,
})
require.NotNil(t, res)
assert.True(t, *res.Enabled)
assert.Equal(t, tlscommon.VerifyFull, res.VerificationMode)
})
t.Run("both not nil", func(t *testing.T) {
res := mergeElasticsearchTLS(&tlscommon.Config{
Enabled: &disabled,
VerificationMode: tlscommon.VerifyFull,
}, &tlscommon.Config{
Enabled: &enabled,
VerificationMode: tlscommon.VerifyCertificate,
Versions: []tlscommon.TLSVersion{tlscommon.TLSVersion13},
CipherSuites: []tlscommon.CipherSuite{tlscommon.CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA)},
CAs: []string{"/path/to/ca.crt"},
Certificate: tlscommon.CertificateConfig{
Certificate: "/path/to/cert.crt",
Key: "/path/to/key.crt",
},
CASha256: []string{"casha256val"},
CATrustedFingerprint: "fingerprint",
})
require.NotNil(t, res)
assert.True(t, *res.Enabled)
assert.Equal(t, tlscommon.VerifyCertificate, res.VerificationMode)
require.Len(t, res.Versions, 1)
assert.Equal(t, tlscommon.TLSVersion13, res.Versions[0])
require.Len(t, res.CipherSuites, 1)
assert.Equal(t, tlscommon.CipherSuite(tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA), res.CipherSuites[0])
require.Len(t, res.CAs, 1)
assert.Equal(t, "/path/to/ca.crt", res.CAs[0])
assert.Equal(t, "/path/to/cert.crt", res.Certificate.Certificate)
assert.Equal(t, "/path/to/key.crt", res.Certificate.Key)
require.Len(t, res.CASha256, 1)
assert.Equal(t, "casha256val", res.CASha256[0])
assert.Equal(t, "fingerprint", res.CATrustedFingerprint)
})
}
Loading

0 comments on commit fe7955b

Please sign in to comment.