Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Add support for multiple hosts in connection to kibana #19628

Merged
merged 3 commits into from
Jul 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@
- Rename input.type logs to logfile {pull}19360[19360]
- Agent now installs/uninstalls Elastic Endpoint {pull}19248[19248]
- Agent now downloads Elastic Endpoint {pull}19503[19503]
- Agent now load balances across multiple Kibana instances {pull}19628[19628]
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/_meta/elastic-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/elastic-agent.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/elastic-agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/elastic-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
184 changes: 123 additions & 61 deletions x-pack/elastic-agent/pkg/kibana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -24,20 +25,19 @@ import (
const (
kibanaPort = 5601
kibanaHTTPSPort = 443

kibanaRetryOnBadConnTimeout = 5 * time.Minute
)

type requestFunc func(string, string, url.Values, io.Reader) (*http.Request, error)
type wrapperFunc func(rt http.RoundTripper) (http.RoundTripper, error)

type clienter interface {
Send(
method string,
path string,
params url.Values,
headers http.Header,
body io.Reader,
) (*http.Response, error)
Close() error
type requestClient struct {
request requestFunc
client http.Client
lastUsed time.Time
lastErr error
lastErrOcc time.Time
}

// Client wraps an http.Client and takes care of making the raw calls to kibana, the client should
Expand All @@ -46,27 +46,11 @@ type clienter interface {
// implementations that will take care of the boiler plates.
type Client struct {
log *logger.Logger
request requestFunc
client http.Client
lock sync.Mutex
clients []*requestClient
config *Config
}

// New creates new Kibana API client.
func New(
log *logger.Logger,
factory requestFunc,
cfg *Config,
httpClient http.Client,
) (*Client, error) {
c := &Client{
log: log,
request: factory,
client: httpClient,
config: cfg,
}
return c, nil
}

// NewConfigFromURL returns a Kibana Config based on a received host.
func NewConfigFromURL(kURL string) (*Config, error) {
u, err := url.Parse(kURL)
Expand Down Expand Up @@ -112,29 +96,6 @@ func NewWithRawConfig(log *logger.Logger, config *config.Config, wrapper wrapper

// NewWithConfig takes a Kibana Config and return a client.
func NewWithConfig(log *logger.Logger, cfg *Config, wrapper wrapperFunc) (*Client, error) {
var transport http.RoundTripper
transport, err := makeTransport(cfg.Timeout, cfg.TLS)
if err != nil {
return nil, err
}

if cfg.IsBasicAuth() {
// Pass basic auth credentials to all the underlying calls.
transport = NewBasicAuthRoundTripper(transport, cfg.Username, cfg.Password)
}

if wrapper != nil {
transport, err = wrapper(transport)
if err != nil {
return nil, errors.Wrap(err, "fail to create transport client")
}
}

httpClient := http.Client{
Transport: transport,
Timeout: cfg.Timeout,
}

// Normalize the URL with the path any spaces configured.
var p string
if len(cfg.SpaceID) > 0 {
Expand All @@ -152,22 +113,53 @@ func NewWithConfig(log *logger.Logger, cfg *Config, wrapper wrapperFunc) (*Clien
usedDefaultPort = kibanaHTTPSPort
}

kibanaURL, err := common.MakeURL(string(cfg.Protocol), p, cfg.Host, usedDefaultPort)
if err != nil {
return nil, errors.Wrap(err, "invalid Kibana endpoint")
hosts := cfg.GetHosts()
clients := make([]*requestClient, len(hosts))
for i, host := range cfg.GetHosts() {
var transport http.RoundTripper
transport, err := makeTransport(cfg.Timeout, cfg.TLS)
if err != nil {
return nil, err
}

if cfg.IsBasicAuth() {
// Pass basic auth credentials to all the underlying calls.
transport = NewBasicAuthRoundTripper(transport, cfg.Username, cfg.Password)
}

if wrapper != nil {
transport, err = wrapper(transport)
if err != nil {
return nil, errors.Wrap(err, "fail to create transport client")
}
}

httpClient := http.Client{
Transport: transport,
Timeout: cfg.Timeout,
}

kibanaURL, err := common.MakeURL(string(cfg.Protocol), p, host, usedDefaultPort)
if err != nil {
return nil, errors.Wrap(err, "invalid Kibana endpoint")
}
clients[i] = &requestClient{
request: prefixRequestFactory(kibanaURL),
client: httpClient,
}
}

return New(log, prefixRequestFactory(kibanaURL), cfg, httpClient)
return new(log, cfg, clients...)
}

// Send executes a direct calls agains't the Kibana API, the method will takes cares of cloning
// Send executes a direct calls against the Kibana API, the method will takes cares of cloning
// also add necessary headers for Kibana likes: "Content-Type", "Accept", and "kbn-xsrf".
// No assumptions is done on the response concerning the received format, this will be the responsability
// No assumptions is done on the response concerning the received format, this will be the responsibility
// of the implementation to correctly unpack any received data.
//
// NOTE:
// - The caller of this method is free to overrides any values found in the headers.
// - The magic of unpack kibana errors is not done in the Send method, an helper methods is provided.
// - The caller of this method is free to override any value found in the headers.
// - The magic of unpack kibana errors is not done in the Send method, a helper method is provided.
func (c *Client) Send(
ctx context.Context,
method, path string,
Expand All @@ -176,8 +168,11 @@ func (c *Client) Send(
body io.Reader,
) (*http.Response, error) {
c.log.Debugf("Request method: %s, path: %s", method, path)
c.lock.Lock()
defer c.lock.Unlock()
requester := c.nextRequester()

req, err := c.request(method, path, params, body)
req, err := requester.request(method, path, params, body)
if err != nil {
return nil, errors.Wrapf(err, "fail to create HTTP request using method %s to %s", method, path)
}
Expand All @@ -195,12 +190,79 @@ func (c *Client) Send(
}
}

return c.client.Do(req.WithContext(ctx))
requester.lastUsed = time.Now().UTC()
resp, err := requester.client.Do(req.WithContext(ctx))
if err != nil {
requester.lastErr = err
requester.lastErrOcc = time.Now().UTC()
} else {
requester.lastErr = nil
requester.lastErrOcc = time.Time{}
}
return resp, err
}

// URI returns the remote URI.
func (c *Client) URI() string {
return string(c.config.Protocol) + "://" + c.config.Host + "/" + c.config.Path
host := c.config.GetHosts()[0]
return string(c.config.Protocol) + "://" + host + "/" + c.config.Path
}

// new creates new Kibana API client.
func new(
log *logger.Logger,
cfg *Config,
httpClients ...*requestClient,
) (*Client, error) {
c := &Client{
log: log,
clients: httpClients,
config: cfg,
}
return c, nil
}

// nextRequester returns the requester to use.
//
// It excludes clients that have errored in the last 5 minutes.
func (c *Client) nextRequester() *requestClient {
var selected *requestClient

now := time.Now().UTC()
for _, requester := range c.clients {
if requester.lastErr != nil && now.Sub(requester.lastErrOcc) > kibanaRetryOnBadConnTimeout {
requester.lastErr = nil
requester.lastErrOcc = time.Time{}
}
if requester.lastErr != nil {
continue
}
if requester.lastUsed.IsZero() {
// never been used, instant winner!
selected = requester
break
}
if selected == nil {
selected = requester
continue
}
if requester.lastUsed.Before(selected.lastUsed) {
selected = requester
}
}
if selected == nil {
// all are erroring; select the oldest one that errored
for _, requester := range c.clients {
if selected == nil {
selected = requester
continue
}
if requester.lastErrOcc.Before(selected.lastErrOcc) {
selected = requester
}
}
}
return selected
}

func prefixRequestFactory(URL string) requestFunc {
Expand Down
Loading