Skip to content

Commit

Permalink
[Metricbeat] Migrate Ceph monitor_health to use ReporterV2 interface (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sayden authored Apr 3, 2019
1 parent 8de672e commit 9f3aede
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 92 deletions.
24 changes: 10 additions & 14 deletions metricbeat/module/ceph/monitor_health/_meta/data.json
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
{
"@timestamp": "2017-10-12T08:05:34.853Z",
"agent": {
"hostname": "host.example.com",
"name": "host.example.com"
},
"@timestamp": "2019-03-01T08:05:34.853Z",
"ceph": {
"monitor_health": {
"available": {
"kb": 46814916,
"pct": 76
"kb": 773013080,
"pct": 82
},
"health": "HEALTH_OK",
"last_updated": "2019-01-25T12:37:24.921587Z",
"name": "f0e2841bb524",
"last_updated": "2019-03-13T11:21:24.667025Z",
"name": "26c372192772",
"store_stats": {
"last_updated": "0.000000",
"log": {
"bytes": 2031616
"bytes": 4128768
},
"misc": {
"bytes": 65552
Expand All @@ -25,14 +21,14 @@
"bytes": 1087
},
"total": {
"bytes": 2098255
"bytes": 4195407
}
},
"total": {
"kb": 61255492
"kb": 936145620
},
"used": {
"kb": 11299252
"kb": 115509168
}
}
},
Expand All @@ -45,7 +41,7 @@
"name": "monitor_health"
},
"service": {
"address": "127.0.0.1:5000",
"address": "127.0.0.1:55555",
"type": "ceph"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
type: http
url: "/api/v0.1/health"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"status": "OK", "output": {"detail": [], "timechecks": {"round_status": "finished", "epoch": 3, "round": 0}, "health": {"health_services": [{"mons": [{"last_updated": "2019-03-13 11:21:24.667025", "name": "26c372192772", "avail_percent": 82, "kb_total": 936145620, "kb_avail": 773013080, "health": "HEALTH_OK", "kb_used": 115509168, "store_stats": {"bytes_total": 4195407, "bytes_log": 4128768, "last_updated": "0.000000", "bytes_misc": 65552, "bytes_sst": 1087}}]}]}, "overall_status": "HEALTH_OK", "summary": []}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
[
{
"ceph": {
"monitor_health": {
"available": {
"kb": 773013080,
"pct": 82
},
"health": "HEALTH_OK",
"last_updated": "2019-03-13T11:21:24.667025Z",
"name": "26c372192772",
"store_stats": {
"last_updated": "0.000000",
"log": {
"bytes": 4128768
},
"misc": {
"bytes": 65552
},
"sst": {
"bytes": 1087
},
"total": {
"bytes": 4195407
}
},
"total": {
"kb": 936145620
},
"used": {
"kb": 115509168
}
}
},
"event": {
"dataset": "ceph.monitor_health",
"duration": 115000,
"module": "ceph"
},
"metricset": {
"name": "monitor_health"
},
"service": {
"address": "127.0.0.1:55555",
"type": "ceph"
}
}
]
9 changes: 5 additions & 4 deletions metricbeat/module/ceph/monitor_health/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ import (
"encoding/json"
"time"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

type Tick struct {
Expand Down Expand Up @@ -85,11 +86,11 @@ type HealthRequest struct {
Output Output `json:"output"`
}

func eventsMapping(content []byte) []common.MapStr {
func eventsMapping(content []byte) ([]common.MapStr, error) {
var d HealthRequest
err := json.Unmarshal(content, &d)
if err != nil {
logp.Err("Error: %+v", err)
return nil, errors.Wrapf(err, "could not get HealthRequest data")
}

events := []common.MapStr{}
Expand Down Expand Up @@ -131,5 +132,5 @@ func eventsMapping(content []byte) []common.MapStr {
}
}

return events
return events, nil
}
19 changes: 15 additions & 4 deletions metricbeat/module/ceph/monitor_health/monitor_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package monitor_health

import (
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/metricbeat/helper"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand Down Expand Up @@ -61,11 +60,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}, nil
}

func (m *MetricSet) Fetch() ([]common.MapStr, error) {
// Fetch methods implements the data gathering and data conversion to the right
// format. It publishes the event which is then forwarded to the output. In case
// of an error set the Error field of mb.Event or simply call report.Error().
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
content, err := m.HTTP.FetchContent()
if err != nil {
return nil, err
return err
}

events, err := eventsMapping(content)
if err != nil {
return err
}

for _, event := range events {
reporter.Event(mb.Event{MetricSetFields: event})
}

return eventsMapping(content), nil
return nil
}

This file was deleted.

12 changes: 7 additions & 5 deletions metricbeat/module/ceph/monitor_health/monitor_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

func TestFetchEventContents(t *testing.T) {
absPath, err := filepath.Abs("../_meta/testdata/")
assert.NoError(t, err)

response, err := ioutil.ReadFile(absPath + "/sample_response.json")
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -47,12 +48,13 @@ func TestFetchEventContents(t *testing.T) {
"hosts": []string{server.URL},
}

f := mbtest.NewEventsFetcher(t, config)
events, err := f.Fetch()
if err != nil {
t.Fatal(err)
f := mbtest.NewReportingMetricSetV2Error(t, config)
events, errs := mbtest.ReportingFetchV2Error(f)
if len(errs) > 0 {
t.Fatalf("Expected 0 error, had %d. %v\n", len(errs), errs)
}
event := events[0]
assert.NotEmpty(t, events)
event := events[0].MetricSetFields

t.Logf("%s/%s event: %+v", f.Module().Name(), f.Name(), event.StringToPrint())

Expand Down

0 comments on commit 9f3aede

Please sign in to comment.