diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6a72877ff66..31500e23f85 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -297,6 +297,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix CPU count in docker/cpu in cases where no `online_cpus` are reported {pull}15070[15070] - Fix mixed modules loading standard and light metricsets {pull}15011[15011] - Make `kibana` module more resilient to Kibana unavailability. {issue}15258[15258] {pull}15270[15270] +- Make `logstash` module more resilient to Logstash unavailability. {issue}15276[15276] {pull}15306[15306] *Packetbeat* diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index 6e5bd523d0d..5f221b02177 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -19,6 +19,7 @@ package logstash import ( "encoding/json" + "fmt" "net/url" "github.com/pkg/errors" @@ -158,8 +159,25 @@ func GetPipelines(m *MetricSet) ([]PipelineState, error) { return pipelines, nil } -// GetVersion returns the version of the Logstash node -func GetVersion(m *MetricSet) (*common.Version, error) { +// CheckPipelineGraphAPIsAvailable returns an error if pipeline graph APIs are not +// available in the version of the Logstash node. +func (m *MetricSet) CheckPipelineGraphAPIsAvailable() error { + logstashVersion, err := m.getVersion() + if err != nil { + return err + } + + arePipelineGraphAPIsAvailable := elastic.IsFeatureAvailable(logstashVersion, PipelineGraphAPIsAvailableVersion) + + if !arePipelineGraphAPIsAvailable { + const errorMsg = "the %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" + return fmt.Errorf(errorMsg, m.FullyQualifiedName(), PipelineGraphAPIsAvailableVersion, logstashVersion) + } + + return nil +} + +func (m *MetricSet) getVersion() (*common.Version, error) { const rootPath = "/" content, err := fetchPath(m.HTTP, rootPath, "") if err != nil { @@ -178,12 +196,6 @@ func GetVersion(m *MetricSet) (*common.Version, error) { return response.Version, nil } -// ArePipelineGraphAPIsAvailable returns whether Logstash APIs that returns pipeline graphs -// are available in the given version of Logstash -func ArePipelineGraphAPIsAvailable(currentLogstashVersion *common.Version) bool { - return elastic.IsFeatureAvailable(currentLogstashVersion, PipelineGraphAPIsAvailableVersion) -} - func fetchPath(httpHelper *helper.HTTP, path string, query string) ([]byte, error) { currentURI := httpHelper.GetURI() defer httpHelper.SetURI(currentURI) diff --git a/metricbeat/module/logstash/node/node.go b/metricbeat/module/logstash/node/node.go index fb07165c8b2..75b4a3bc4f1 100644 --- a/metricbeat/module/logstash/node/node.go +++ b/metricbeat/module/logstash/node/node.go @@ -18,8 +18,6 @@ package node import ( - "fmt" - "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/logstash" @@ -58,23 +56,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, err } - if ms.XPack { - logstashVersion, err := logstash.GetVersion(ms) - if err != nil { - return nil, err - } - - arePipelineGraphAPIsAvailable := logstash.ArePipelineGraphAPIsAvailable(logstashVersion) - if err != nil { - return nil, err - } - - if !arePipelineGraphAPIsAvailable { - const errorMsg = "The %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" - return nil, fmt.Errorf(errorMsg, ms.FullyQualifiedName(), logstash.PipelineGraphAPIsAvailableVersion, logstashVersion) - } - } - return &MetricSet{ ms, }, nil @@ -106,3 +87,11 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } + +func (m *MetricSet) init() error { + if m.XPack { + return m.CheckPipelineGraphAPIsAvailable() + } + + return nil +} diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index b2b24cfb1c3..37af127958d 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -18,8 +18,6 @@ package node_stats import ( - "fmt" - "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/logstash" @@ -59,25 +57,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, err } - if ms.XPack { - logstashVersion, err := logstash.GetVersion(ms) - if err != nil { - return nil, err - } - - arePipelineGraphAPIsAvailable := logstash.ArePipelineGraphAPIsAvailable(logstashVersion) - if err != nil { - return nil, err - } - - if !arePipelineGraphAPIsAvailable { - const errorMsg = "The %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" - return nil, fmt.Errorf(errorMsg, ms.FullyQualifiedName(), logstash.PipelineGraphAPIsAvailableVersion, logstashVersion) - } - - ms.HTTP.SetURI(ms.HTTP.GetURI() + "?vertices=true") - } - return &MetricSet{ ms, }, nil @@ -87,6 +66,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) error { + err := m.init() + if err != nil { + if m.XPack { + m.Logger().Error(err) + return nil + } + return err + } + content, err := m.HTTP.FetchContent() if err != nil { if m.XPack { @@ -107,3 +95,16 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } + +func (m *MetricSet) init() error { + if m.XPack { + err := m.CheckPipelineGraphAPIsAvailable() + if err != nil { + return err + } + + m.HTTP.SetURI(m.HTTP.GetURI() + "?vertices=true") + } + + return nil +}