From 26bee1c78e9e2849933f79bc860c12bbdc73c348 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Jan 2020 11:26:57 -0800 Subject: [PATCH 1/5] node_stats: move HTTP calls from New() to Fetch() --- .../module/logstash/node_stats/node_stats.go | 48 +++++++++++-------- 1 file changed, 29 insertions(+), 19 deletions(-) diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index b2b24cfb1c3..fdbcfb7dfe6 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -59,25 +59,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, err } - if ms.XPack { - logstashVersion, err := logstash.GetVersion(ms) - if err != nil { - return nil, err - } - - arePipelineGraphAPIsAvailable := logstash.ArePipelineGraphAPIsAvailable(logstashVersion) - if err != nil { - return nil, err - } - - if !arePipelineGraphAPIsAvailable { - const errorMsg = "The %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" - return nil, fmt.Errorf(errorMsg, ms.FullyQualifiedName(), logstash.PipelineGraphAPIsAvailableVersion, logstashVersion) - } - - ms.HTTP.SetURI(ms.HTTP.GetURI() + "?vertices=true") - } - return &MetricSet{ ms, }, nil @@ -87,6 +68,15 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // It returns the event which is then forward to the output. In case of an error, a // descriptive error must be returned. func (m *MetricSet) Fetch(r mb.ReporterV2) error { + err := m.init() + if err != nil { + if m.XPack { + m.Logger().Error(err) + return nil + } + return err + } + content, err := m.HTTP.FetchContent() if err != nil { if m.XPack { @@ -107,3 +97,23 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } + +func (m *MetricSet) init() error { + if m.XPack { + logstashVersion, err := logstash.GetVersion(m.MetricSet) + if err != nil { + return err + } + + arePipelineGraphAPIsAvailable := logstash.ArePipelineGraphAPIsAvailable(logstashVersion) + + if !arePipelineGraphAPIsAvailable { + const errorMsg = "the %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" + return fmt.Errorf(errorMsg, m.FullyQualifiedName(), logstash.PipelineGraphAPIsAvailableVersion, logstashVersion) + } + + m.HTTP.SetURI(m.HTTP.GetURI() + "?vertices=true") + } + + return nil +} From ad2b588c6778e55feb00b965ccfa7a71ac6844c7 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Jan 2020 11:32:26 -0800 Subject: [PATCH 2/5] node: move HTTP calls from New() to Fetch() --- metricbeat/module/logstash/node/node.go | 35 +++++++++++++------------ 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/metricbeat/module/logstash/node/node.go b/metricbeat/module/logstash/node/node.go index fb07165c8b2..b9a74de3b9e 100644 --- a/metricbeat/module/logstash/node/node.go +++ b/metricbeat/module/logstash/node/node.go @@ -58,23 +58,6 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, err } - if ms.XPack { - logstashVersion, err := logstash.GetVersion(ms) - if err != nil { - return nil, err - } - - arePipelineGraphAPIsAvailable := logstash.ArePipelineGraphAPIsAvailable(logstashVersion) - if err != nil { - return nil, err - } - - if !arePipelineGraphAPIsAvailable { - const errorMsg = "The %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" - return nil, fmt.Errorf(errorMsg, ms.FullyQualifiedName(), logstash.PipelineGraphAPIsAvailableVersion, logstashVersion) - } - } - return &MetricSet{ ms, }, nil @@ -106,3 +89,21 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return nil } + +func (m *MetricSet) init() error { + if m.XPack { + logstashVersion, err := logstash.GetVersion(m.MetricSet) + if err != nil { + return err + } + + arePipelineGraphAPIsAvailable := logstash.ArePipelineGraphAPIsAvailable(logstashVersion) + + if !arePipelineGraphAPIsAvailable { + const errorMsg = "the %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" + return fmt.Errorf(errorMsg, m.FullyQualifiedName(), logstash.PipelineGraphAPIsAvailableVersion, logstashVersion) + } + } + + return nil +} From b695681956761dd66b071fb48524631ab52493fb Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Jan 2020 11:56:12 -0800 Subject: [PATCH 3/5] Adding CHANGELOG entry --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6a72877ff66..31500e23f85 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -297,6 +297,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix CPU count in docker/cpu in cases where no `online_cpus` are reported {pull}15070[15070] - Fix mixed modules loading standard and light metricsets {pull}15011[15011] - Make `kibana` module more resilient to Kibana unavailability. {issue}15258[15258] {pull}15270[15270] +- Make `logstash` module more resilient to Logstash unavailability. {issue}15276[15276] {pull}15306[15306] *Packetbeat* From f077d1d019cabc9a69d46f472389455f140926a1 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Jan 2020 13:52:06 -0800 Subject: [PATCH 4/5] Refactoring: extract common code into logstash package --- metricbeat/module/logstash/logstash.go | 23 +++++++++++++++---- metricbeat/module/logstash/node/node.go | 14 +---------- .../module/logstash/node_stats/node_stats.go | 11 +-------- 3 files changed, 20 insertions(+), 28 deletions(-) diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index 6e5bd523d0d..6e6aef6b05b 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -19,6 +19,7 @@ package logstash import ( "encoding/json" + "fmt" "net/url" "github.com/pkg/errors" @@ -159,7 +160,7 @@ func GetPipelines(m *MetricSet) ([]PipelineState, error) { } // GetVersion returns the version of the Logstash node -func GetVersion(m *MetricSet) (*common.Version, error) { +func (m *MetricSet) GetVersion() (*common.Version, error) { const rootPath = "/" content, err := fetchPath(m.HTTP, rootPath, "") if err != nil { @@ -178,10 +179,22 @@ func GetVersion(m *MetricSet) (*common.Version, error) { return response.Version, nil } -// ArePipelineGraphAPIsAvailable returns whether Logstash APIs that returns pipeline graphs -// are available in the given version of Logstash -func ArePipelineGraphAPIsAvailable(currentLogstashVersion *common.Version) bool { - return elastic.IsFeatureAvailable(currentLogstashVersion, PipelineGraphAPIsAvailableVersion) +// CheckPipelineGraphAPIs returns an error if pipeline graph APIs are not available +// in the version of the Logstash node. +func (m *MetricSet) CheckPipelineGraphAPIsAvailable() error { + logstashVersion, err := m.GetVersion() + if err != nil { + return err + } + + arePipelineGraphAPIsAvailable := elastic.IsFeatureAvailable(logstashVersion, PipelineGraphAPIsAvailableVersion) + + if !arePipelineGraphAPIsAvailable { + const errorMsg = "the %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" + return fmt.Errorf(errorMsg, m.FullyQualifiedName(), PipelineGraphAPIsAvailableVersion, logstashVersion) + } + + return nil } func fetchPath(httpHelper *helper.HTTP, path string, query string) ([]byte, error) { diff --git a/metricbeat/module/logstash/node/node.go b/metricbeat/module/logstash/node/node.go index b9a74de3b9e..75b4a3bc4f1 100644 --- a/metricbeat/module/logstash/node/node.go +++ b/metricbeat/module/logstash/node/node.go @@ -18,8 +18,6 @@ package node import ( - "fmt" - "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/logstash" @@ -92,17 +90,7 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { func (m *MetricSet) init() error { if m.XPack { - logstashVersion, err := logstash.GetVersion(m.MetricSet) - if err != nil { - return err - } - - arePipelineGraphAPIsAvailable := logstash.ArePipelineGraphAPIsAvailable(logstashVersion) - - if !arePipelineGraphAPIsAvailable { - const errorMsg = "the %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" - return fmt.Errorf(errorMsg, m.FullyQualifiedName(), logstash.PipelineGraphAPIsAvailableVersion, logstashVersion) - } + return m.CheckPipelineGraphAPIsAvailable() } return nil diff --git a/metricbeat/module/logstash/node_stats/node_stats.go b/metricbeat/module/logstash/node_stats/node_stats.go index fdbcfb7dfe6..37af127958d 100644 --- a/metricbeat/module/logstash/node_stats/node_stats.go +++ b/metricbeat/module/logstash/node_stats/node_stats.go @@ -18,8 +18,6 @@ package node_stats import ( - "fmt" - "github.com/elastic/beats/metricbeat/mb" "github.com/elastic/beats/metricbeat/mb/parse" "github.com/elastic/beats/metricbeat/module/logstash" @@ -100,18 +98,11 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { func (m *MetricSet) init() error { if m.XPack { - logstashVersion, err := logstash.GetVersion(m.MetricSet) + err := m.CheckPipelineGraphAPIsAvailable() if err != nil { return err } - arePipelineGraphAPIsAvailable := logstash.ArePipelineGraphAPIsAvailable(logstashVersion) - - if !arePipelineGraphAPIsAvailable { - const errorMsg = "the %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" - return fmt.Errorf(errorMsg, m.FullyQualifiedName(), logstash.PipelineGraphAPIsAvailableVersion, logstashVersion) - } - m.HTTP.SetURI(m.HTTP.GetURI() + "?vertices=true") } From 7ca1709bda690f8a3295f7b891e8c847ad85d344 Mon Sep 17 00:00:00 2001 From: Shaunak Kashyap Date: Thu, 2 Jan 2020 13:55:09 -0800 Subject: [PATCH 5/5] Unexporting getVersion --- metricbeat/module/logstash/logstash.go | 39 +++++++++++++------------- 1 file changed, 19 insertions(+), 20 deletions(-) diff --git a/metricbeat/module/logstash/logstash.go b/metricbeat/module/logstash/logstash.go index 6e6aef6b05b..5f221b02177 100644 --- a/metricbeat/module/logstash/logstash.go +++ b/metricbeat/module/logstash/logstash.go @@ -159,8 +159,25 @@ func GetPipelines(m *MetricSet) ([]PipelineState, error) { return pipelines, nil } -// GetVersion returns the version of the Logstash node -func (m *MetricSet) GetVersion() (*common.Version, error) { +// CheckPipelineGraphAPIsAvailable returns an error if pipeline graph APIs are not +// available in the version of the Logstash node. +func (m *MetricSet) CheckPipelineGraphAPIsAvailable() error { + logstashVersion, err := m.getVersion() + if err != nil { + return err + } + + arePipelineGraphAPIsAvailable := elastic.IsFeatureAvailable(logstashVersion, PipelineGraphAPIsAvailableVersion) + + if !arePipelineGraphAPIsAvailable { + const errorMsg = "the %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" + return fmt.Errorf(errorMsg, m.FullyQualifiedName(), PipelineGraphAPIsAvailableVersion, logstashVersion) + } + + return nil +} + +func (m *MetricSet) getVersion() (*common.Version, error) { const rootPath = "/" content, err := fetchPath(m.HTTP, rootPath, "") if err != nil { @@ -179,24 +196,6 @@ func (m *MetricSet) GetVersion() (*common.Version, error) { return response.Version, nil } -// CheckPipelineGraphAPIs returns an error if pipeline graph APIs are not available -// in the version of the Logstash node. -func (m *MetricSet) CheckPipelineGraphAPIsAvailable() error { - logstashVersion, err := m.GetVersion() - if err != nil { - return err - } - - arePipelineGraphAPIsAvailable := elastic.IsFeatureAvailable(logstashVersion, PipelineGraphAPIsAvailableVersion) - - if !arePipelineGraphAPIsAvailable { - const errorMsg = "the %v metricset with X-Pack enabled is only supported with Logstash >= %v. You are currently running Logstash %v" - return fmt.Errorf(errorMsg, m.FullyQualifiedName(), PipelineGraphAPIsAvailableVersion, logstashVersion) - } - - return nil -} - func fetchPath(httpHelper *helper.HTTP, path string, query string) ([]byte, error) { currentURI := httpHelper.GetURI() defer httpHelper.SetURI(currentURI)