Skip to content

Commit

Permalink
Refactoring: Use new Fetch interface in elasticsearch/ccr metricset
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Apr 13, 2019
1 parent 9c396b3 commit 24374ea
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 26 deletions.
37 changes: 17 additions & 20 deletions metricbeat/module/elasticsearch/ccr/ccr.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,60 +56,57 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) {
}

// Fetch gathers stats for each follower shard from the _ccr/stats API
func (m *MetricSet) Fetch(r mb.ReporterV2) {
func (m *MetricSet) Fetch(r mb.ReporterV2) error {
isMaster, err := elasticsearch.IsMaster(m.HTTP, m.GetServiceURI())
if err != nil {
err = errors.Wrap(err, "error determining if connected Elasticsearch node is master")
elastic.ReportAndLogError(err, r, m.Logger())
return
return errors.Wrap(err, "error determining if connected Elasticsearch node is master")
}

// Not master, no event sent
if !isMaster {
m.Logger().Debug("trying to fetch ccr stats from a non-master node")
return
return nil
}

info, err := elasticsearch.GetInfo(m.HTTP, m.GetServiceURI())
if err != nil {
elastic.ReportAndLogError(err, r, m.Logger())
return
return err
}

// CCR is only available in Trial or Platinum license of Elasticsearch. So we check
// the license first.
ccrUnavailableMessage, err := m.checkCCRAvailability(info.Version.Number)
if err != nil {
err = errors.Wrap(err, "error determining if CCR is available")
elastic.ReportAndLogError(err, r, m.Logger())
return
return errors.Wrap(err, "error determining if CCR is available")
}

if ccrUnavailableMessage != "" {
if time.Since(m.lastCCRLicenseMessageTimestamp) > 1*time.Minute {
err := fmt.Errorf(ccrUnavailableMessage)
elastic.ReportAndLogError(err, r, m.Logger())
m.lastCCRLicenseMessageTimestamp = time.Now()
return fmt.Errorf(ccrUnavailableMessage)
}
return
return nil
}

content, err := m.HTTP.FetchContent()
if err != nil {
elastic.ReportAndLogError(err, r, m.Logger())
return
return err
}

if m.XPack {
err = eventsMappingXPack(r, m, *info, content)
if err != nil {
// Since this is an x-pack code path, we log the error but don't
// return it. Otherwise it would get reported into `metricbeat-*`
// indices.
m.Logger().Error(err)
return nil
}
} else {
err = eventsMapping(r, *info, content)
return eventsMapping(r, *info, content)
}

if err != nil {
m.Logger().Error(err)
return
}
return nil
}

func (m *MetricSet) checkCCRAvailability(currentElasticsearchVersion *common.Version) (message string, err error) {
Expand Down
8 changes: 2 additions & 6 deletions metricbeat/module/elasticsearch/ccr/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err
var data response
err := json.Unmarshal(content, &data)
if err != nil {
err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
r.Error(err)
return err
return errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response")
}

var errs multierror.Errors
Expand All @@ -81,9 +79,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err

event.MetricSetFields, err = schema.Apply(followerShard)
if err != nil {
event.Error = errors.Wrap(err, "failure applying shard schema")
r.Event(event)
errs = append(errs, event.Error)
errs = append(errs, errors.Wrap(err, "failure applying shard schema"))
continue
}

Expand Down

0 comments on commit 24374ea

Please sign in to comment.