Skip to content

Commit

Permalink
Simplify starting notifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
mszostok committed Jul 8, 2024
1 parent dea8ebe commit c472d4d
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 164 deletions.
140 changes: 45 additions & 95 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,116 +264,78 @@ func run(ctx context.Context) (err error) {
Index: commGroupIdx + 1,
}

scheduleBotNotifier := func(in bot.Bot, key string) {
setHealthBotNotifier(in, key)
bots[key] = in
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return in.Start(ctx)
})
scheduleNotifier := func(provider func() (notifier.Platform, error)) {
app, err := provider()
key := fmt.Sprintf("%s-%s", commGroupName, app.IntegrationName())
if err != nil {
commGroupLogger.WithError(err).Errorf("while creating %s bot", app.IntegrationName())
healthChecker.AddNotifier(key, health.NewFailed(health.FailureReasonConnectionError, err.Error()))
return
}

healthChecker.AddNotifier(key, app)

switch platform := app.(type) {
case notifier.Sink:
sinkNotifiers = append(sinkNotifiers, platform)
case bot.Bot:
bots[key] = platform
errGroup.Go(func() error {
defer analytics.ReportPanicIfOccurs(commGroupLogger, analyticsReporter)
return platform.Start(ctx)
})
}
}

// Run bots
if commGroupCfg.SocketSlack.Enabled {
notifierKey := getNotifierKey(commGroupName, config.SocketSlackCommPlatformIntegration)
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating SocketSlack bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(sb, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, analyticsReporter)
})
}

if commGroupCfg.CloudSlack.Enabled {
notifierKey := getNotifierKey(commGroupName, config.CloudSlackCommPlatformIntegration)
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating CloudSlack bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(sb, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, analyticsReporter)
})
}

if commGroupCfg.Mattermost.Enabled {
notifierKey := getNotifierKey(commGroupName, config.MattermostCommPlatformIntegration)
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating Mattermost bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(mb, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, analyticsReporter)
})
}

if commGroupCfg.CloudTeams.Enabled {
notifierKey := getNotifierKey(commGroupName, config.CloudTeamsCommPlatformIntegration)
ctb, err := bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating CloudTeams bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(ctb, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewCloudTeams(commGroupLogger.WithField(botLogFieldKey, "CloudTeams"), commGroupMeta, commGroupCfg.CloudTeams, conf.Settings.ClusterName, executorFactory, analyticsReporter)
})
}

if commGroupCfg.Discord.Enabled {
notifierKey := getNotifierKey(commGroupName, config.DiscordCommPlatformIntegration)
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating Discord bot: %s", err.Error())
setHealthBotNotifier(bot.NewBotFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Error(errorMsg)
} else {
scheduleBotNotifier(db, notifierKey)
}
scheduleNotifier(func() (notifier.Platform, error) {
return bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, analyticsReporter)
})
}

// Run sinks
if commGroupCfg.Elasticsearch.Enabled {
notifierKey := getNotifierKey(commGroupName, config.ElasticsearchCommPlatformIntegration)
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating Elasticsearch sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(es, notifierKey)
sinkNotifiers = append(sinkNotifiers, es)
}
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, analyticsReporter)
})
}

if commGroupCfg.Webhook.Enabled {
notifierKey := getNotifierKey(commGroupName, config.WebhookCommPlatformIntegration)
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating Webhook sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(wh, notifierKey)
sinkNotifiers = append(sinkNotifiers, wh)
}
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, analyticsReporter)
})
}
if commGroupCfg.PagerDuty.Enabled {
notifierKey := getNotifierKey(commGroupName, config.PagerDutyCommPlatformIntegration)
pd, err := sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
if err != nil {
errorMsg := fmt.Sprintf("while creating PagerDuty sink: %s", err.Error())
setHealthSinkNotifier(sink.NewSinkFailed(health.FailureReasonConnectionError, errorMsg), notifierKey)
logger.Errorf(errorMsg)
} else {
setHealthSinkNotifier(pd, notifierKey)
sinkNotifiers = append(sinkNotifiers, pd)
}
scheduleNotifier(func() (notifier.Platform, error) {
return sink.NewPagerDuty(commGroupLogger.WithField(sinkLogFieldKey, "PagerDuty"), commGroupMeta.Index, commGroupCfg.PagerDuty, conf.Settings.ClusterName, analyticsReporter)
})
}
}
healthChecker.SetNotifiers(healthNotifiers)

