Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hearbeat reporting #1469

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 51 additions & 72 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,82 +262,78 @@ func run(ctx context.Context) (err error) {
Index: commGroupIdx + 1,
}

scheduleBotNotifier := func(in bot.Bot) {
bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return in.Start(ctx)
})
scheduleNotifier := func(provider func() (notifier.Platform, error)) {
app, err := provider()
key := fmt.Sprintf("%s-%s", commGroupName, app.IntegrationName())
if err != nil {
commGroupLogger.WithError(err).Errorf("while creating %s bot", app.IntegrationName())
healthChecker.AddNotifier(key, health.NewFailed(health.FailureReasonConnectionError, err.Error()))
return
}

healthChecker.AddNotifier(key, app)

switch platform := app.(type) {
case notifier.Sink:
sinkNotifiers = append(sinkNotifiers, platform)
case bot.Bot:
bots[key] = platform
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return platform.Start(ctx)
})
}
}

// Run bots
if commGroupCfg.SocketSlack.Enabled {
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating SocketSlack bot", err)
}
scheduleBotNotifier(sb)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
})
}

if commGroupCfg.CloudSlack.Enabled {
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudSlack bot", err)
}
scheduleBotNotifier(sb)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
})
}

if commGroupCfg.Mattermost.Enabled {
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Mattermost bot", err)
}
scheduleBotNotifier(mb)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
})
}

if commGroupCfg.CloudTeams.Enabled {
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating CloudTeams bot", err)
}
scheduleBotNotifier(ctb)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
})
}

if commGroupCfg.Discord.Enabled {
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
if err != nil {
return reportFatalError("while creating Discord bot", err)
}
scheduleBotNotifier(db)
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
})
}

// Run sinks
if commGroupCfg.Elasticsearch.Enabled {
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
if err != nil {
return reportFatalError("while creating Elasticsearch sink", err)
}
sinkNotifiers = append(sinkNotifiers, es)
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
})
}

if commGroupCfg.Webhook.Enabled {
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
if err != nil {
return reportFatalError("while creating Webhook sink", err)
}

sinkNotifiers = append(sinkNotifiers, wh)
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
})
}
if commGroupCfg.PagerDuty.Enabled {
pd, err := sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
if err != nil {
return reportFatalError("while creating PagerDuty sink", err)
}

sinkNotifiers = append(sinkNotifiers, pd)
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
})
}
}
healthChecker.SetNotifiers(getHealthNotifiers(bots, sinkNotifiers))

if conf.ConfigWatcher.Enabled {
restarter := reloader.NewRestarter(
Expand Down Expand Up @@ -448,7 +444,7 @@ func run(ctx context.Context) (err error) {
logger.Errorf("while reporting fatal error: %s", reportErr.Error())
}
}()
heartbeatReporter := heartbeat.GetReporter(logger, gqlClient)
heartbeatReporter := heartbeat.GetReporter(logger, gqlClient, healthChecker)
k8sCollector := insights.NewK8sCollector(k8sCli, heartbeatReporter, logger, reportHeartbeatInterval, reportHeartbeatMaxRetries)
return k8sCollector.Start(ctx)
})
Expand Down Expand Up @@ -496,12 +492,7 @@ func getAnalyticsReporter(disableAnalytics bool, logger logrus.FieldLogger) (ana
return nil, fmt.Errorf("while creating new Analytics Client: %w", err)
}

analyticsReporter := analytics.NewSegmentReporter(wrappedLogger, segmentCli)
if err != nil {
return nil, err
}

return analyticsReporter, nil
return analytics.NewSegmentReporter(wrappedLogger, segmentCli), nil
}

