Skip to content

Commit

Permalink
[Elastic Agent] Add support for multiple hosts in connection to kibana (
Browse files Browse the repository at this point in the history
#19628)

* Add ability for multiple hosts to be defined for agents connection to kibana.

* Fix reference.

* Add changelog.
  • Loading branch information
blakerouse authored Jul 9, 2020
1 parent fb53a8a commit 57c4280
Show file tree
Hide file tree
Showing 11 changed files with 228 additions and 69 deletions.
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@
- Rename input.type logs to logfile {pull}19360[19360]
- Agent now installs/uninstalls Elastic Endpoint {pull}19248[19248]
- Agent now downloads Elastic Endpoint {pull}19503[19503]
- Agent now load balances across multiple Kibana instances {pull}19628[19628]
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/_meta/config/common.p2.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/_meta/elastic-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/elastic-agent.docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/elastic-agent.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
2 changes: 1 addition & 1 deletion x-pack/elastic-agent/elastic-agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ inputs:
# access_token: ""
# kibana:
# # kibana minimal configuration
# host: "localhost:5601"
# hosts: ["localhost:5601"]
# ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

# # optional values
Expand Down
184 changes: 123 additions & 61 deletions x-pack/elastic-agent/pkg/kibana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strings"
"sync"
"time"

"github.com/pkg/errors"
Expand All @@ -24,20 +25,19 @@ import (
const (
kibanaPort = 5601
kibanaHTTPSPort = 443

kibanaRetryOnBadConnTimeout = 5 * time.Minute
)

type requestFunc func(string, string, url.Values, io.Reader) (*http.Request, error)
type wrapperFunc func(rt http.RoundTripper) (http.RoundTripper, error)

type clienter interface {
Send(
method string,
path string,
params url.Values,
headers http.Header,
body io.Reader,
) (*http.Response, error)
Close() error
type requestClient struct {
request requestFunc
client http.Client
lastUsed time.Time
lastErr error
lastErrOcc time.Time
}

// Client wraps an http.Client and takes care of making the raw calls to kibana, the client should
Expand All @@ -46,27 +46,11 @@ type clienter interface {
// implementations that will take care of the boiler plates.
type Client struct {
log *logger.Logger
request requestFunc
client http.Client
lock sync.Mutex
clients []*requestClient
config *Config
}

// New creates new Kibana API client.
func New(
log *logger.Logger,
factory requestFunc,
cfg *Config,
httpClient http.Client,
) (*Client, error) {
c := &Client{
log: log,
request: factory,
client: httpClient,
config: cfg,
}
return c, nil
}

// NewConfigFromURL returns a Kibana Config based on a received host.
func NewConfigFromURL(kURL string) (*Config, error) {
u, err := url.Parse(kURL)
Expand Down Expand Up @@ -112,29 +96,6 @@ func NewWithRawConfig(log *logger.Logger, config *config.Config, wrapper wrapper

// NewWithConfig takes a Kibana Config and return a client.
func NewWithConfig(log *logger.Logger, cfg *Config, wrapper wrapperFunc) (*Client, error) {
var transport http.RoundTripper
transport, err := makeTransport(cfg.Timeout, cfg.TLS)
if err != nil {
return nil, err
}

if cfg.IsBasicAuth() {
// Pass basic auth credentials to all the underlying calls.
transport = NewBasicAuthRoundTripper(transport, cfg.Username, cfg.Password)
}

if wrapper != nil {
transport, err = wrapper(transport)
if err != nil {
return nil, errors.Wrap(err, "fail to create transport client")
}
}

httpClient := http.Client{
Transport: transport,
Timeout: cfg.Timeout,
}

// Normalize the URL with the path any spaces configured.
var p string
if len(cfg.SpaceID) > 0 {
Expand All @@ -152,22 +113,53 @@ func NewWithConfig(log *logger.Logger, cfg *Config, wrapper wrapperFunc) (*Clien
usedDefaultPort = kibanaHTTPSPort
}

kibanaURL, err := common.MakeURL(string(cfg.Protocol), p, cfg.Host, usedDefaultPort)
if err != nil {
return nil, errors.Wrap(err, "invalid Kibana endpoint")
hosts := cfg.GetHosts()
clients := make([]*requestClient, len(hosts))
for i, host := range cfg.GetHosts() {
var transport http.RoundTripper
transport, err := makeTransport(cfg.Timeout, cfg.TLS)
if err != nil {
return nil, err
}

if cfg.IsBasicAuth() {
// Pass basic auth credentials to all the underlying calls.
transport = NewBasicAuthRoundTripper(transport, cfg.Username, cfg.Password)
}

if wrapper != nil {
transport, err = wrapper(transport)
if err != nil {
return nil, errors.Wrap(err, "fail to create transport client")
}
}

httpClient := http.Client{
Transport: transport,
Timeout: cfg.Timeout,
}

kibanaURL, err := common.MakeURL(string(cfg.Protocol), p, host, usedDefaultPort)
if err != nil {
return nil, errors.Wrap(err, "invalid Kibana endpoint")
}
clients[i] = &requestClient{
request: prefixRequestFactory(kibanaURL),
client: httpClient,
}
}

return New(log, prefixRequestFactory(kibanaURL), cfg, httpClient)
return new(log, cfg, clients...)
}

// Send executes a direct calls agains't the Kibana API, the method will takes cares of cloning
// Send executes a direct calls against the Kibana API, the method will takes cares of cloning
// also add necessary headers for Kibana likes: "Content-Type", "Accept", and "kbn-xsrf".
// No assumptions is done on the response concerning the received format, this will be the responsability
// No assumptions is done on the response concerning the received format, this will be the responsibility
// of the implementation to correctly unpack any received data.
//
// NOTE:
// - The caller of this method is free to overrides any values found in the headers.
// - The magic of unpack kibana errors is not done in the Send method, an helper methods is provided.
// - The caller of this method is free to override any value found in the headers.
// - The magic of unpack kibana errors is not done in the Send method, a helper method is provided.
func (c *Client) Send(
ctx context.Context,
method, path string,
Expand All @@ -176,8 +168,11 @@ func (c *Client) Send(
body io.Reader,
) (*http.Response, error) {
c.log.Debugf("Request method: %s, path: %s", method, path)
c.lock.Lock()
defer c.lock.Unlock()
requester := c.nextRequester()

req, err := c.request(method, path, params, body)
req, err := requester.request(method, path, params, body)
if err != nil {
return nil, errors.Wrapf(err, "fail to create HTTP request using method %s to %s", method, path)
}
Expand All @@ -195,12 +190,79 @@ func (c *Client) Send(
}
}

return c.client.Do(req.WithContext(ctx))
requester.lastUsed = time.Now().UTC()
resp, err := requester.client.Do(req.WithContext(ctx))
if err != nil {
requester.lastErr = err
requester.lastErrOcc = time.Now().UTC()
} else {
requester.lastErr = nil
requester.lastErrOcc = time.Time{}
}
return resp, err
}

// URI returns the remote URI.
func (c *Client) URI() string {
return string(c.config.Protocol) + "://" + c.config.Host + "/" + c.config.Path
host := c.config.GetHosts()[0]
return string(c.config.Protocol) + "://" + host + "/" + c.config.Path
}

// new creates new Kibana API client.
func new(
log *logger.Logger,
cfg *Config,
httpClients ...*requestClient,
) (*Client, error) {
c := &Client{
log: log,
clients: httpClients,
config: cfg,
}
return c, nil
}

// nextRequester returns the requester to use.
//
// It excludes clients that have errored in the last 5 minutes.
func (c *Client) nextRequester() *requestClient {
var selected *requestClient

now := time.Now().UTC()
for _, requester := range c.clients {
if requester.lastErr != nil && now.Sub(requester.lastErrOcc) > kibanaRetryOnBadConnTimeout {
requester.lastErr = nil
requester.lastErrOcc = time.Time{}
}
if requester.lastErr != nil {
continue
}
if requester.lastUsed.IsZero() {
// never been used, instant winner!
selected = requester
break
}
if selected == nil {
selected = requester
continue
}
if requester.lastUsed.Before(selected.lastUsed) {
selected = requester
}
}
if selected == nil {
// all are erroring; select the oldest one that errored
for _, requester := range c.clients {
if selected == nil {
selected = requester
continue
}
if requester.lastErrOcc.Before(selected.lastErrOcc) {
selected = requester
}
}
}
return selected
}

func prefixRequestFactory(URL string) requestFunc {
Expand Down
Loading

0 comments on commit 57c4280

Please sign in to comment.