diff --git a/internal/pkg/coordinator/monitor.go b/internal/pkg/coordinator/monitor.go index 66c934154..03019ac71 100644 --- a/internal/pkg/coordinator/monitor.go +++ b/internal/pkg/coordinator/monitor.go @@ -7,6 +7,7 @@ package coordinator import ( "context" "errors" + "fmt" "net" "os" "runtime" @@ -132,7 +133,7 @@ func (m *monitorT) Run(ctx context.Context) (err error) { case hits := <-s.Output(): err = m.handlePolicies(ctx, hits) if err != nil { - return err + m.log.Warn().Err(err).Msgf("Encountered an error while policy leadership changes; continuing to retry.") } case <-mT.C: m.calcMetadata() @@ -140,7 +141,7 @@ func (m *monitorT) Run(ctx context.Context) (err error) { case <-lT.C: err = m.ensureLeadership(ctx) if err != nil { - return err + m.log.Warn().Err(err).Msgf("Encountered an error while checking/assigning policy leaders; continuing to retry.") } lT.Reset(m.checkInterval) case <-ctx.Done(): @@ -157,6 +158,7 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { var policy model.Policy err := hit.Unmarshal(&policy) if err != nil { + m.log.Debug().Err(err).Msg("Failed to deserialize policy json") return err } if policy.CoordinatorIdx != 0 { @@ -170,6 +172,7 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { // current leader send to its coordinator err = p.cord.Update(ctx, policy) if err != nil { + m.log.Info().Err(err).Msg("Failed to update policy leader") return err } } @@ -192,8 +195,9 @@ func (m *monitorT) handlePolicies(ctx context.Context, hits []es.HitT) error { func (m *monitorT) ensureLeadership(ctx context.Context) error { m.log.Debug().Msg("ensuring leadership of policies") err := dl.EnsureServer(ctx, m.bulker, m.version, m.agentMetadata, m.hostMetadata, dl.WithIndexName(m.serversIndex)) + if err != nil { - return err + return fmt.Errorf("Failed to check server status on Elasticsearch (%s): %w", m.hostMetadata.Name, err) } // fetch current policies and leaders @@ -204,7 +208,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { m.log.Debug().Str("index", m.policiesIndex).Msg(es.ErrIndexNotFound.Error()) return nil } - return err + return fmt.Errorf("Encountered error while querying policies: %w", err) } if len(policies) > 0 { ids := make([]string, len(policies)) @@ -214,7 +218,7 @@ func (m *monitorT) ensureLeadership(ctx context.Context) error { leaders, err = dl.SearchPolicyLeaders(ctx, m.bulker, ids, dl.WithIndexName(m.leadersIndex)) if err != nil { if !errors.Is(err, es.ErrIndexNotFound) { - return err + return fmt.Errorf("Encountered error while fetching policy leaders: %w", err) } } }