func getK8sClients(cfg *rest.Config) (dynamic.Interface, discovery.DiscoveryInterface, error) {
Expand Down Expand Up @@ -555,15 +546,15 @@ func sendHelp(ctx context.Context, s *storage.Help, clusterName string, executor

var sent []string

for key, notifier := range notifiers {
for key, notifierItem := range notifiers {
if alreadySentHelp[key] {
continue
}

help := interactive.NewHelpMessage(notifier.IntegrationName(), clusterName, executors).Build(true)
err := notifier.SendMessageToAll(ctx, help)
help := interactive.NewHelpMessage(notifierItem.IntegrationName(), clusterName, executors).Build(true)
err := notifierItem.SendMessageToAll(ctx, help)
if err != nil {
return fmt.Errorf("while sending help message for %s: %w", notifier.IntegrationName(), err)
return fmt.Errorf("while sending help message for %s: %w", notifierItem.IntegrationName(), err)
}
sent = append(sent, key)
}
Expand All @@ -584,15 +575,3 @@ func findVersions(cli *kubernetes.Clientset) (string, string, error) {

return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), k8sVer.String(), nil
}

func getHealthNotifiers(bots map[string]bot.Bot, sinks []notifier.Sink) map[string]health.Notifier {
notifiers := make(map[string]health.Notifier)
for key, botInstance := range bots {
notifiers[key] = botInstance
}
for key, sinkInstance := range sinks {
notifiers[fmt.Sprintf("%s-%d", sinkInstance.IntegrationName(), key)] = sinkInstance
}

return notifiers
}
27 changes: 27 additions & 0 deletions internal/health/failed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package health

// Failed represents failed platform.
type Failed struct {
status PlatformStatusMsg
failureReason FailureReasonMsg
errorMsg string
}

// NewFailed creates a new Failed instance.
func NewFailed(failureReason FailureReasonMsg, errorMsg string) *Failed {
return &Failed{
status: StatusUnHealthy,
failureReason: failureReason,
errorMsg: errorMsg,
}
}

// GetStatus gets bot status.
func (b *Failed) GetStatus() PlatformStatus {
return PlatformStatus{
Status: b.status,
Restarts: "0/0",
Reason: b.failureReason,
ErrorMsg: b.errorMsg,
}
}
25 changes: 13 additions & 12 deletions internal/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewChecker(ctx context.Context, config *config.Config, stats *plugin.Health
ctx: ctx,
config: config,
pluginHealthStats: stats,
notifiers: map[string]Notifier{},
}
}

Expand All @@ -60,7 +61,7 @@ func (h *Checker) ServeHTTP(resp http.ResponseWriter, _ *http.Request) {
}
resp.Header().Set("Content-Type", "application/json")

status := h.getStatus()
status := h.GetStatus()
respJSon, err := json.Marshal(status)
if err != nil {
http.Error(resp, err.Error(), http.StatusInternalServerError)
Expand All @@ -79,26 +80,26 @@ func (h *Checker) NewServer(log logrus.FieldLogger, port string) *httpx.Server {
return httpx.NewServer(log, addr, router)
}

// SetNotifiers sets platform bots instances.
func (h *Checker) SetNotifiers(notifiers map[string]Notifier) {
h.notifiers = notifiers
// AddNotifier add platform bot instance
func (h *Checker) AddNotifier(key string, notifier Notifier) {
h.notifiers[key] = notifier
}

func (h *Checker) getStatus() *status {
pluginsStats := make(map[string]pluginStatuses)
func (h *Checker) GetStatus() *Status {
pluginsStats := make(map[string]PluginStatus)
h.collectSourcePluginsStatuses(pluginsStats)
h.collectExecutorPluginsStatuses(pluginsStats)

return &status{
Botkube: botStatus{
return &Status{
Botkube: BotStatus{
Status: h.getBotkubeStatus(),
},
Plugins: pluginsStats,
Platforms: h.getPlatformsStatus(),
}
}

func (h *Checker) collectSourcePluginsStatuses(plugins map[string]pluginStatuses) {
func (h *Checker) collectSourcePluginsStatuses(plugins map[string]PluginStatus) {
if h.config == nil {
return
}
Expand All @@ -109,7 +110,7 @@ func (h *Checker) collectSourcePluginsStatuses(plugins map[string]pluginStatuses
}
}

func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]pluginStatuses) {
func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]PluginStatus) {
if h.config == nil {
return
}
Expand All @@ -120,9 +121,9 @@ func (h *Checker) collectExecutorPluginsStatuses(plugins map[string]pluginStatus
}
}

func (h *Checker) collectPluginStatus(plugins map[string]pluginStatuses, pluginConfigName string, pluginName string, enabled bool) {
func (h *Checker) collectPluginStatus(plugins map[string]PluginStatus, pluginConfigName string, pluginName string, enabled bool) {
status, restarts, threshold, _ := h.pluginHealthStats.GetStats(pluginName)
plugins[pluginConfigName] = pluginStatuses{
plugins[pluginConfigName] = PluginStatus{
Enabled: enabled,
Status: status,
Restarts: fmt.Sprintf("%d/%d", restarts, threshold),
Expand Down
8 changes: 4 additions & 4 deletions internal/health/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
func TestServeHTTPUnavailable(t *testing.T) {
// given
checker := NewChecker(context.TODO(), &config.Config{}, nil)
expectedStatus := checker.getStatus()
expectedStatus := checker.GetStatus()

req, err := http.NewRequest("GET", "/", nil)
require.NoError(t, err)
Expand All @@ -29,7 +29,7 @@ func TestServeHTTPUnavailable(t *testing.T) {
assert.Equal(t, http.StatusServiceUnavailable, rr.Code)
assert.Equal(t, "application/json", rr.Header().Get("Content-Type"))

var resp status
var resp Status
err = json.Unmarshal(rr.Body.Bytes(), &resp)
require.NoError(t, err)

Expand All @@ -41,7 +41,7 @@ func TestServeHTTPOK(t *testing.T) {
// given
checker := NewChecker(context.TODO(), &config.Config{}, nil)
checker.MarkAsReady()
expectedStatus := checker.getStatus()
expectedStatus := checker.GetStatus()

req, err := http.NewRequest("GET", "/", nil)
require.NoError(t, err)
Expand All @@ -54,7 +54,7 @@ func TestServeHTTPOK(t *testing.T) {
assert.Equal(t, http.StatusOK, rr.Code)
assert.Equal(t, "application/json", rr.Header().Get("Content-Type"))

var resp status
var resp Status
err = json.Unmarshal(rr.Body.Bytes(), &resp)
require.NoError(t, err)

Expand Down
15 changes: 8 additions & 7 deletions internal/health/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,24 @@ type PlatformStatus struct {
Status PlatformStatusMsg `json:"status,omitempty"`
Restarts string `json:"restarts,omitempty"`
Reason FailureReasonMsg `json:"reason,omitempty"`
ErrorMsg string `json:"errorMsg,omitempty"`
}

// status defines bot agent status.
type status struct {
Botkube botStatus `json:"botkube"`
Plugins map[string]pluginStatuses `json:"plugins,omitempty"`
Platforms platformStatuses `json:"platforms,omitempty"`
// Status defines bot agent status.
type Status struct {
Botkube BotStatus `json:"botkube"`
Plugins map[string]PluginStatus `json:"plugins,omitempty"`
Platforms platformStatuses `json:"platforms,omitempty"`
}

type platformStatuses map[string]PlatformStatus

type pluginStatuses struct {
type PluginStatus struct {
Enabled bool `json:"enabled,omitempty"`
Status string `json:"status,omitempty"`
Restarts string `json:"restarts,omitempty"`
}

type botStatus struct {
type BotStatus struct {
Status BotkubeStatus `json:"status,omitempty"`
}
Loading
Loading