Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat](Etcd-Leader)Followers wont report leader metrics #12004

Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add Filebeat envoyproxy module. {pull}11700[11700]
- Add apache2(httpd) log path (`/var/log/httpd`) to make apache2 module work out of the box on Redhat-family OSes. {issue}11887[11887] {pull}11888[11888]
- Add support to new MongoDB additional diagnostic information {pull}11952[11952]
-
kaiyan-sheng marked this conversation as resolved.
Show resolved Hide resolved

*Heartbeat*

Expand All @@ -178,6 +179,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add check on object name in the counter path if the instance name is missing {issue}6528[6528] {pull}11878[11878]
- Add AWS cloudwatch metricset. {pull}11798[11798] {issue}11734[11734]
- Add `regions` in aws module config to specify target regions for querying cloudwatch metrics. {issue}11932[11932] {pull}11956[11956]
- Keep `etcd` followers members from reporting `leader` metricset events {pull}12004[12004]

*Packetbeat*

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"message":"not current leader"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"message":"random error message"}
70 changes: 62 additions & 8 deletions metricbeat/module/etcd/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
package leader

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"

"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand All @@ -30,6 +37,12 @@ const (
defaultScheme = "http"
defaultPath = "/v2/stats/leader"
apiVersion = "2"

// returned JSON management
msgElement = "message"
msgValueNonLeader = "not current leader"

logSelector = "etcd.leader"
)

var (
Expand All @@ -46,11 +59,15 @@ func init() {
)
}

// MetricSet for etcd.leader
type MetricSet struct {
mb.BaseMetricSet
http *helper.HTTP
http *helper.HTTP
logger *logp.Logger
debugEnabled bool
}

// New etcd.leader metricset object
func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
config := struct{}{}
if err := base.Module().UnpackConfig(&config); err != nil {
Expand All @@ -64,21 +81,58 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
return &MetricSet{
base,
http,
logp.NewLogger(logSelector),
logp.IsDebug(logSelector),
}, nil
}

// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.http.FetchContent()
res, err := m.http.FetchResponse()
if err != nil {
return errors.Wrap(err, "error fetching response")
}
defer res.Body.Close()

content, err := ioutil.ReadAll(res.Body)
if err != nil {
return errors.Wrap(err, "error in http fetch")
return errors.Wrapf(err, "error reading body response")
}

reporter.Event(mb.Event{
MetricSetFields: eventMapping(content),
ModuleFields: common.MapStr{"api_version": apiVersion},
})
return nil
if res.StatusCode == http.StatusOK {
reporter.Event(mb.Event{
MetricSetFields: eventMapping(content),
ModuleFields: common.MapStr{"api_version": apiVersion},
})
return nil
}

// Errors might be reported as {"message":"<error message>"}
// let's look for that structure
var jsonResponse map[string]interface{}
if err = json.Unmarshal(content, &jsonResponse); err == nil {
if retMessage := jsonResponse[msgElement]; retMessage != "" {
// there is an error message element, let's use it

// If a 403 is returned and {"message":"not current leader"}
// do not consider this an error
// do not report events since this is not a leader
if res.StatusCode == http.StatusForbidden &&
retMessage == msgValueNonLeader {
if m.debugEnabled {
m.logger.Debugf("skipping event for non leader member %q", m.Host())
}
return nil
Copy link
Contributor

@exekias exekias May 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a debug message? people may hit this without knowing what's going on

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree on a debug/info level log here 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks for the review.

I'm ok with all debug info, but let me clarify. An etcd cluster is usually 3 or 5 instances, only one of those instances will show an event, not the other 2, we can consider that if statement above the normal path.
Not an error, not an unexpected path.

I'll add a Debug log in there if that's ok with you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's not an error, still some users may not understand why they are not getting metrics from a node where this module is running 👍, debug log will help them understand that

}

return fmt.Errorf("fetching HTTP response returned status code %d: %s",
res.StatusCode, retMessage)
}
}

// no message in the JSON payload, return standard error
return fmt.Errorf("fetching HTTP response returned status code %d", res.StatusCode)

}
110 changes: 88 additions & 22 deletions metricbeat/module/etcd/leader/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"regexp"

"github.com/stretchr/testify/assert"

Expand All @@ -40,29 +41,94 @@ func TestEventMapping(t *testing.T) {
}

func TestFetchEventContent(t *testing.T) {
absPath, err := filepath.Abs("../_meta/test/")
assert.NoError(t, err)

response, err := ioutil.ReadFile(absPath + "/leaderstats.json")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json;")
w.Write([]byte(response))
}))
defer server.Close()

config := map[string]interface{}{
"module": "etcd",
"metricsets": []string{"leader"},
"hosts": []string{server.URL},
}
const (
module = "etcd"
metricset = "leader"
mockedFetchLocation = "../_meta/test/"
)

f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
var testcases = []struct {
name string
mockedFetchFile string
httpCode int

expectedFetchErrorRegexp string
expectedNumEvents int
}{
{
name: "Leader member stats",
mockedFetchFile: "/leaderstats.json",
httpCode: http.StatusOK,
expectedNumEvents: 1,
},
{
name: "Follower member",
mockedFetchFile: "/leaderstats_follower.json",
httpCode: http.StatusForbidden,
expectedNumEvents: 0,
},
{
name: "Simulating credentials issue",
mockedFetchFile: "/leaderstats_empty.json",
httpCode: http.StatusForbidden,
expectedFetchErrorRegexp: "fetching HTTP response returned status code 403",
expectedNumEvents: 0,
},
{
name: "Simulating failure message",
mockedFetchFile: "/leaderstats_internalerror.json",
httpCode: http.StatusInternalServerError,
expectedFetchErrorRegexp: "fetching HTTP response returned status code 500:.+",
expectedNumEvents: 0,
}}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {

absPath, err := filepath.Abs(mockedFetchLocation + tc.mockedFetchFile)
assert.NoError(t, err)

response, err := ioutil.ReadFile(absPath)
assert.NoError(t, err)

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), events[0])
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.httpCode)
w.Header().Set("Content-Type", "application/json;")
w.Write([]byte(response))
}))
defer server.Close()

config := map[string]interface{}{
"module": module,
"metricsets": []string{metricset},
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)

if tc.expectedFetchErrorRegexp != "" {
for _, err := range errs {
if match, _ := regexp.MatchString(tc.expectedFetchErrorRegexp, err.Error()); match {
// found expected fetch error, no need for further checks
return
}
}
t.Fatalf("Expected fetch error not found:\n Expected:%s\n Got: %+v",
tc.expectedFetchErrorRegexp,
errs)
}

if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}

assert.Equal(t, tc.expectedNumEvents, len(events))

for i := range events {
t.Logf("%s/%s event[%d]: %+v", f.Module().Name(), f.Name(), i, events[i])
}
})
}
}