if conf.ConfigWatcher.Enabled {
restarter := reloader.NewRestarter(
Expand Down Expand Up @@ -615,15 +577,3 @@ func findVersions(cli *kubernetes.Clientset) (string, string, error) {

return fmt.Sprintf("K8s Server Version: %s\nBotkube version: %s", k8sVer.String(), botkubeVersion), k8sVer.String(), nil
}

func setHealthBotNotifier(bot bot.HealthNotifierBot, key string) {
healthNotifiers[key] = bot
}

func setHealthSinkNotifier(sink sink.HealthNotifierSink, key string) {
healthNotifiers[key] = sink
}

func getNotifierKey(commGroupName string, commPlatformIntegration config.CommPlatformIntegration) string {
return fmt.Sprintf("%s-%s", commGroupName, commPlatformIntegration)
}
28 changes: 28 additions & 0 deletions internal/health/failed.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package health


// Failed represents failed platform.
type Failed struct {
status PlatformStatusMsg
failureReason FailureReasonMsg
errorMsg string
}

// NewFailed creates a new Failed instance.
func NewFailed(failureReason FailureReasonMsg, errorMsg string) *Failed {
return &Failed{
status: StatusUnHealthy,
failureReason: failureReason,
errorMsg: errorMsg,
}
}

// GetStatus gets bot status.
func (b *Failed) GetStatus() PlatformStatus {
return PlatformStatus{
Status: b.status,
Restarts: "0/0",
Reason: b.failureReason,
ErrorMsg: b.errorMsg,
}
}
6 changes: 3 additions & 3 deletions internal/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func NewChecker(ctx context.Context, config *config.Config, stats *plugin.Health
ctx: ctx,
config: config,
pluginHealthStats: stats,
notifiers: map[string]Notifier{},
}
}

Expand Down Expand Up @@ -79,9 +80,8 @@ func (h *Checker) NewServer(log logrus.FieldLogger, port string) *httpx.Server {
return httpx.NewServer(log, addr, router)
}

// SetNotifiers sets platform bots instances.
func (h *Checker) SetNotifiers(notifiers map[string]Notifier) {
h.notifiers = notifiers
func (h *Checker) AddNotifier(key string, notifier Notifier) {
h.notifiers[key] = notifier
}

func (h *Checker) GetStatus() *Status {
Expand Down
35 changes: 0 additions & 35 deletions pkg/bot/bot_failed.go

This file was deleted.

11 changes: 11 additions & 0 deletions pkg/notifier/platform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package notifier

import (
"github.com/kubeshop/botkube/internal/health"
"github.com/kubeshop/botkube/pkg/config"
)

type Platform interface {
GetStatus() health.PlatformStatus
IntegrationName() config.CommPlatformIntegration
}
31 changes: 0 additions & 31 deletions pkg/sink/types.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package sink

import (
"github.com/kubeshop/botkube/internal/health"
"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/notifier"
)
Expand All @@ -16,33 +15,3 @@ type AnalyticsReporter interface {
// ReportSinkEnabled reports an enabled sink.
ReportSinkEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error
}

type HealthNotifierSink interface {
GetStatus() health.PlatformStatus
}

// FailedSink mock of sink, uses for healthChecker.
type FailedSink struct {
status health.PlatformStatusMsg
failureReason health.FailureReasonMsg
errorMsg string
}

// NewSinkFailed creates a new FailedSink instance.
func NewSinkFailed(failureReason health.FailureReasonMsg, errorMsg string) *FailedSink {
return &FailedSink{
status: health.StatusUnHealthy,
failureReason: failureReason,
errorMsg: errorMsg,
}
}

// GetStatus gets bot status.
func (s *FailedSink) GetStatus() health.PlatformStatus {
return health.PlatformStatus{
Status: s.status,
Restarts: "0/0",
Reason: s.failureReason,
ErrorMsg: s.errorMsg,
}
}

0 comments on commit c472d4d

Please sign in to comment.