Skip to content

Commit

Permalink
Add default timeouts to metricbeat HTTP helpers (#11032)
Browse files Browse the repository at this point in the history
Set a default request timeout of 10 seconds to metricbeat HTTP helpers
so there are less chances of leaking established connections.

Add a connection timeout that defaults to two seconds so connections
fail fast in case of network connectivity problems.
  • Loading branch information
jsoriano committed Mar 19, 2019
1 parent d5ccedc commit 1cdc88b
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Update haproxy.* fields to map to ECS. {pull}10558[10558] {pull}10568[10568]
- Collect all EC2 meta data from all instances in all states. {pull}10628[10628]
- Migrate docker module to ECS. {pull}10927[10927]
- Add connection and request timeouts for HTTP helper. {pull}11032[11032]
- Add new option `OpMultiplyBuckets` to scale histogram buckets to avoid decimal points in final events {pull}10994[10994]

*Packetbeat*
Expand Down
10 changes: 10 additions & 0 deletions metricbeat/docs/metricbeat-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,16 @@ The username to use for basic authentication.

The password to use for basic authentication.

[float]
==== `connect_timeout`

Total time limit for an HTTP connection to be completed (Default: 2 seconds).

[float]
==== `timeout`

Total time limit for HTTP requests made by the module (Default: 10 seconds).

[float]
==== `ssl`

Expand Down
40 changes: 40 additions & 0 deletions metricbeat/helper/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package helper

import (
"time"

"github.com/elastic/beats/libbeat/common/transport/tlscommon"
)

// Config for an HTTP helper
type Config struct {
TLS *tlscommon.Config `config:"ssl"`
ConnectTimeout time.Duration `config:"connect_timeout"`
Timeout time.Duration `config:"timeout"`
Headers map[string]string `config:"headers"`
BearerTokenFile string `config:"bearer_token_file"`
}

func defaultConfig() Config {
return Config{
ConnectTimeout: 2 * time.Second,
Timeout: 10 * time.Second,
}
}
42 changes: 22 additions & 20 deletions metricbeat/helper/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"io"
"io/ioutil"
"net/http"
"time"

"github.com/pkg/errors"

Expand All @@ -35,26 +34,27 @@ import (
)

type HTTP struct {
base mb.BaseMetricSet
client *http.Client // HTTP client that is reused across requests.
headers map[string]string
uri string
method string
body []byte
hostData mb.HostData
client *http.Client // HTTP client that is reused across requests.
headers map[string]string
name string
uri string
method string
body []byte
}

// NewHTTP creates new http helper
func NewHTTP(base mb.BaseMetricSet) (*HTTP, error) {
config := struct {
TLS *tlscommon.Config `config:"ssl"`
Timeout time.Duration `config:"timeout"`
Headers map[string]string `config:"headers"`
BearerTokenFile string `config:"bearer_token_file"`
}{}
config := defaultConfig()
if err := base.Module().UnpackConfig(&config); err != nil {
return nil, err
}

return newHTTPFromConfig(config, base.Name(), base.HostData())
}

// newHTTPWithConfig creates a new http helper from some configuration
func newHTTPFromConfig(config Config, name string, hostData mb.HostData) (*HTTP, error) {
if config.Headers == nil {
config.Headers = map[string]string{}
}
Expand All @@ -74,14 +74,13 @@ func NewHTTP(base mb.BaseMetricSet) (*HTTP, error) {

var dialer, tlsDialer transport.Dialer

dialer = transport.NetDialer(config.Timeout)
tlsDialer, err = transport.TLSDialer(dialer, tlsConfig, config.Timeout)
dialer = transport.NetDialer(config.ConnectTimeout)
tlsDialer, err = transport.TLSDialer(dialer, tlsConfig, config.ConnectTimeout)
if err != nil {
return nil, err
}

return &HTTP{
base: base,
client: &http.Client{
Transport: &http.Transport{
Dial: dialer.Dial,
Expand All @@ -91,7 +90,7 @@ func NewHTTP(base mb.BaseMetricSet) (*HTTP, error) {
},
headers: config.Headers,
method: "GET",
uri: base.HostData().SanitizedURI,
uri: hostData.SanitizedURI,
body: nil,
}, nil
}
Expand All @@ -107,8 +106,11 @@ func (h *HTTP) FetchResponse() (*http.Response, error) {
}

req, err := http.NewRequest(h.method, h.uri, reader)
if h.base.HostData().User != "" || h.base.HostData().Password != "" {
req.SetBasicAuth(h.base.HostData().User, h.base.HostData().Password)
if err != nil {
return nil, errors.Wrap(err, "failed to create HTTP request")
}
if h.hostData.User != "" || h.hostData.Password != "" {
req.SetBasicAuth(h.hostData.User, h.hostData.Password)
}

for k, v := range h.headers {
Expand Down Expand Up @@ -157,7 +159,7 @@ func (h *HTTP) FetchContent() ([]byte, error) {
defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil, fmt.Errorf("HTTP error %d in %s: %s", resp.StatusCode, h.base.Name(), resp.Status)
return nil, fmt.Errorf("HTTP error %d in %s: %s", resp.StatusCode, h.name, resp.Status)
}

return ioutil.ReadAll(resp.Body)
Expand Down
60 changes: 60 additions & 0 deletions metricbeat/helper/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ package helper

import (
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/metricbeat/mb"
)

func TestGetAuthHeaderFromToken(t *testing.T) {
Expand Down Expand Up @@ -69,3 +75,57 @@ func TestGetAuthHeaderFromTokenNoFile(t *testing.T) {
assert.Equal(t, "", header)
assert.Error(t, err)
}

func TestTimeout(t *testing.T) {
c := make(chan struct{})
defer close(c)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
<-c
}))
defer ts.Close()

cfg := defaultConfig()
cfg.Timeout = 1 * time.Nanosecond
hostData := mb.HostData{
URI: ts.URL,
SanitizedURI: ts.URL,
}

h, err := newHTTPFromConfig(cfg, "test", hostData)
require.NoError(t, err)

checkTimeout(t, h)
}

func TestConnectTimeout(t *testing.T) {
// This IP shouldn't exist, 192.0.2.0/24 is reserved for testing
uri := "http://192.0.2.42"
cfg := defaultConfig()
cfg.ConnectTimeout = 1 * time.Nanosecond
hostData := mb.HostData{
URI: uri,
SanitizedURI: uri,
}

h, err := newHTTPFromConfig(cfg, "test", hostData)
require.NoError(t, err)

checkTimeout(t, h)
}

func checkTimeout(t *testing.T, h *HTTP) {
t.Helper()

done := make(chan struct{})
go func() {
_, err := h.FetchResponse()
assert.Error(t, err)
done <- struct{}{}
}()

select {
case <-done:
case <-time.After(1 * time.Second):
t.Fatal("timeout should have happened time ago")
}
}

0 comments on commit 1cdc88b

Please sign in to comment.