Skip to content

Commit

Permalink
Report Communication Group index and convert command report to struct
Browse files Browse the repository at this point in the history
  • Loading branch information
pkosiec committed Oct 12, 2023
1 parent 954f8aa commit 0618dd5
Show file tree
Hide file tree
Showing 20 changed files with 240 additions and 190 deletions.
20 changes: 12 additions & 8 deletions cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ func run(ctx context.Context) (err error) {
// For example, if in both communication groups there's a Slack configuration pointing to the same workspace,
// when user executes `kubectl` command, one Bot instance will execute the command and return response,
// and the second "Sorry, this channel is not authorized to execute kubectl command" error.
commGroupIdx := 1
for commGroupName, commGroupCfg := range conf.Communications {
commGroupLogger := logger.WithField(commGroupFieldKey, commGroupName)
commGroupMeta := bot.CommGroupMetadata{Name: commGroupName, Index: commGroupIdx}

scheduleBotNotifier := func(in bot.Bot) {
bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in
Expand All @@ -252,47 +254,47 @@ func run(ctx context.Context) (err error) {

// Run bots
if commGroupCfg.Slack.Enabled {
sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupName, commGroupCfg.Slack, executorFactory, reporter)
sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupMeta, commGroupCfg.Slack, executorFactory, reporter)
if err != nil {
return reportFatalError("while creating Slack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.SocketSlack.Enabled {
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupName, commGroupCfg.SocketSlack, executorFactory, reporter)
sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, reporter)
if err != nil {
return reportFatalError("while creating SocketSlack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.CloudSlack.Enabled {
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupName, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, reporter)
sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, reporter)
if err != nil {
return reportFatalError("while creating CloudSlack bot", err)
}
scheduleBotNotifier(sb)
}

if commGroupCfg.Mattermost.Enabled {
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupName, commGroupCfg.Mattermost, executorFactory, reporter)
mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, reporter)
if err != nil {
return reportFatalError("while creating Mattermost bot", err)
}
scheduleBotNotifier(mb)
}

if commGroupCfg.Teams.Enabled {
tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupName, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, reporter)
tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupMeta, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, reporter)
if err != nil {
return reportFatalError("while creating Teams bot", err)
}
scheduleBotNotifier(tb)
}

if commGroupCfg.Discord.Enabled {
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupName, commGroupCfg.Discord, executorFactory, reporter)
db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, reporter)
if err != nil {
return reportFatalError("while creating Discord bot", err)
}
Expand All @@ -301,21 +303,23 @@ func run(ctx context.Context) (err error) {

// Run sinks
if commGroupCfg.Elasticsearch.Enabled {
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupCfg.Elasticsearch, reporter)
es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, reporter)
if err != nil {
return reportFatalError("while creating Elasticsearch sink", err)
}
sinkNotifiers = append(sinkNotifiers, es)
}

if commGroupCfg.Webhook.Enabled {
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupCfg.Webhook, reporter)
wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, reporter)
if err != nil {
return reportFatalError("while creating Webhook sink", err)
}

sinkNotifiers = append(sinkNotifiers, wh)
}

commGroupIdx++
}
healthChecker.SetNotifiers(getHealthNotifiers(bots, sinkNotifiers))

Expand Down
7 changes: 3 additions & 4 deletions internal/analytics/noop_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/execute/command"
)

var _ Reporter = &NoopReporter{}
Expand All @@ -25,17 +24,17 @@ func (n NoopReporter) RegisterCurrentIdentity(_ context.Context, _ kubernetes.In
}

