Skip to content

Commit

Permalink
Add iterator for prometheus families
Browse files Browse the repository at this point in the history
  • Loading branch information
jsoriano committed Feb 13, 2020
1 parent 833fc29 commit 0712e44
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 26 deletions.
75 changes: 54 additions & 21 deletions metricbeat/helper/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,20 @@ import (
// Prometheus helper retrieves prometheus formatted metrics
type Prometheus interface {
// GetFamilies requests metric families from prometheus endpoint and returns them
GetFamilies() ([]*dto.MetricFamily, error)
GetFamilies() (FamiliesDecoder, error)

GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapStr, error)

ReportProcessedMetrics(mapping *MetricsMapping, r mb.ReporterV2) error
}

// Families decoder
type FamiliesDecoder interface {
Decode() bool
Family() *dto.MetricFamily
Err() error
}

type prometheus struct {
httpfetcher
logger *logp.Logger
Expand All @@ -62,13 +69,48 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) {
return &prometheus{http, base.Logger()}, nil
}

type familiesDecoder struct {
decoder expfmt.Decoder
closer io.Closer
family *dto.MetricFamily
err error
}

func (d *familiesDecoder) Decode() bool {
d.family = &dto.MetricFamily{}
err := d.decoder.Decode(d.family)
if err != nil {
if err != io.EOF {
d.err = errors.Wrap(err, "decoding metric family")
}
d.Close()
return false
}

return true
}

func (d *familiesDecoder) Family() *dto.MetricFamily {
return d.family
}

func (d *familiesDecoder) Close() error {
if d.closer == nil {
return nil
}
return d.closer.Close()
}

func (d *familiesDecoder) Err() error {
return d.err
}

// GetFamilies requests metric families from prometheus endpoint and returns them
func (p *prometheus) GetFamilies() ([]*dto.MetricFamily, error) {
func (p *prometheus) GetFamilies() (FamiliesDecoder, error) {
resp, err := p.FetchResponse()
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode > 399 {
bodyBytes, err := ioutil.ReadAll(resp.Body)
Expand All @@ -80,29 +122,15 @@ func (p *prometheus) GetFamilies() ([]*dto.MetricFamily, error) {

format := expfmt.ResponseFormat(resp.Header)
if format == "" {
return nil, fmt.Errorf("Invalid format for response of response")
return nil, fmt.Errorf("invalid format for response of response")
}

decoder := expfmt.NewDecoder(resp.Body, format)
if decoder == nil {
return nil, fmt.Errorf("Unable to create decoder to decode response")
}

families := []*dto.MetricFamily{}
for {
mf := &dto.MetricFamily{}
err = decoder.Decode(mf)
if err != nil {
if err == io.EOF {
break
}
return nil, errors.Wrap(err, "decoding of metric family failed")
} else {
families = append(families, mf)
}
return nil, fmt.Errorf("unable to create decoder to decode response")
}

return families, nil
return &familiesDecoder{decoder: decoder, closer: resp.Body}, nil
}

// MetricsMapping defines mapping settings for Prometheus metrics, to be used with `GetProcessedMetrics`
Expand All @@ -128,7 +156,8 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS

eventsMap := map[string]common.MapStr{}
infoMetrics := []*infoMetricData{}
for _, family := range families {
for families.Decode() {
family := families.Family()
for _, metric := range family.GetMetric() {
m, ok := mapping.Metrics[family.GetName()]
if m == nil || !ok {
Expand Down Expand Up @@ -209,6 +238,10 @@ func (p *prometheus) GetProcessedMetrics(mapping *MetricsMapping) ([]common.MapS
}
}

if err := families.Err(); err != nil {
return nil, err
}

// populate events array from values in eventsMap
events := make([]common.MapStr, 0, len(eventsMap))
for _, event := range eventsMap {
Expand Down
29 changes: 24 additions & 5 deletions metricbeat/module/prometheus/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
package collector

import (
"runtime"

"github.com/pkg/errors"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
p "github.com/elastic/beats/metricbeat/helper/prometheus"
"github.com/elastic/beats/metricbeat/mb"
"github.com/elastic/beats/metricbeat/mb/parse"
Expand Down Expand Up @@ -67,20 +70,29 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {

// Fetch fetches data and reports it
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
families, err := m.prometheus.GetFamilies()
var memStats runtime.MemStats

eventList := map[string]common.MapStr{}
runtime.ReadMemStats(&memStats)
logp.Warn("Before getting families, alloc: %d, total: %d", memStats.Alloc, memStats.TotalAlloc)

families, err := m.prometheus.GetFamilies()
if err != nil {
m.addUpEvent(eventList, 0)
for _, evt := range eventList {
event := make(map[string]common.MapStr)
m.addUpEvent(event, 0)
for evt := range event {
reporter.Event(mb.Event{
RootFields: common.MapStr{"prometheus": evt},
})
}
return errors.Wrap(err, "unable to decode response from prometheus endpoint")
}

for _, family := range families {
runtime.ReadMemStats(&memStats)
logp.Warn("After getting families, alloc: %d, total: %d", memStats.Alloc, memStats.TotalAlloc)

eventList := map[string]common.MapStr{}
for families.Decode() {
family := families.Family()
promEvents := getPromEventsFromMetricFamily(family)

for _, promEvent := range promEvents {
Expand Down Expand Up @@ -110,8 +122,15 @@ func (m *MetricSet) Fetch(reporter mb.ReporterV2) error {
}
}

if err := families.Err(); err != nil {
return err
}

m.addUpEvent(eventList, 1)

runtime.ReadMemStats(&memStats)
logp.Warn("After getting event list, alloc: %d, total: %d", memStats.Alloc, memStats.TotalAlloc)

// Converts hash list to slice
for _, e := range eventList {
isOpen := reporter.Event(mb.Event{
Expand Down

0 comments on commit 0712e44

Please sign in to comment.