diff --git a/metricbeat/module/elasticsearch/ccr/ccr.go b/metricbeat/module/elasticsearch/ccr/ccr.go index ccd27112bf3..1bf275cf365 100644 --- a/metricbeat/module/elasticsearch/ccr/ccr.go +++ b/metricbeat/module/elasticsearch/ccr/ccr.go @@ -56,60 +56,57 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { } // Fetch gathers stats for each follower shard from the _ccr/stats API -func (m *MetricSet) Fetch(r mb.ReporterV2) { +func (m *MetricSet) Fetch(r mb.ReporterV2) error { isMaster, err := elasticsearch.IsMaster(m.HTTP, m.GetServiceURI()) if err != nil { - err = errors.Wrap(err, "error determining if connected Elasticsearch node is master") - elastic.ReportAndLogError(err, r, m.Logger()) - return + return errors.Wrap(err, "error determining if connected Elasticsearch node is master") } // Not master, no event sent if !isMaster { m.Logger().Debug("trying to fetch ccr stats from a non-master node") - return + return nil } info, err := elasticsearch.GetInfo(m.HTTP, m.GetServiceURI()) if err != nil { - elastic.ReportAndLogError(err, r, m.Logger()) - return + return err } // CCR is only available in Trial or Platinum license of Elasticsearch. So we check // the license first. ccrUnavailableMessage, err := m.checkCCRAvailability(info.Version.Number) if err != nil { - err = errors.Wrap(err, "error determining if CCR is available") - elastic.ReportAndLogError(err, r, m.Logger()) - return + return errors.Wrap(err, "error determining if CCR is available") } if ccrUnavailableMessage != "" { if time.Since(m.lastCCRLicenseMessageTimestamp) > 1*time.Minute { - err := fmt.Errorf(ccrUnavailableMessage) - elastic.ReportAndLogError(err, r, m.Logger()) m.lastCCRLicenseMessageTimestamp = time.Now() + return fmt.Errorf(ccrUnavailableMessage) } - return + return nil } content, err := m.HTTP.FetchContent() if err != nil { - elastic.ReportAndLogError(err, r, m.Logger()) - return + return err } if m.XPack { err = eventsMappingXPack(r, m, *info, content) + if err != nil { + // Since this is an x-pack code path, we log the error but don't + // return it. Otherwise it would get reported into `metricbeat-*` + // indices. + m.Logger().Error(err) + return nil + } } else { - err = eventsMapping(r, *info, content) + return eventsMapping(r, *info, content) } - if err != nil { - m.Logger().Error(err) - return - } + return nil } func (m *MetricSet) checkCCRAvailability(currentElasticsearchVersion *common.Version) (message string, err error) { diff --git a/metricbeat/module/elasticsearch/ccr/data.go b/metricbeat/module/elasticsearch/ccr/data.go index 37c0e4791c8..748e42ca703 100644 --- a/metricbeat/module/elasticsearch/ccr/data.go +++ b/metricbeat/module/elasticsearch/ccr/data.go @@ -63,9 +63,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err var data response err := json.Unmarshal(content, &data) if err != nil { - err = errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response") - r.Error(err) - return err + return errors.Wrap(err, "failure parsing Elasticsearch CCR Stats API response") } var errs multierror.Errors @@ -81,9 +79,7 @@ func eventsMapping(r mb.ReporterV2, info elasticsearch.Info, content []byte) err event.MetricSetFields, err = schema.Apply(followerShard) if err != nil { - event.Error = errors.Wrap(err, "failure applying shard schema") - r.Event(event) - errs = append(errs, event.Error) + errs = append(errs, errors.Wrap(err, "failure applying shard schema")) continue }