Skip to content

Commit

Permalink
[Metricbeat](Etcd-Leader)Followers wont report leader metrics (elasti…
Browse files Browse the repository at this point in the history
…c#12004)

* manage leader metricset so that followers don't report errors nor events
* add debug message when skipping leader events from non leader members
  • Loading branch information
odacremolbap authored and ph committed May 21, 2019
1 parent 983e5a8 commit 60ba703
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add check on object name in the counter path if the instance name is missing {issue}6528[6528] {pull}11878[11878]
- Add AWS cloudwatch metricset. {pull}11798[11798] {issue}11734[11734]
- Add `regions` in aws module config to specify target regions for querying cloudwatch metrics. {issue}11932[11932] {pull}11956[11956]
- Keep `etcd` followers members from reporting `leader` metricset events {pull}12004[12004]

*Packetbeat*

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"message":"not current leader"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"message":"random error message"}
70 changes: 62 additions & 8 deletions metricbeat/module/etcd/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
package leader

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand All @@ -30,6 +37,12 @@ const (
defaultScheme = "http"
defaultPath = "/v2/stats/leader"
apiVersion = "2"

// returned JSON management
msgElement = "message"
msgValueNonLeader = "not current leader"

logSelector = "etcd.leader"
)

var (
Expand All @@ -46,11 +59,15 @@ func init() {
)
}

// MetricSet for etcd.leader
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
http *helper.HTTP
logger *logp.Logger
debugEnabled bool
}

// New etcd.leader metricset object
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct{}{}
if err := base.Module().UnpackConfig(&config); err != nil {
Expand All @@ -64,21 +81,58 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
base,
http,
logp.NewLogger(logSelector),
logp.IsDebug(logSelector),
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.http.FetchContent()
res, err := m.http.FetchResponse()
if err != nil {
return errors.Wrap(err, "error fetching response")
}
defer res.Body.Close()

content, err := ioutil.ReadAll(res.Body)
if err != nil {
return errors.Wrap(err, "error in http fetch")
return errors.Wrapf(err, "error reading body response")
}

reporter.Event(mb.Event{
MetricSetFields: eventMapping(content),
ModuleFields: common.MapStr{"api_version": apiVersion},
})
return nil
if res.StatusCode == http.StatusOK {
reporter.Event(mb.Event{
MetricSetFields: eventMapping(content),
ModuleFields: common.MapStr{"api_version": apiVersion},
})
return nil
}

// Errors might be reported as {"message":"<error message>"}
// let's look for that structure
var jsonResponse map[string]interface{}
if err = json.Unmarshal(content, &jsonResponse); err == nil {
if retMessage := jsonResponse[msgElement]; retMessage != "" {
// there is an error message element, let's use it

// If a 403 is returned and {"message":"not current leader"}
// do not consider this an error
// do not report events since this is not a leader
if res.StatusCode == http.StatusForbidden &&
retMessage == msgValueNonLeader {
if m.debugEnabled {
m.logger.Debugf("skipping event for non leader member %q", m.Host())
}
return nil
}

return fmt.Errorf("fetching HTTP response returned status code %d: %s",
res.StatusCode, retMessage)
}
}

// no message in the JSON payload, return standard error
return fmt.Errorf("fetching HTTP response returned status code %d", res.StatusCode)

}
110 changes: 88 additions & 22 deletions metricbeat/module/etcd/leader/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"regexp"

"github.com/stretchr/testify/assert"

Expand All @@ -40,29 +41,94 @@ func TestEventMapping(t *testing.T) {
}

func TestFetchEventContent(t *testing.T) {
absPath, err := filepath.Abs("../_meta/test/")
assert.NoError(t, err)

response, err := ioutil.ReadFile(absPath + "/leaderstats.json")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json;")
w.Write([]byte(response))
}))
defer server.Close()

config := map[string]interface{}{
"module": "etcd",
"metricsets": []string{"leader"},
"hosts": []string{server.URL},
}
const (
module = "etcd"
metricset = "leader"
mockedFetchLocation = "../_meta/test/"
)

f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
var testcases = []struct {
name string
mockedFetchFile string
httpCode int

expectedFetchErrorRegexp string
expectedNumEvents int
}{
{
name: "Leader member stats",
mockedFetchFile: "/leaderstats.json",
httpCode: http.StatusOK,
expectedNumEvents: 1,
},
{
name: "Follower member",
mockedFetchFile: "/leaderstats_follower.json",
httpCode: http.StatusForbidden,
expectedNumEvents: 0,
},
{
name: "Simulating credentials issue",
mockedFetchFile: "/leaderstats_empty.json",
httpCode: http.StatusForbidden,
expectedFetchErrorRegexp: "fetching HTTP response returned status code 403",
expectedNumEvents: 0,
},
{
name: "Simulating failure message",
mockedFetchFile: "/leaderstats_internalerror.json",
httpCode: http.StatusInternalServerError,
expectedFetchErrorRegexp: "fetching HTTP response returned status code 500:.+",
expectedNumEvents: 0,
}}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {

absPath, err := filepath.Abs(mockedFetchLocation + tc.mockedFetchFile)
assert.NoError(t, err)

response, err := ioutil.ReadFile(absPath)
assert.NoError(t, err)

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), events[0])
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.httpCode)
w.Header().Set("Content-Type", "application/json;")
w.Write([]byte(response))
}))
defer server.Close()

config := map[string]interface{}{
"module": module,
"metricsets": []string{metricset},
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)

if tc.expectedFetchErrorRegexp != "" {
for _, err := range errs {
if match, _ := regexp.MatchString(tc.expectedFetchErrorRegexp, err.Error()); match {
// found expected fetch error, no need for further checks
return
}
}
t.Fatalf("Expected fetch error not found:\n Expected:%s\n Got: %+v",
tc.expectedFetchErrorRegexp,
errs)
}

if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}

assert.Equal(t, tc.expectedNumEvents, len(events))

for i := range events {
t.Logf("%s/%s event[%d]: %+v", f.Module().Name(), f.Name(), i, events[i])
}
})
}
}

0 comments on commit 60ba703

Please sign in to comment.