diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index edd32bcca81..75d06defc4d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -99,6 +99,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Update haproxy.* fields to map to ECS. {pull}10558[10558] {pull}10568[10568] - Collect all EC2 meta data from all instances in all states. {pull}10628[10628] - Migrate docker module to ECS. {pull}10927[10927] +- Add connection and request timeouts for HTTP helper. {pull}11032[11032] - Add new option `OpMultiplyBuckets` to scale histogram buckets to avoid decimal points in final events {pull}10994[10994] *Packetbeat* diff --git a/metricbeat/docs/metricbeat-options.asciidoc b/metricbeat/docs/metricbeat-options.asciidoc index 10e75d025e6..6b0d39fbc74 100644 --- a/metricbeat/docs/metricbeat-options.asciidoc +++ b/metricbeat/docs/metricbeat-options.asciidoc @@ -224,6 +224,16 @@ The username to use for basic authentication. The password to use for basic authentication. +[float] +==== `connect_timeout` + +Total time limit for an HTTP connection to be completed (Default: 2 seconds). + +[float] +==== `timeout` + +Total time limit for HTTP requests made by the module (Default: 10 seconds). + [float] ==== `ssl` diff --git a/metricbeat/helper/config.go b/metricbeat/helper/config.go new file mode 100644 index 00000000000..1c4c8fd6ddc --- /dev/null +++ b/metricbeat/helper/config.go @@ -0,0 +1,40 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package helper + +import ( + "time" + + "github.com/elastic/beats/libbeat/common/transport/tlscommon" +) + +// Config for an HTTP helper +type Config struct { + TLS *tlscommon.Config `config:"ssl"` + ConnectTimeout time.Duration `config:"connect_timeout"` + Timeout time.Duration `config:"timeout"` + Headers map[string]string `config:"headers"` + BearerTokenFile string `config:"bearer_token_file"` +} + +func defaultConfig() Config { + return Config{ + ConnectTimeout: 2 * time.Second, + Timeout: 10 * time.Second, + } +} diff --git a/metricbeat/helper/http.go b/metricbeat/helper/http.go index b01fa781885..1f738b97d18 100644 --- a/metricbeat/helper/http.go +++ b/metricbeat/helper/http.go @@ -25,7 +25,6 @@ import ( "io" "io/ioutil" "net/http" - "time" "github.com/pkg/errors" @@ -35,26 +34,27 @@ import ( ) type HTTP struct { - base mb.BaseMetricSet - client *http.Client // HTTP client that is reused across requests. - headers map[string]string - uri string - method string - body []byte + hostData mb.HostData + client *http.Client // HTTP client that is reused across requests. + headers map[string]string + name string + uri string + method string + body []byte } // NewHTTP creates new http helper func NewHTTP(base mb.BaseMetricSet) (*HTTP, error) { - config := struct { - TLS *tlscommon.Config `config:"ssl"` - Timeout time.Duration `config:"timeout"` - Headers map[string]string `config:"headers"` - BearerTokenFile string `config:"bearer_token_file"` - }{} + config := defaultConfig() if err := base.Module().UnpackConfig(&config); err != nil { return nil, err } + return newHTTPFromConfig(config, base.Name(), base.HostData()) +} + +// newHTTPWithConfig creates a new http helper from some configuration +func newHTTPFromConfig(config Config, name string, hostData mb.HostData) (*HTTP, error) { if config.Headers == nil { config.Headers = map[string]string{} } @@ -74,14 +74,13 @@ func NewHTTP(base mb.BaseMetricSet) (*HTTP, error) { var dialer, tlsDialer transport.Dialer - dialer = transport.NetDialer(config.Timeout) - tlsDialer, err = transport.TLSDialer(dialer, tlsConfig, config.Timeout) + dialer = transport.NetDialer(config.ConnectTimeout) + tlsDialer, err = transport.TLSDialer(dialer, tlsConfig, config.ConnectTimeout) if err != nil { return nil, err } return &HTTP{ - base: base, client: &http.Client{ Transport: &http.Transport{ Dial: dialer.Dial, @@ -91,7 +90,7 @@ func NewHTTP(base mb.BaseMetricSet) (*HTTP, error) { }, headers: config.Headers, method: "GET", - uri: base.HostData().SanitizedURI, + uri: hostData.SanitizedURI, body: nil, }, nil } @@ -107,8 +106,11 @@ func (h *HTTP) FetchResponse() (*http.Response, error) { } req, err := http.NewRequest(h.method, h.uri, reader) - if h.base.HostData().User != "" || h.base.HostData().Password != "" { - req.SetBasicAuth(h.base.HostData().User, h.base.HostData().Password) + if err != nil { + return nil, errors.Wrap(err, "failed to create HTTP request") + } + if h.hostData.User != "" || h.hostData.Password != "" { + req.SetBasicAuth(h.hostData.User, h.hostData.Password) } for k, v := range h.headers { @@ -157,7 +159,7 @@ func (h *HTTP) FetchContent() ([]byte, error) { defer resp.Body.Close() if resp.StatusCode != 200 { - return nil, fmt.Errorf("HTTP error %d in %s: %s", resp.StatusCode, h.base.Name(), resp.Status) + return nil, fmt.Errorf("HTTP error %d in %s: %s", resp.StatusCode, h.name, resp.Status) } return ioutil.ReadAll(resp.Body) diff --git a/metricbeat/helper/http_test.go b/metricbeat/helper/http_test.go index a3e5d615e4d..aee80166fa2 100644 --- a/metricbeat/helper/http_test.go +++ b/metricbeat/helper/http_test.go @@ -19,10 +19,16 @@ package helper import ( "io/ioutil" + "net/http" + "net/http/httptest" "os" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/beats/metricbeat/mb" ) func TestGetAuthHeaderFromToken(t *testing.T) { @@ -69,3 +75,57 @@ func TestGetAuthHeaderFromTokenNoFile(t *testing.T) { assert.Equal(t, "", header) assert.Error(t, err) } + +func TestTimeout(t *testing.T) { + c := make(chan struct{}) + defer close(c) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-c + })) + defer ts.Close() + + cfg := defaultConfig() + cfg.Timeout = 1 * time.Nanosecond + hostData := mb.HostData{ + URI: ts.URL, + SanitizedURI: ts.URL, + } + + h, err := newHTTPFromConfig(cfg, "test", hostData) + require.NoError(t, err) + + checkTimeout(t, h) +} + +func TestConnectTimeout(t *testing.T) { + // This IP shouldn't exist, 192.0.2.0/24 is reserved for testing + uri := "http://192.0.2.42" + cfg := defaultConfig() + cfg.ConnectTimeout = 1 * time.Nanosecond + hostData := mb.HostData{ + URI: uri, + SanitizedURI: uri, + } + + h, err := newHTTPFromConfig(cfg, "test", hostData) + require.NoError(t, err) + + checkTimeout(t, h) +} + +func checkTimeout(t *testing.T, h *HTTP) { + t.Helper() + + done := make(chan struct{}) + go func() { + _, err := h.FetchResponse() + assert.Error(t, err) + done <- struct{}{} + }() + + select { + case <-done: + case <-time.After(1 * time.Second): + t.Fatal("timeout should have happened time ago") + } +}