Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Metricbeat](Etcd-Leader)Followers wont report leader metrics #12004

Merged
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"message":"not current leader"}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"message":"random error message"}
55 changes: 48 additions & 7 deletions metricbeat/module/etcd/leader/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
package leader

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
Expand All @@ -30,6 +35,10 @@ const (
defaultScheme = "http"
defaultPath = "/v2/stats/leader"
apiVersion = "2"

// returned JSON management
msgElement = "message"
msgValueNonLeader = "not current leader"
)

var (
Expand Down Expand Up @@ -71,14 +80,46 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.http.FetchContent()
res, err := m.http.FetchResponse()
if err != nil {
return errors.Wrap(err, "error fetching response")
}
defer res.Body.Close()

content, err := ioutil.ReadAll(res.Body)
if err != nil {
return errors.Wrap(err, "error in http fetch")
return errors.Wrapf(err, "error reading body response")
}

if res.StatusCode == http.StatusOK {
reporter.Event(mb.Event{
MetricSetFields: eventMapping(content),
ModuleFields: common.MapStr{"api_version": apiVersion},
})
return nil
}

// Errors might be reported as {"message":"<error message>"}
// let's look for that structure
var jsonResponse map[string]interface{}
if err = json.Unmarshal(content, &jsonResponse); err == nil {
if retMessage := jsonResponse[msgElement]; retMessage != "" {
// there is an error message element, let's use it

// If a 403 is returned and {"message":"not current leader"}
// do not consider this an error
// do not report events since this is not a leader
if res.StatusCode == http.StatusForbidden &&
retMessage == msgValueNonLeader {
return nil
Copy link
Contributor

@exekias exekias May 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a debug message? people may hit this without knowing what's going on

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree on a debug/info level log here 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool, thanks for the review.

I'm ok with all debug info, but let me clarify. An etcd cluster is usually 3 or 5 instances, only one of those instances will show an event, not the other 2, we can consider that if statement above the normal path.
Not an error, not an unexpected path.

I'll add a Debug log in there if that's ok with you.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree it's not an error, still some users may not understand why they are not getting metrics from a node where this module is running 👍, debug log will help them understand that

}

return fmt.Errorf("fetching HTTP response returned status code %d: %s",
res.StatusCode, retMessage)
}
}

reporter.Event(mb.Event{
MetricSetFields: eventMapping(content),
ModuleFields: common.MapStr{"api_version": apiVersion},
})
return nil
// no message in the JSON payload, return standard error
return fmt.Errorf("fetching HTTP response returned status code %d", res.StatusCode)

}
110 changes: 88 additions & 22 deletions metricbeat/module/etcd/leader/leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"net/http"
"net/http/httptest"
"path/filepath"
"regexp"

"github.com/stretchr/testify/assert"

Expand All @@ -40,29 +41,94 @@ func TestEventMapping(t *testing.T) {
}

func TestFetchEventContent(t *testing.T) {
absPath, err := filepath.Abs("../_meta/test/")
assert.NoError(t, err)

response, err := ioutil.ReadFile(absPath + "/leaderstats.json")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
w.Header().Set("Content-Type", "application/json;")
w.Write([]byte(response))
}))
defer server.Close()

config := map[string]interface{}{
"module": "etcd",
"metricsets": []string{"leader"},
"hosts": []string{server.URL},
}
const (
module = "etcd"
metricset = "leader"
mockedFetchLocation = "../_meta/test/"
)

f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
assert.NotEmpty(t, events)
var testcases = []struct {
name string
mockedFetchFile string
httpCode int

expectedFetchErrorRegexp string
expectedNumEvents int
}{
{
name: "Leader member stats",
mockedFetchFile: "/leaderstats.json",
httpCode: http.StatusOK,
expectedNumEvents: 1,
},
{
name: "Follower member",
mockedFetchFile: "/leaderstats_follower.json",
httpCode: http.StatusForbidden,
expectedNumEvents: 0,
},
{
name: "Simulating credentials issue",
mockedFetchFile: "/leaderstats_empty.json",
httpCode: http.StatusForbidden,
expectedFetchErrorRegexp: "fetching HTTP response returned status code 403",
expectedNumEvents: 0,
},
{
name: "Simulating failure message",
mockedFetchFile: "/leaderstats_internalerror.json",
httpCode: http.StatusInternalServerError,
expectedFetchErrorRegexp: "fetching HTTP response returned status code 500:.+",
expectedNumEvents: 0,
}}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {

absPath, err := filepath.Abs(mockedFetchLocation + tc.mockedFetchFile)
assert.NoError(t, err)

response, err := ioutil.ReadFile(absPath)
assert.NoError(t, err)

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), events[0])
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(tc.httpCode)
w.Header().Set("Content-Type", "application/json;")
w.Write([]byte(response))
}))
defer server.Close()

config := map[string]interface{}{
"module": module,
"metricsets": []string{metricset},
"hosts": []string{server.URL},
}

f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)

if tc.expectedFetchErrorRegexp != "" {
for _, err := range errs {
if match, _ := regexp.MatchString(tc.expectedFetchErrorRegexp, err.Error()); match {
// found expected fetch error, no need for further checks
return
}
}
t.Fatalf("Expected fetch error not found:\n Expected:%s\n Got: %+v",
tc.expectedFetchErrorRegexp,
errs)
}

if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}

assert.Equal(t, tc.expectedNumEvents, len(events))

for i := range events {
t.Logf("%s/%s event[%d]: %+v", f.Module().Name(), f.Name(), i, events[i])
}
})
}
}