From 7fb01389fadc239545f35ded4c67e9d80a00fddf Mon Sep 17 00:00:00 2001 From: bryan Date: Sun, 21 Nov 2021 20:56:36 -0800 Subject: [PATCH 1/5] keep trucking on ES availability errors; more tests to come --- internal/pkg/coordinator/monitor.go | 20 +++++++++++++ internal/pkg/coordinator/monitor_test.go | 37 ++++++++++++++++++++++++ 2 files changed, 57 insertions(+) create mode 100644 internal/pkg/coordinator/monitor_test.go diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 66c934154..55250ae12 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -9,6 +9,7 @@ import ( "errors" "net" "os" + "regexp" "runtime" "sync" "time" @@ -188,10 +189,29 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { return nil } +func isAvailabilityError(e error) bool { + if e == nil { + return false + } + match, err := regexp.MatchString("connect: connection refused$|net/http: timeout awaiting response headers$", e.Error()) + if err != nil { + log.Warn().Err(err).Msg("Error occured while parsing error message") + return false + } + return match +} + // ensureLeadership ensures leadership is held or needs to be taken over. func (m *monitorT) ensureLeadership(ctx context.Context) error { m.log.Debug().Msg("ensuring leadership of policies") err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) + + // If we are doing a read on an unavailable ES instance ignore it and we will check back later. + if isAvailabilityError(err) { + log.Info().Err(err).Msgf("Encountered availability error while attempting ES read; continuing to retry.") + return nil + } + if err != nil { return err } diff --git a/internal/pkg/coordinator/monitor_test.go b/internal/pkg/coordinator/monitor_test.go new file mode 100644 index 000000000..8c8fa6ba8 --- /dev/null +++ b/internal/pkg/coordinator/monitor_test.go @@ -0,0 +1,37 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package coordinator + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestIsAvailabilityErrorNil(t *testing.T) { + matched := isAvailabilityError(nil) + assert.Equal(t, matched, false) +} + +func TestIsAvailabilityErrorTimeout(t *testing.T) { + matched := isAvailabilityError(errors.New("net/http: timeout awaiting response headers")) + assert.Equal(t, matched, true) +} + +func TestIsAvailabilityErrorConnectRefused(t *testing.T) { + matched := isAvailabilityError(errors.New("dial tcp 127.0.0.1:9200: connect: connection refused")) + assert.Equal(t, matched, true) +} + +func TestIsAvailabilityErrorConnectRefusedRemote(t *testing.T) { + matched := isAvailabilityError(errors.New("dial tcp 65.234.123:9200: connect: connection refused")) + assert.Equal(t, matched, true) +} + +func TestIsAvailabilityErrorUnhandledError(t *testing.T) { + matched := isAvailabilityError(errors.New("novel error")) + assert.Equal(t, matched, false) +} From 2c75552ac3f97088f2fd84f5c6bfd4bbe0448e36 Mon Sep 17 00:00:00 2001 From: bryan Date: Wed, 24 Nov 2021 08:37:52 -0800 Subject: [PATCH 2/5] don't attempt to distinguish between errors, just keep retrying --- internal/pkg/coordinator/monitor.go | 23 ++------------- internal/pkg/coordinator/monitor_test.go | 37 ------------------------ 2 files changed, 3 insertions(+), 57 deletions(-) delete mode 100644 internal/pkg/coordinator/monitor_test.go diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 55250ae12..2b0056d05 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -9,7 +9,6 @@ import ( "errors" "net" "os" - "regexp" "runtime" "sync" "time" @@ -189,31 +188,15 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { return nil } -func isAvailabilityError(e error) bool { - if e == nil { - return false - } - match, err := regexp.MatchString("connect: connection refused$|net/http: timeout awaiting response headers$", e.Error()) - if err != nil { - log.Warn().Err(err).Msg("Error occured while parsing error message") - return false - } - return match -} - // ensureLeadership ensures leadership is held or needs to be taken over. func (m *monitorT) ensureLeadership(ctx context.Context) error { m.log.Debug().Msg("ensuring leadership of policies") err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) - // If we are doing a read on an unavailable ES instance ignore it and we will check back later. - if isAvailabilityError(err) { - log.Info().Err(err).Msgf("Encountered availability error while attempting ES read; continuing to retry.") - return nil - } - + // If we run into problems communicating with our ES instance, ignore it and we will check back later. if err != nil { - return err + log.Info().Err(err).Msgf("Encountered an error while trying to commnuicate with Elasticsearch host %s; continuing to retry.", m.hostMetadata.Name) + return nil } // fetch current policies and leaders diff --git a/internal/pkg/coordinator/monitor_test.go b/internal/pkg/coordinator/monitor_test.go deleted file mode 100644 index 8c8fa6ba8..000000000 --- a/internal/pkg/coordinator/monitor_test.go +++ /dev/null @@ -1,37 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package coordinator - -import ( - "errors" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestIsAvailabilityErrorNil(t *testing.T) { - matched := isAvailabilityError(nil) - assert.Equal(t, matched, false) -} - -func TestIsAvailabilityErrorTimeout(t *testing.T) { - matched := isAvailabilityError(errors.New("net/http: timeout awaiting response headers")) - assert.Equal(t, matched, true) -} - -func TestIsAvailabilityErrorConnectRefused(t *testing.T) { - matched := isAvailabilityError(errors.New("dial tcp 127.0.0.1:9200: connect: connection refused")) - assert.Equal(t, matched, true) -} - -func TestIsAvailabilityErrorConnectRefusedRemote(t *testing.T) { - matched := isAvailabilityError(errors.New("dial tcp 65.234.123:9200: connect: connection refused")) - assert.Equal(t, matched, true) -} - -func TestIsAvailabilityErrorUnhandledError(t *testing.T) { - matched := isAvailabilityError(errors.New("novel error")) - assert.Equal(t, matched, false) -} From f5fead9d8446743c521446bd038d648f8c60c42d Mon Sep 17 00:00:00 2001 From: bryan Date: Wed, 24 Nov 2021 11:34:49 -0800 Subject: [PATCH 3/5] move error blackholing up the stack so the monitor will never crash, added additional logging --- internal/pkg/coordinator/monitor.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 2b0056d05..d340af9a0 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -132,15 +132,16 @@ func (m *monitorT) Run(ctx context.Context) (err error) { case hits := <-s.Output(): err = m.handlePolicies(ctx, hits) if err != nil { - return err + m.log.Info().Err(err).Msgf("Encountered an error while policy leadership changes; continuing to retry.") } case <-mT.C: m.calcMetadata() mT.Reset(m.metadataInterval) case <-lT.C: err = m.ensureLeadership(ctx) + // If we run into problems communicating with our ES instance, ignore it and we will check back later. if err != nil { - return err + m.log.Info().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") } lT.Reset(m.checkInterval) case <-ctx.Done(): @@ -157,6 +158,7 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { var policy model.Policy err := hit.Unmarshal(&policy) if err != nil { + m.log.Debug().Err(err).Msg("Failed to deserialize policy json") return err } if policy.CoordinatorIdx != 0 { @@ -170,6 +172,7 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { // current leader send to its coordinator err = p.cord.Update(ctx, policy) if err != nil { + m.log.Info().Err(err).Msg("Failed to update policy leader") return err } } @@ -193,10 +196,9 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { m.log.Debug().Msg("ensuring leadership of policies") err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) - // If we run into problems communicating with our ES instance, ignore it and we will check back later. if err != nil { - log.Info().Err(err).Msgf("Encountered an error while trying to commnuicate with Elasticsearch host %s; continuing to retry.", m.hostMetadata.Name) - return nil + m.log.Debug().Err(err).Str("eshost", m.hostMetadata.Name).Msg("Failed to ") + return err } // fetch current policies and leaders @@ -207,6 +209,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) return nil } + m.log.Debug().Err(err).Msg("Encountered error while querying policies") return err } if len(policies) > 0 { @@ -217,6 +220,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { if !errors.Is(err, es.ErrIndexNotFound) { + m.log.Debug().Err(err).Msg("Encountered error while fetching policy leaders") return err } } From 1886dc57f3f1ccaf21b704ae49743079184b9c0b Mon Sep 17 00:00:00 2001 From: bryan Date: Sun, 28 Nov 2021 14:42:13 -0800 Subject: [PATCH 4/5] pr feedback --- internal/pkg/coordinator/monitor.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index d340af9a0..11c04cafa 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -7,6 +7,7 @@ package coordinator import ( "context" "errors" + "fmt" "net" "os" "runtime" @@ -139,7 +140,6 @@ func (m *monitorT) Run(ctx context.Context) (err error) { mT.Reset(m.metadataInterval) case <-lT.C: err = m.ensureLeadership(ctx) - // If we run into problems communicating with our ES instance, ignore it and we will check back later. if err != nil { m.log.Info().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") } @@ -197,8 +197,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) if err != nil { - m.log.Debug().Err(err).Str("eshost", m.hostMetadata.Name).Msg("Failed to ") - return err + return errors.New(fmt.Sprintf("Failed to check server status on Elasticsearch (%s): %s", m.hostMetadata.Name, err.Error())) } // fetch current policies and leaders @@ -209,8 +208,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) return nil } - m.log.Debug().Err(err).Msg("Encountered error while querying policies") - return err + return errors.New(fmt.Sprintf("Encountered error while querying policies: %s", err.Error())) } if len(policies) > 0 { ids := make([]string, len(policies)) @@ -220,8 +218,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { if !errors.Is(err, es.ErrIndexNotFound) { - m.log.Debug().Err(err).Msg("Encountered error while fetching policy leaders") - return err + return errors.New(fmt.Sprintf("Encountered error while fetching policy leaders: %s", err.Error())) } } } From 97524dca4241d34f877a9d83d2b1a1c9198eaeeb Mon Sep 17 00:00:00 2001 From: bryan Date: Tue, 30 Nov 2021 00:02:52 -0800 Subject: [PATCH 5/5] upped logging level, properly wrapped errors --- internal/pkg/coordinator/monitor.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 11c04cafa..03019ac71 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -133,7 +133,7 @@ func (m *monitorT) Run(ctx context.Context) (err error) { case hits := <-s.Output(): err = m.handlePolicies(ctx, hits) if err != nil { - m.log.Info().Err(err).Msgf("Encountered an error while policy leadership changes; continuing to retry.") + m.log.Warn().Err(err).Msgf("Encountered an error while policy leadership changes; continuing to retry.") } case <-mT.C: m.calcMetadata() @@ -141,7 +141,7 @@ func (m *monitorT) Run(ctx context.Context) (err error) { case <-lT.C: err = m.ensureLeadership(ctx) if err != nil { - m.log.Info().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") + m.log.Warn().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") } lT.Reset(m.checkInterval) case <-ctx.Done(): @@ -197,7 +197,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) if err != nil { - return errors.New(fmt.Sprintf("Failed to check server status on Elasticsearch (%s): %s", m.hostMetadata.Name, err.Error())) + return fmt.Errorf("Failed to check server status on Elasticsearch (%s): %w", m.hostMetadata.Name, err) } // fetch current policies and leaders @@ -208,7 +208,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) return nil } - return errors.New(fmt.Sprintf("Encountered error while querying policies: %s", err.Error())) + return fmt.Errorf("Encountered error while querying policies: %w", err) } if len(policies) > 0 { ids := make([]string, len(policies)) @@ -218,7 +218,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { if !errors.Is(err, es.ErrIndexNotFound) { - return errors.New(fmt.Sprintf("Encountered error while fetching policy leaders: %s", err.Error())) + return fmt.Errorf("Encountered error while fetching policy leaders: %w", err) } } }