// ReportCommand reports a new executed command. The command should be anonymized before using this method.
func (n NoopReporter) ReportCommand(_ config.CommPlatformIntegration, _, _ string, _ command.Origin, _ bool) error {
func (n NoopReporter) ReportCommand(_ ReportCommand) error {
return nil
}

// ReportBotEnabled reports an enabled bot.
func (n NoopReporter) ReportBotEnabled(_ config.CommPlatformIntegration) error {
func (n NoopReporter) ReportBotEnabled(_ config.CommPlatformIntegration, _ int) error {
return nil
}

// ReportSinkEnabled reports an enabled sink.
func (n NoopReporter) ReportSinkEnabled(_ config.CommPlatformIntegration) error {
func (n NoopReporter) ReportSinkEnabled(_ config.CommPlatformIntegration, _ int) error {
return nil
}

Expand Down
14 changes: 11 additions & 3 deletions internal/analytics/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type Reporter interface {
RegisterCurrentIdentity(ctx context.Context, k8sCli kubernetes.Interface, deployID string) error

// ReportCommand reports a new executed command. The command should be anonymized before using this method.
ReportCommand(platform config.CommPlatformIntegration, pluginName, command string, origin command.Origin, withFilter bool) error
ReportCommand(in ReportCommand) error

// ReportBotEnabled reports an enabled bot.
ReportBotEnabled(platform config.CommPlatformIntegration) error
ReportBotEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error

// ReportSinkEnabled reports an enabled sink.
ReportSinkEnabled(platform config.CommPlatformIntegration) error
ReportSinkEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error

// ReportHandledEventSuccess reports a successfully handled event using a given integration type, communication platform, and plugin.
ReportHandledEventSuccess(event ReportEvent) error
Expand All @@ -42,3 +42,11 @@ type ReportEvent struct {
PluginName string
AnonymizedEventFields map[string]any
}

type ReportCommand struct {
Platform config.CommPlatformIntegration
PluginName string
Command string
Origin command.Origin
WithFilter bool
}
19 changes: 10 additions & 9 deletions internal/analytics/segment_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"k8s.io/client-go/kubernetes"

"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/execute/command"
"github.com/kubeshop/botkube/pkg/version"
)

Expand Down Expand Up @@ -69,31 +68,33 @@ func (r *SegmentReporter) RegisterCurrentIdentity(ctx context.Context, k8sCli ku

// ReportCommand reports a new executed command. The command should be anonymized before using this method.
// The RegisterCurrentIdentity needs to be called first.
func (r *SegmentReporter) ReportCommand(platform config.CommPlatformIntegration, pluginName, command string, origin command.Origin, withFilter bool) error {
func (r *SegmentReporter) ReportCommand(in ReportCommand) error {
return r.reportEvent("Command executed", map[string]interface{}{
"platform": platform,
"command": command,
"plugin": pluginName,
"origin": origin,
"filtered": withFilter,
"platform": in.Platform,
"command": in.Command,
"plugin": in.PluginName,
"origin": in.Origin,
"filtered": in.WithFilter,
})
}

// ReportBotEnabled reports an enabled bot.
// The RegisterCurrentIdentity needs to be called first.
func (r *SegmentReporter) ReportBotEnabled(platform config.CommPlatformIntegration) error {
func (r *SegmentReporter) ReportBotEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error {
return r.reportEvent("Integration enabled", map[string]interface{}{
"platform": platform,
"type": config.BotIntegrationType,
"index": commGroupIdx,
})
}

// ReportSinkEnabled reports an enabled sink.
// The RegisterCurrentIdentity needs to be called first.
func (r *SegmentReporter) ReportSinkEnabled(platform config.CommPlatformIntegration) error {
func (r *SegmentReporter) ReportSinkEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error {
return r.reportEvent("Integration enabled", map[string]interface{}{
"platform": platform,
"type": config.SinkIntegrationType,
"index": commGroupIdx,
})
}

Expand Down
30 changes: 22 additions & 8 deletions internal/analytics/segment_reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,27 @@ func TestSegmentReporter_ReportCommand(t *testing.T) {
segmentReporter, segmentCli := fakeSegmentReporterWithIdentity(identity)

// when
err := segmentReporter.ReportCommand(config.DiscordCommPlatformIntegration, "", "enable notifications", command.TypedOrigin, false)
err := segmentReporter.ReportCommand(analytics.ReportCommand{
Platform: config.DiscordCommPlatformIntegration,
Command: "enable notifications",
Origin: command.TypedOrigin,
})
require.NoError(t, err)

err = segmentReporter.ReportCommand(config.SlackCommPlatformIntegration, "botkube/kubectl", "get", command.ButtonClickOrigin, false)
err = segmentReporter.ReportCommand(analytics.ReportCommand{
Platform: config.SlackCommPlatformIntegration,
PluginName: "botkube/kubectl",
Command: "get",
Origin: command.ButtonClickOrigin,
WithFilter: true,
})
require.NoError(t, err)

err = segmentReporter.ReportCommand(config.TeamsCommPlatformIntegration, "", "disable notifications", command.SelectValueChangeOrigin, false)
err = segmentReporter.ReportCommand(analytics.ReportCommand{
Platform: config.TeamsCommPlatformIntegration,
Command: "disable notifications",
Origin: command.SelectValueChangeOrigin,
})
require.NoError(t, err)

// then
Expand All @@ -120,15 +134,15 @@ func TestSegmentReporter_ReportBotEnabled(t *testing.T) {
segmentReporter, segmentCli := fakeSegmentReporterWithIdentity(identity)

// when
err := segmentReporter.ReportBotEnabled(config.SlackCommPlatformIntegration)
err := segmentReporter.ReportBotEnabled(config.SlackCommPlatformIntegration, 1)
require.NoError(t, err)

// when
err = segmentReporter.ReportBotEnabled(config.DiscordCommPlatformIntegration)
err = segmentReporter.ReportBotEnabled(config.DiscordCommPlatformIntegration, 2)
require.NoError(t, err)

// when
err = segmentReporter.ReportBotEnabled(config.TeamsCommPlatformIntegration)
err = segmentReporter.ReportBotEnabled(config.TeamsCommPlatformIntegration, 1)
require.NoError(t, err)

// then
Expand All @@ -141,11 +155,11 @@ func TestSegmentReporter_ReportSinkEnabled(t *testing.T) {
segmentReporter, segmentCli := fakeSegmentReporterWithIdentity(identity)

// when
err := segmentReporter.ReportSinkEnabled(config.WebhookCommPlatformIntegration)
err := segmentReporter.ReportSinkEnabled(config.WebhookCommPlatformIntegration, 1)
require.NoError(t, err)

// when
err = segmentReporter.ReportSinkEnabled(config.ElasticsearchCommPlatformIntegration)
err = segmentReporter.ReportSinkEnabled(config.ElasticsearchCommPlatformIntegration, 2)
require.NoError(t, err)

// then
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"event": "Integration enabled",
"timestamp": "2009-11-17T20:34:58.651387237Z",
"properties": {
"index": 1,
"platform": "slack",
"type": "bot"
}
Expand All @@ -17,6 +18,7 @@
"event": "Integration enabled",
"timestamp": "2009-11-17T20:34:58.651387237Z",
"properties": {
"index": 2,
"platform": "discord",
"type": "bot"
}
Expand All @@ -28,6 +30,7 @@
"event": "Integration enabled",
"timestamp": "2009-11-17T20:34:58.651387237Z",
"properties": {
"index": 1,
"platform": "teams",
"type": "bot"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"timestamp": "2009-11-17T20:34:58.651387237Z",
"properties": {
"command": "get",
"filtered": false,
"filtered": true,
"origin": "buttonClick",
"platform": "slack",
"plugin": "botkube/kubectl"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"event": "Integration enabled",
"timestamp": "2009-11-17T20:34:58.651387237Z",
"properties": {
"index": 1,
"platform": "webhook",
"type": "sink"
}
Expand All @@ -17,6 +18,7 @@
"event": "Integration enabled",
"timestamp": "2009-11-17T20:34:58.651387237Z",
"properties": {
"index": 2,
"platform": "elasticsearch",
"type": "sink"
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ExecutorFactory interface {
// AnalyticsReporter defines a reporter that collects analytics data.
type AnalyticsReporter interface {
// ReportBotEnabled reports an enabled bot.
ReportBotEnabled(platform config.CommPlatformIntegration) error
ReportBotEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error
}

// FatalErrorAnalyticsReporter reports a fatal errors.
Expand Down Expand Up @@ -64,6 +64,11 @@ type channelConfigByName struct {
notify bool
}

type CommGroupMetadata struct {
Name string
Index int
}

func AsNotifiers(bots map[string]Bot) []notifier.Bot {
notifiers := make([]notifier.Bot, 0, len(bots))
for _, bot := range bots {
Expand Down
10 changes: 5 additions & 5 deletions pkg/bot/discord.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type Discord struct {
channels map[string]channelConfigByID
notifyMutex sync.Mutex
botMentionRegex *regexp.Regexp
commGroupName string
commGroupMetadata CommGroupMetadata
renderer *DiscordRenderer
messages chan discordMessage
discordMessageWorkers *pool.Pool
Expand All @@ -65,7 +65,7 @@ type discordMessage struct {
}

// NewDiscord creates a new Discord instance.
func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Discord, error) {
func NewDiscord(log logrus.FieldLogger, commGroupMetadata CommGroupMetadata, cfg config.Discord, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Discord, error) {
botMentionRegex, err := discordBotMentionRegex(cfg.BotID)
if err != nil {
return nil, err
Expand All @@ -87,7 +87,7 @@ func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord
executorFactory: executorFactory,
api: api,
botID: cfg.BotID,
commGroupName: commGroupName,
commGroupMetadata: commGroupMetadata,
channels: channelsCfg,
botMentionRegex: botMentionRegex,
renderer: NewDiscordRenderer(),
Expand Down Expand Up @@ -128,7 +128,7 @@ func (b *Discord) Start(ctx context.Context) error {
return fmt.Errorf("while opening connection: %w", err)
}

err = b.reporter.ReportBotEnabled(b.IntegrationName())
err = b.reporter.ReportBotEnabled(b.IntegrationName(), b.commGroupMetadata.Index)
if err != nil {
b.setFailureReason(health.FailureReasonConnectionError)
return fmt.Errorf("while reporting analytics: %w", err)
Expand Down Expand Up @@ -251,7 +251,7 @@ func (b *Discord) handleMessage(ctx context.Context, dm discordMessage) error {
}

e := b.executorFactory.NewDefault(execute.NewDefaultInput{
CommGroupName: b.commGroupName,
CommGroupName: b.commGroupMetadata.Name,
Platform: b.IntegrationName(),
NotifierHandler: b,
Conversation: execute.Conversation{
Expand Down
Loading

0 comments on commit 0618dd5

Please sign in to comment.