From 0618dd5ec9fbb36b579b6deddd68c198a8aacff0 Mon Sep 17 00:00:00 2001 From: Pawel Kosiec Date: Thu, 12 Oct 2023 12:55:45 +0200 Subject: [PATCH] Report Communication Group index and convert command report to struct --- cmd/botkube-agent/main.go | 20 +++-- internal/analytics/noop_reporter.go | 7 +- internal/analytics/reporter.go | 14 ++- internal/analytics/segment_reporter.go | 19 ++-- internal/analytics/segment_reporter_test.go | 30 +++++-- .../TestSegmentReporter_ReportBotEnabled.json | 3 + .../TestSegmentReporter_ReportCommand.json | 2 +- ...TestSegmentReporter_ReportSinkEnabled.json | 2 + pkg/bot/bot.go | 7 +- pkg/bot/discord.go | 10 +-- pkg/bot/mattermost.go | 86 +++++++++---------- pkg/bot/slack_cloud.go | 82 ++++++++++-------- pkg/bot/slack_legacy.go | 10 +-- pkg/bot/slack_socket.go | 76 ++++++++-------- pkg/bot/teams.go | 40 ++++----- pkg/execute/executor.go | 9 +- pkg/execute/factory.go | 3 +- pkg/sink/elasticsearch.go | 4 +- pkg/sink/types.go | 2 +- pkg/sink/webhook.go | 4 +- 20 files changed, 240 insertions(+), 190 deletions(-) diff --git a/cmd/botkube-agent/main.go b/cmd/botkube-agent/main.go index 1345e4d09e..154a3969c0 100644 --- a/cmd/botkube-agent/main.go +++ b/cmd/botkube-agent/main.go @@ -239,8 +239,10 @@ func run(ctx context.Context) (err error) { // For example, if in both communication groups there's a Slack configuration pointing to the same workspace, // when user executes `kubectl` command, one Bot instance will execute the command and return response, // and the second "Sorry, this channel is not authorized to execute kubectl command" error. + commGroupIdx := 1 for commGroupName, commGroupCfg := range conf.Communications { commGroupLogger := logger.WithField(commGroupFieldKey, commGroupName) + commGroupMeta := bot.CommGroupMetadata{Name: commGroupName, Index: commGroupIdx} scheduleBotNotifier := func(in bot.Bot) { bots[fmt.Sprintf("%s-%s", commGroupName, in.IntegrationName())] = in @@ -252,7 +254,7 @@ func run(ctx context.Context) (err error) { // Run bots if commGroupCfg.Slack.Enabled { - sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupName, commGroupCfg.Slack, executorFactory, reporter) + sb, err := bot.NewSlack(commGroupLogger.WithField(botLogFieldKey, "Slack"), commGroupMeta, commGroupCfg.Slack, executorFactory, reporter) if err != nil { return reportFatalError("while creating Slack bot", err) } @@ -260,7 +262,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.SocketSlack.Enabled { - sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupName, commGroupCfg.SocketSlack, executorFactory, reporter) + sb, err := bot.NewSocketSlack(commGroupLogger.WithField(botLogFieldKey, "SocketSlack"), commGroupMeta, commGroupCfg.SocketSlack, executorFactory, reporter) if err != nil { return reportFatalError("while creating SocketSlack bot", err) } @@ -268,7 +270,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.CloudSlack.Enabled { - sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupName, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, reporter) + sb, err := bot.NewCloudSlack(commGroupLogger.WithField(botLogFieldKey, "CloudSlack"), commGroupMeta, commGroupCfg.CloudSlack, conf.Settings.ClusterName, executorFactory, reporter) if err != nil { return reportFatalError("while creating CloudSlack bot", err) } @@ -276,7 +278,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.Mattermost.Enabled { - mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupName, commGroupCfg.Mattermost, executorFactory, reporter) + mb, err := bot.NewMattermost(ctx, commGroupLogger.WithField(botLogFieldKey, "Mattermost"), commGroupMeta, commGroupCfg.Mattermost, executorFactory, reporter) if err != nil { return reportFatalError("while creating Mattermost bot", err) } @@ -284,7 +286,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.Teams.Enabled { - tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupName, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, reporter) + tb, err := bot.NewTeams(commGroupLogger.WithField(botLogFieldKey, "MS Teams"), commGroupMeta, commGroupCfg.Teams, conf.Settings.ClusterName, executorFactory, reporter) if err != nil { return reportFatalError("while creating Teams bot", err) } @@ -292,7 +294,7 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.Discord.Enabled { - db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupName, commGroupCfg.Discord, executorFactory, reporter) + db, err := bot.NewDiscord(commGroupLogger.WithField(botLogFieldKey, "Discord"), commGroupMeta, commGroupCfg.Discord, executorFactory, reporter) if err != nil { return reportFatalError("while creating Discord bot", err) } @@ -301,7 +303,7 @@ func run(ctx context.Context) (err error) { // Run sinks if commGroupCfg.Elasticsearch.Enabled { - es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupCfg.Elasticsearch, reporter) + es, err := sink.NewElasticsearch(commGroupLogger.WithField(sinkLogFieldKey, "Elasticsearch"), commGroupMeta.Index, commGroupCfg.Elasticsearch, reporter) if err != nil { return reportFatalError("while creating Elasticsearch sink", err) } @@ -309,13 +311,15 @@ func run(ctx context.Context) (err error) { } if commGroupCfg.Webhook.Enabled { - wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupCfg.Webhook, reporter) + wh, err := sink.NewWebhook(commGroupLogger.WithField(sinkLogFieldKey, "Webhook"), commGroupMeta.Index, commGroupCfg.Webhook, reporter) if err != nil { return reportFatalError("while creating Webhook sink", err) } sinkNotifiers = append(sinkNotifiers, wh) } + + commGroupIdx++ } healthChecker.SetNotifiers(getHealthNotifiers(bots, sinkNotifiers)) diff --git a/internal/analytics/noop_reporter.go b/internal/analytics/noop_reporter.go index 66dfd07a0b..7e5c9481a9 100644 --- a/internal/analytics/noop_reporter.go +++ b/internal/analytics/noop_reporter.go @@ -6,7 +6,6 @@ import ( "k8s.io/client-go/kubernetes" "github.com/kubeshop/botkube/pkg/config" - "github.com/kubeshop/botkube/pkg/execute/command" ) var _ Reporter = &NoopReporter{} @@ -25,17 +24,17 @@ func (n NoopReporter) RegisterCurrentIdentity(_ context.Context, _ kubernetes.In } // ReportCommand reports a new executed command. The command should be anonymized before using this method. -func (n NoopReporter) ReportCommand(_ config.CommPlatformIntegration, _, _ string, _ command.Origin, _ bool) error { +func (n NoopReporter) ReportCommand(_ ReportCommand) error { return nil } // ReportBotEnabled reports an enabled bot. -func (n NoopReporter) ReportBotEnabled(_ config.CommPlatformIntegration) error { +func (n NoopReporter) ReportBotEnabled(_ config.CommPlatformIntegration, _ int) error { return nil } // ReportSinkEnabled reports an enabled sink. -func (n NoopReporter) ReportSinkEnabled(_ config.CommPlatformIntegration) error { +func (n NoopReporter) ReportSinkEnabled(_ config.CommPlatformIntegration, _ int) error { return nil } diff --git a/internal/analytics/reporter.go b/internal/analytics/reporter.go index 59c6e2eaad..2e13d3df16 100644 --- a/internal/analytics/reporter.go +++ b/internal/analytics/reporter.go @@ -15,13 +15,13 @@ type Reporter interface { RegisterCurrentIdentity(ctx context.Context, k8sCli kubernetes.Interface, deployID string) error // ReportCommand reports a new executed command. The command should be anonymized before using this method. - ReportCommand(platform config.CommPlatformIntegration, pluginName, command string, origin command.Origin, withFilter bool) error + ReportCommand(in ReportCommand) error // ReportBotEnabled reports an enabled bot. - ReportBotEnabled(platform config.CommPlatformIntegration) error + ReportBotEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error // ReportSinkEnabled reports an enabled sink. - ReportSinkEnabled(platform config.CommPlatformIntegration) error + ReportSinkEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error // ReportHandledEventSuccess reports a successfully handled event using a given integration type, communication platform, and plugin. ReportHandledEventSuccess(event ReportEvent) error @@ -42,3 +42,11 @@ type ReportEvent struct { PluginName string AnonymizedEventFields map[string]any } + +type ReportCommand struct { + Platform config.CommPlatformIntegration + PluginName string + Command string + Origin command.Origin + WithFilter bool +} diff --git a/internal/analytics/segment_reporter.go b/internal/analytics/segment_reporter.go index f9a5741da8..693c4e0881 100644 --- a/internal/analytics/segment_reporter.go +++ b/internal/analytics/segment_reporter.go @@ -11,7 +11,6 @@ import ( "k8s.io/client-go/kubernetes" "github.com/kubeshop/botkube/pkg/config" - "github.com/kubeshop/botkube/pkg/execute/command" "github.com/kubeshop/botkube/pkg/version" ) @@ -69,31 +68,33 @@ func (r *SegmentReporter) RegisterCurrentIdentity(ctx context.Context, k8sCli ku // ReportCommand reports a new executed command. The command should be anonymized before using this method. // The RegisterCurrentIdentity needs to be called first. -func (r *SegmentReporter) ReportCommand(platform config.CommPlatformIntegration, pluginName, command string, origin command.Origin, withFilter bool) error { +func (r *SegmentReporter) ReportCommand(in ReportCommand) error { return r.reportEvent("Command executed", map[string]interface{}{ - "platform": platform, - "command": command, - "plugin": pluginName, - "origin": origin, - "filtered": withFilter, + "platform": in.Platform, + "command": in.Command, + "plugin": in.PluginName, + "origin": in.Origin, + "filtered": in.WithFilter, }) } // ReportBotEnabled reports an enabled bot. // The RegisterCurrentIdentity needs to be called first. -func (r *SegmentReporter) ReportBotEnabled(platform config.CommPlatformIntegration) error { +func (r *SegmentReporter) ReportBotEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error { return r.reportEvent("Integration enabled", map[string]interface{}{ "platform": platform, "type": config.BotIntegrationType, + "index": commGroupIdx, }) } // ReportSinkEnabled reports an enabled sink. // The RegisterCurrentIdentity needs to be called first. -func (r *SegmentReporter) ReportSinkEnabled(platform config.CommPlatformIntegration) error { +func (r *SegmentReporter) ReportSinkEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error { return r.reportEvent("Integration enabled", map[string]interface{}{ "platform": platform, "type": config.SinkIntegrationType, + "index": commGroupIdx, }) } diff --git a/internal/analytics/segment_reporter_test.go b/internal/analytics/segment_reporter_test.go index 4f614b36b7..59f0441e8b 100644 --- a/internal/analytics/segment_reporter_test.go +++ b/internal/analytics/segment_reporter_test.go @@ -101,13 +101,27 @@ func TestSegmentReporter_ReportCommand(t *testing.T) { segmentReporter, segmentCli := fakeSegmentReporterWithIdentity(identity) // when - err := segmentReporter.ReportCommand(config.DiscordCommPlatformIntegration, "", "enable notifications", command.TypedOrigin, false) + err := segmentReporter.ReportCommand(analytics.ReportCommand{ + Platform: config.DiscordCommPlatformIntegration, + Command: "enable notifications", + Origin: command.TypedOrigin, + }) require.NoError(t, err) - err = segmentReporter.ReportCommand(config.SlackCommPlatformIntegration, "botkube/kubectl", "get", command.ButtonClickOrigin, false) + err = segmentReporter.ReportCommand(analytics.ReportCommand{ + Platform: config.SlackCommPlatformIntegration, + PluginName: "botkube/kubectl", + Command: "get", + Origin: command.ButtonClickOrigin, + WithFilter: true, + }) require.NoError(t, err) - err = segmentReporter.ReportCommand(config.TeamsCommPlatformIntegration, "", "disable notifications", command.SelectValueChangeOrigin, false) + err = segmentReporter.ReportCommand(analytics.ReportCommand{ + Platform: config.TeamsCommPlatformIntegration, + Command: "disable notifications", + Origin: command.SelectValueChangeOrigin, + }) require.NoError(t, err) // then @@ -120,15 +134,15 @@ func TestSegmentReporter_ReportBotEnabled(t *testing.T) { segmentReporter, segmentCli := fakeSegmentReporterWithIdentity(identity) // when - err := segmentReporter.ReportBotEnabled(config.SlackCommPlatformIntegration) + err := segmentReporter.ReportBotEnabled(config.SlackCommPlatformIntegration, 1) require.NoError(t, err) // when - err = segmentReporter.ReportBotEnabled(config.DiscordCommPlatformIntegration) + err = segmentReporter.ReportBotEnabled(config.DiscordCommPlatformIntegration, 2) require.NoError(t, err) // when - err = segmentReporter.ReportBotEnabled(config.TeamsCommPlatformIntegration) + err = segmentReporter.ReportBotEnabled(config.TeamsCommPlatformIntegration, 1) require.NoError(t, err) // then @@ -141,11 +155,11 @@ func TestSegmentReporter_ReportSinkEnabled(t *testing.T) { segmentReporter, segmentCli := fakeSegmentReporterWithIdentity(identity) // when - err := segmentReporter.ReportSinkEnabled(config.WebhookCommPlatformIntegration) + err := segmentReporter.ReportSinkEnabled(config.WebhookCommPlatformIntegration, 1) require.NoError(t, err) // when - err = segmentReporter.ReportSinkEnabled(config.ElasticsearchCommPlatformIntegration) + err = segmentReporter.ReportSinkEnabled(config.ElasticsearchCommPlatformIntegration, 2) require.NoError(t, err) // then diff --git a/internal/analytics/testdata/TestSegmentReporter_ReportBotEnabled.json b/internal/analytics/testdata/TestSegmentReporter_ReportBotEnabled.json index 815a3d9c52..e194a56780 100644 --- a/internal/analytics/testdata/TestSegmentReporter_ReportBotEnabled.json +++ b/internal/analytics/testdata/TestSegmentReporter_ReportBotEnabled.json @@ -6,6 +6,7 @@ "event": "Integration enabled", "timestamp": "2009-11-17T20:34:58.651387237Z", "properties": { + "index": 1, "platform": "slack", "type": "bot" } @@ -17,6 +18,7 @@ "event": "Integration enabled", "timestamp": "2009-11-17T20:34:58.651387237Z", "properties": { + "index": 2, "platform": "discord", "type": "bot" } @@ -28,6 +30,7 @@ "event": "Integration enabled", "timestamp": "2009-11-17T20:34:58.651387237Z", "properties": { + "index": 1, "platform": "teams", "type": "bot" } diff --git a/internal/analytics/testdata/TestSegmentReporter_ReportCommand.json b/internal/analytics/testdata/TestSegmentReporter_ReportCommand.json index eb6957ea34..846a19db3f 100644 --- a/internal/analytics/testdata/TestSegmentReporter_ReportCommand.json +++ b/internal/analytics/testdata/TestSegmentReporter_ReportCommand.json @@ -21,7 +21,7 @@ "timestamp": "2009-11-17T20:34:58.651387237Z", "properties": { "command": "get", - "filtered": false, + "filtered": true, "origin": "buttonClick", "platform": "slack", "plugin": "botkube/kubectl" diff --git a/internal/analytics/testdata/TestSegmentReporter_ReportSinkEnabled.json b/internal/analytics/testdata/TestSegmentReporter_ReportSinkEnabled.json index 737376b925..1638b911a9 100644 --- a/internal/analytics/testdata/TestSegmentReporter_ReportSinkEnabled.json +++ b/internal/analytics/testdata/TestSegmentReporter_ReportSinkEnabled.json @@ -6,6 +6,7 @@ "event": "Integration enabled", "timestamp": "2009-11-17T20:34:58.651387237Z", "properties": { + "index": 1, "platform": "webhook", "type": "sink" } @@ -17,6 +18,7 @@ "event": "Integration enabled", "timestamp": "2009-11-17T20:34:58.651387237Z", "properties": { + "index": 2, "platform": "elasticsearch", "type": "sink" } diff --git a/pkg/bot/bot.go b/pkg/bot/bot.go index 2a0c2cdadd..6616aa27ee 100644 --- a/pkg/bot/bot.go +++ b/pkg/bot/bot.go @@ -35,7 +35,7 @@ type ExecutorFactory interface { // AnalyticsReporter defines a reporter that collects analytics data. type AnalyticsReporter interface { // ReportBotEnabled reports an enabled bot. - ReportBotEnabled(platform config.CommPlatformIntegration) error + ReportBotEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error } // FatalErrorAnalyticsReporter reports a fatal errors. @@ -64,6 +64,11 @@ type channelConfigByName struct { notify bool } +type CommGroupMetadata struct { + Name string + Index int +} + func AsNotifiers(bots map[string]Bot) []notifier.Bot { notifiers := make([]notifier.Bot, 0, len(bots)) for _, bot := range bots { diff --git a/pkg/bot/discord.go b/pkg/bot/discord.go index 8885640c30..a95633e6a0 100644 --- a/pkg/bot/discord.go +++ b/pkg/bot/discord.go @@ -50,7 +50,7 @@ type Discord struct { channels map[string]channelConfigByID notifyMutex sync.Mutex botMentionRegex *regexp.Regexp - commGroupName string + commGroupMetadata CommGroupMetadata renderer *DiscordRenderer messages chan discordMessage discordMessageWorkers *pool.Pool @@ -65,7 +65,7 @@ type discordMessage struct { } // NewDiscord creates a new Discord instance. -func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Discord, error) { +func NewDiscord(log logrus.FieldLogger, commGroupMetadata CommGroupMetadata, cfg config.Discord, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Discord, error) { botMentionRegex, err := discordBotMentionRegex(cfg.BotID) if err != nil { return nil, err @@ -87,7 +87,7 @@ func NewDiscord(log logrus.FieldLogger, commGroupName string, cfg config.Discord executorFactory: executorFactory, api: api, botID: cfg.BotID, - commGroupName: commGroupName, + commGroupMetadata: commGroupMetadata, channels: channelsCfg, botMentionRegex: botMentionRegex, renderer: NewDiscordRenderer(), @@ -128,7 +128,7 @@ func (b *Discord) Start(ctx context.Context) error { return fmt.Errorf("while opening connection: %w", err) } - err = b.reporter.ReportBotEnabled(b.IntegrationName()) + err = b.reporter.ReportBotEnabled(b.IntegrationName(), b.commGroupMetadata.Index) if err != nil { b.setFailureReason(health.FailureReasonConnectionError) return fmt.Errorf("while reporting analytics: %w", err) @@ -251,7 +251,7 @@ func (b *Discord) handleMessage(ctx context.Context, dm discordMessage) error { } e := b.executorFactory.NewDefault(execute.NewDefaultInput{ - CommGroupName: b.commGroupName, + CommGroupName: b.commGroupMetadata.Name, Platform: b.IntegrationName(), NotifierHandler: b, Conversation: execute.Conversation{ diff --git a/pkg/bot/mattermost.go b/pkg/bot/mattermost.go index fbc24dca94..ba5bf5caf4 100644 --- a/pkg/bot/mattermost.go +++ b/pkg/bot/mattermost.go @@ -51,28 +51,28 @@ const ( // Mattermost listens for user's message, execute commands and sends back the response. type Mattermost struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter AnalyticsReporter - serverURL string - botName string - botUserID string - teamName string - webSocketURL string - wsClient *model.WebSocketClient - apiClient *model.Client4 - channelsMutex sync.RWMutex - commGroupName string - channels map[string]channelConfigByID - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - renderer *MattermostRenderer - userNamesForID map[string]string - messages chan mattermostMessage - messageWorkers *pool.Pool - shutdownOnce sync.Once - status health.PlatformStatusMsg - failureReason health.FailureReasonMsg + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter AnalyticsReporter + serverURL string + botName string + botUserID string + teamName string + webSocketURL string + wsClient *model.WebSocketClient + apiClient *model.Client4 + channelsMutex sync.RWMutex + commGroupMetadata CommGroupMetadata + channels map[string]channelConfigByID + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + renderer *MattermostRenderer + userNamesForID map[string]string + messages chan mattermostMessage + messageWorkers *pool.Pool + shutdownOnce sync.Once + status health.PlatformStatusMsg + failureReason health.FailureReasonMsg } // mattermostMessage contains message details to execute command and send back the result @@ -81,7 +81,7 @@ type mattermostMessage struct { } // NewMattermost creates a new Mattermost instance. -func NewMattermost(ctx context.Context, log logrus.FieldLogger, commGroupName string, cfg config.Mattermost, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Mattermost, error) { +func NewMattermost(ctx context.Context, log logrus.FieldLogger, commGroupMetadata CommGroupMetadata, cfg config.Mattermost, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Mattermost, error) { botMentionRegex, err := mattermostBotMentionRegex(cfg.BotName) if err != nil { return nil, err @@ -125,24 +125,24 @@ func NewMattermost(ctx context.Context, log logrus.FieldLogger, commGroupName st } return &Mattermost{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - serverURL: cfg.URL, - botName: cfg.BotName, - botUserID: botUserID, - teamName: team.Name, - apiClient: client, - webSocketURL: webSocketURL, - commGroupName: commGroupName, - channels: channelsByIDCfg, - botMentionRegex: botMentionRegex, - renderer: NewMattermostRenderer(), - userNamesForID: map[string]string{}, - messages: make(chan mattermostMessage, platformMessageChannelSize), - messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), - status: health.StatusUnknown, - failureReason: "", + log: log, + executorFactory: executorFactory, + reporter: reporter, + serverURL: cfg.URL, + botName: cfg.BotName, + botUserID: botUserID, + teamName: team.Name, + apiClient: client, + webSocketURL: webSocketURL, + commGroupMetadata: commGroupMetadata, + channels: channelsByIDCfg, + botMentionRegex: botMentionRegex, + renderer: NewMattermostRenderer(), + userNamesForID: map[string]string{}, + messages: make(chan mattermostMessage, platformMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), + status: health.StatusUnknown, + failureReason: "", }, nil } @@ -169,7 +169,7 @@ func (b *Mattermost) Start(ctx context.Context) error { return fmt.Errorf("while pinging Mattermost server %q: %w", b.serverURL, err) } - err = b.reporter.ReportBotEnabled(b.IntegrationName()) + err = b.reporter.ReportBotEnabled(b.IntegrationName(), b.commGroupMetadata.Index) if err != nil { b.setStatusReason(health.FailureReasonConnectionError) return fmt.Errorf("while reporting analytics: %w", err) @@ -279,7 +279,7 @@ func (b *Mattermost) handleMessage(ctx context.Context, mm mattermostMessage) er } e := b.executorFactory.NewDefault(execute.NewDefaultInput{ - CommGroupName: b.commGroupName, + CommGroupName: b.commGroupMetadata.Name, Platform: b.IntegrationName(), NotifierHandler: b, Conversation: execute.Conversation{ diff --git a/pkg/bot/slack_cloud.go b/pkg/bot/slack_cloud.go index 1a0fa69fb1..49f125da14 100644 --- a/pkg/bot/slack_cloud.go +++ b/pkg/bot/slack_cloud.go @@ -23,6 +23,7 @@ import ( "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/internal/config/remote" "github.com/kubeshop/botkube/internal/health" "github.com/kubeshop/botkube/pkg/api" @@ -49,34 +50,34 @@ var _ Bot = &CloudSlack{} // CloudSlack listens for user's message, execute commands and sends back the response. type CloudSlack struct { - log logrus.FieldLogger - cfg config.CloudSlack - client *slack.Client - executorFactory ExecutorFactory - reporter cloudSlackAnalyticsReporter - commGroupName string - realNamesForID map[string]string - botMentionRegex *regexp.Regexp - botID string - channelsMutex sync.RWMutex - renderer *SlackRenderer - channels map[string]channelConfigByName - notifyMutex sync.Mutex - clusterName string - msgStatusTracker *SlackMessageStatusTracker - status health.PlatformStatusMsg - failuresNo int - failureReason health.FailureReasonMsg + log logrus.FieldLogger + cfg config.CloudSlack + client *slack.Client + executorFactory ExecutorFactory + reporter cloudSlackAnalyticsReporter + commGroupMetadata CommGroupMetadata + realNamesForID map[string]string + botMentionRegex *regexp.Regexp + botID string + channelsMutex sync.RWMutex + renderer *SlackRenderer + channels map[string]channelConfigByName + notifyMutex sync.Mutex + clusterName string + msgStatusTracker *SlackMessageStatusTracker + status health.PlatformStatusMsg + failuresNo int + failureReason health.FailureReasonMsg } // cloudSlackAnalyticsReporter defines a reporter that collects analytics data. type cloudSlackAnalyticsReporter interface { FatalErrorAnalyticsReporter - ReportCommand(platform config.CommPlatformIntegration, pluginName, command string, origin command.Origin, withFilter bool) error + ReportCommand(in analytics.ReportCommand) error } func NewCloudSlack(log logrus.FieldLogger, - commGroupName string, + commGroupMetadata CommGroupMetadata, cfg config.CloudSlack, clusterName string, executorFactory ExecutorFactory, @@ -99,22 +100,22 @@ func NewCloudSlack(log logrus.FieldLogger, } return &CloudSlack{ - log: log, - cfg: cfg, - executorFactory: executorFactory, - reporter: reporter, - commGroupName: commGroupName, - botMentionRegex: botMentionRegex, - renderer: NewSlackRenderer(), - channels: channels, - client: client, - botID: cfg.BotID, - clusterName: clusterName, - realNamesForID: map[string]string{}, - msgStatusTracker: NewSlackMessageStatusTracker(log, client), - status: health.StatusUnknown, - failuresNo: 0, - failureReason: "", + log: log, + cfg: cfg, + executorFactory: executorFactory, + reporter: reporter, + commGroupMetadata: commGroupMetadata, + botMentionRegex: botMentionRegex, + renderer: NewSlackRenderer(), + channels: channels, + client: client, + botID: cfg.BotID, + clusterName: clusterName, + realNamesForID: map[string]string{}, + msgStatusTracker: NewSlackMessageStatusTracker(log, client), + status: health.StatusUnknown, + failuresNo: 0, + failureReason: "", }, nil } @@ -318,7 +319,12 @@ func (b *CloudSlack) handleStreamMessage(ctx context.Context, data *pb.ConnectRe act := callback.ActionCallback.BlockActions[0] if act == nil || strings.HasPrefix(act.ActionID, urlButtonActionIDPrefix) { - reportErr := b.reporter.ReportCommand(b.IntegrationName(), "", act.ActionID, command.ButtonClickOrigin, false) + reportErr := b.reporter.ReportCommand( + analytics.ReportCommand{ + Platform: b.IntegrationName(), + Command: act.ActionID, + Origin: command.ButtonClickOrigin, + }) if reportErr != nil { b.log.Errorf("while reporting URL command, error: %s", reportErr.Error()) } @@ -481,7 +487,7 @@ func (b *CloudSlack) handleMessage(ctx context.Context, event slackMessage) erro channel, exists := b.getChannels()[info.Name] e := b.executorFactory.NewDefault(execute.NewDefaultInput{ - CommGroupName: b.commGroupName, + CommGroupName: b.commGroupMetadata.Name, Platform: b.IntegrationName(), NotifierHandler: b, Conversation: execute.Conversation{ diff --git a/pkg/bot/slack_legacy.go b/pkg/bot/slack_legacy.go index c433ba8008..26c6b68aa7 100644 --- a/pkg/bot/slack_legacy.go +++ b/pkg/bot/slack_legacy.go @@ -50,7 +50,7 @@ type Slack struct { channels map[string]channelConfigByName notifyMutex sync.Mutex botMentionRegex *regexp.Regexp - commGroupName string + commGroupMetadata CommGroupMetadata renderer *SlackRenderer messages chan slackLegacyMessage slackMessageWorkers *pool.Pool @@ -68,7 +68,7 @@ type slackLegacyMessage struct { } // NewSlack creates a new Slack instance. -func NewSlack(log logrus.FieldLogger, commGroupName string, cfg config.Slack, executorFactory ExecutorFactory, reporter FatalErrorAnalyticsReporter) (*Slack, error) { +func NewSlack(log logrus.FieldLogger, commGroupMetadata CommGroupMetadata, cfg config.Slack, executorFactory ExecutorFactory, reporter FatalErrorAnalyticsReporter) (*Slack, error) { client := slack.New(cfg.Token) authResp, err := client.AuthTest() @@ -94,7 +94,7 @@ func NewSlack(log logrus.FieldLogger, commGroupName string, cfg config.Slack, ex botID: botID, client: client, channels: channels, - commGroupName: commGroupName, + commGroupMetadata: commGroupMetadata, botMentionRegex: botMentionRegex, renderer: NewSlackRenderer(), messages: make(chan slackLegacyMessage, platformMessageChannelSize), @@ -145,7 +145,7 @@ func (b *Slack) Start(ctx context.Context) error { switch ev := msg.Data.(type) { case *slack.ConnectedEvent: - err := b.reporter.ReportBotEnabled(b.IntegrationName()) + err := b.reporter.ReportBotEnabled(b.IntegrationName(), b.commGroupMetadata.Index) if err != nil { b.setFailureReason(health.FailureReasonConnectionError) return fmt.Errorf("while reporting analytics: %w", err) @@ -260,7 +260,7 @@ func (b *Slack) handleMessage(ctx context.Context, msg slackLegacyMessage) error channel, exists := b.getChannels()[info.Name] e := b.executorFactory.NewDefault(execute.NewDefaultInput{ - CommGroupName: b.commGroupName, + CommGroupName: b.commGroupMetadata.Name, Platform: b.IntegrationName(), NotifierHandler: b, Conversation: execute.Conversation{ diff --git a/pkg/bot/slack_socket.go b/pkg/bot/slack_socket.go index 9633ba0bb6..abcfedc95e 100644 --- a/pkg/bot/slack_socket.go +++ b/pkg/bot/slack_socket.go @@ -35,34 +35,34 @@ var _ Bot = &SocketSlack{} // SocketSlack listens for user's message, execute commands and sends back the response. type SocketSlack struct { - log logrus.FieldLogger - executorFactory ExecutorFactory - reporter socketSlackAnalyticsReporter - botID string - client *slack.Client - channelsMutex sync.RWMutex - channels map[string]channelConfigByName - notifyMutex sync.Mutex - botMentionRegex *regexp.Regexp - commGroupName string - renderer *SlackRenderer - realNamesForID map[string]string - msgStatusTracker *SlackMessageStatusTracker - messages chan slackMessage - messageWorkers *pool.Pool - shutdownOnce sync.Once - status health.PlatformStatusMsg - failureReason health.FailureReasonMsg + log logrus.FieldLogger + executorFactory ExecutorFactory + reporter socketSlackAnalyticsReporter + botID string + client *slack.Client + channelsMutex sync.RWMutex + channels map[string]channelConfigByName + notifyMutex sync.Mutex + botMentionRegex *regexp.Regexp + commGroupMetadata CommGroupMetadata + renderer *SlackRenderer + realNamesForID map[string]string + msgStatusTracker *SlackMessageStatusTracker + messages chan slackMessage + messageWorkers *pool.Pool + shutdownOnce sync.Once + status health.PlatformStatusMsg + failureReason health.FailureReasonMsg } // socketSlackAnalyticsReporter defines a reporter that collects analytics data. type socketSlackAnalyticsReporter interface { FatalErrorAnalyticsReporter - ReportCommand(platform config.CommPlatformIntegration, pluginName, command string, origin command.Origin, withFilter bool) error + ReportCommand(in analytics.ReportCommand) error } // NewSocketSlack creates a new SocketSlack instance. -func NewSocketSlack(log logrus.FieldLogger, commGroupName string, cfg config.SocketSlack, executorFactory ExecutorFactory, reporter socketSlackAnalyticsReporter) (*SocketSlack, error) { +func NewSocketSlack(log logrus.FieldLogger, commGroupMetadata CommGroupMetadata, cfg config.SocketSlack, executorFactory ExecutorFactory, reporter socketSlackAnalyticsReporter) (*SocketSlack, error) { client := slack.New(cfg.BotToken, slack.OptionAppLevelToken(cfg.AppToken)) authResp, err := client.AuthTest() @@ -82,21 +82,21 @@ func NewSocketSlack(log logrus.FieldLogger, commGroupName string, cfg config.Soc } return &SocketSlack{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - botID: botID, - client: client, - channels: channels, - commGroupName: commGroupName, - renderer: NewSlackRenderer(), - botMentionRegex: botMentionRegex, - realNamesForID: map[string]string{}, - msgStatusTracker: NewSlackMessageStatusTracker(log, client), - messages: make(chan slackMessage, platformMessageChannelSize), - messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), - status: health.StatusUnknown, - failureReason: "", + log: log, + executorFactory: executorFactory, + reporter: reporter, + botID: botID, + client: client, + channels: channels, + commGroupMetadata: commGroupMetadata, + renderer: NewSlackRenderer(), + botMentionRegex: botMentionRegex, + realNamesForID: map[string]string{}, + msgStatusTracker: NewSlackMessageStatusTracker(log, client), + messages: make(chan slackMessage, platformMessageChannelSize), + messageWorkers: pool.New().WithMaxGoroutines(platformMessageWorkersCount), + status: health.StatusUnknown, + failureReason: "", }, nil } @@ -131,7 +131,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { case socketmode.EventTypeConnecting: b.log.Info("Botkube is connecting to Slack...") case socketmode.EventTypeConnected: - if err := b.reporter.ReportBotEnabled(b.IntegrationName()); err != nil { + if err := b.reporter.ReportBotEnabled(b.IntegrationName(), b.commGroupMetadata.Index); err != nil { b.setFailureReason(health.FailureReasonConnectionError) return fmt.Errorf("report analytics error: %w", err) } @@ -183,7 +183,7 @@ func (b *SocketSlack) Start(ctx context.Context) error { act := callback.ActionCallback.BlockActions[0] if act == nil || strings.HasPrefix(act.ActionID, urlButtonActionIDPrefix) { - reportErr := b.reporter.ReportCommand(b.IntegrationName(), "", act.ActionID, command.ButtonClickOrigin, false) + reportErr := b.reporter.ReportCommand(analytics.ReportCommand{Platform: b.IntegrationName(), Command: act.ActionID, Origin: command.ButtonClickOrigin}) if reportErr != nil { b.log.Errorf("while reporting URL command, error: %s", reportErr.Error()) } @@ -366,7 +366,7 @@ func (b *SocketSlack) handleMessage(ctx context.Context, event slackMessage) err channel, exists := b.getChannels()[info.Name] e := b.executorFactory.NewDefault(execute.NewDefaultInput{ - CommGroupName: b.commGroupName, + CommGroupName: b.commGroupMetadata.Name, Platform: b.IntegrationName(), NotifierHandler: b, Conversation: execute.Conversation{ diff --git a/pkg/bot/teams.go b/pkg/bot/teams.go index ee913ebc27..ddbd1681eb 100644 --- a/pkg/bot/teams.go +++ b/pkg/bot/teams.go @@ -69,7 +69,7 @@ type Teams struct { //channels map[string][ChannelBindingsByName] bindings config.BotBindings conversationsMutex sync.RWMutex - commGroupName string + commGroupMetadata CommGroupMetadata conversations map[string]conversation notifyMutex sync.Mutex botMentionRegex *regexp.Regexp @@ -91,7 +91,7 @@ type consentContext struct { } // NewTeams creates a new Teams instance. -func NewTeams(log logrus.FieldLogger, commGroupName string, cfg config.Teams, clusterName string, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Teams, error) { +func NewTeams(log logrus.FieldLogger, commGroupMetadata CommGroupMetadata, cfg config.Teams, clusterName string, executorFactory ExecutorFactory, reporter AnalyticsReporter) (*Teams, error) { botMentionRegex, err := teamsBotMentionRegex(cfg.BotName) if err != nil { return nil, err @@ -107,22 +107,22 @@ func NewTeams(log logrus.FieldLogger, commGroupName string, cfg config.Teams, cl } return &Teams{ - log: log, - executorFactory: executorFactory, - reporter: reporter, - botName: cfg.BotName, - ClusterName: clusterName, - AppID: cfg.AppID, - AppPassword: cfg.AppPassword, - bindings: cfg.Bindings, - commGroupName: commGroupName, - MessagePath: msgPath, - Port: port, - renderer: NewTeamsRenderer(), - conversations: make(map[string]conversation), - botMentionRegex: botMentionRegex, - status: health.StatusUnknown, - failureReason: "", + log: log, + executorFactory: executorFactory, + reporter: reporter, + botName: cfg.BotName, + ClusterName: clusterName, + AppID: cfg.AppID, + AppPassword: cfg.AppPassword, + bindings: cfg.Bindings, + commGroupMetadata: commGroupMetadata, + MessagePath: msgPath, + Port: port, + renderer: NewTeamsRenderer(), + conversations: make(map[string]conversation), + botMentionRegex: botMentionRegex, + status: health.StatusUnknown, + failureReason: "", }, nil } @@ -145,7 +145,7 @@ func (b *Teams) Start(ctx context.Context) error { router := mux.NewRouter() router.PathPrefix(b.MessagePath).HandlerFunc(b.processActivity) - err = b.reporter.ReportBotEnabled(b.IntegrationName()) + err = b.reporter.ReportBotEnabled(b.IntegrationName(), b.commGroupMetadata.Index) if err != nil { b.setFailureReason(health.FailureReasonConnectionError) return fmt.Errorf("while reporting analytics: %w", err) @@ -289,7 +289,7 @@ func (b *Teams) processMessage(ctx context.Context, activity schema.Activity) (i } e := b.executorFactory.NewDefault(execute.NewDefaultInput{ - CommGroupName: b.commGroupName, + CommGroupName: b.commGroupMetadata.Name, Platform: b.IntegrationName(), NotifierHandler: newTeamsNotifMgrForActivity(b, ref), Conversation: execute.Conversation{ diff --git a/pkg/execute/executor.go b/pkg/execute/executor.go index dafbe7adea..f01950193f 100644 --- a/pkg/execute/executor.go +++ b/pkg/execute/executor.go @@ -10,6 +10,7 @@ import ( "github.com/sirupsen/logrus" + "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/internal/audit" "github.com/kubeshop/botkube/internal/plugin" remoteapi "github.com/kubeshop/botkube/internal/remote" @@ -259,7 +260,13 @@ func removeMultipleSpaces(s string) string { } func (e *DefaultExecutor) reportCommand(ctx context.Context, pluginName, cmd string, withFilter bool, cmdCtx CommandContext) { - if err := e.analyticsReporter.ReportCommand(e.platform, pluginName, cmd, e.conversation.CommandOrigin, withFilter); err != nil { + if err := e.analyticsReporter.ReportCommand(analytics.ReportCommand{ + Platform: e.platform, + PluginName: pluginName, + Command: cmd, + Origin: e.conversation.CommandOrigin, + WithFilter: withFilter, + }); err != nil { e.log.Errorf("while reporting %s command: %s", cmd, err.Error()) } if err := e.reportAuditEvent(ctx, pluginName, cmdCtx); err != nil { diff --git a/pkg/execute/factory.go b/pkg/execute/factory.go index b49275a1fb..f4d389282c 100644 --- a/pkg/execute/factory.go +++ b/pkg/execute/factory.go @@ -7,6 +7,7 @@ import ( "github.com/slack-go/slack" "k8s.io/client-go/rest" + "github.com/kubeshop/botkube/internal/analytics" "github.com/kubeshop/botkube/internal/audit" guard "github.com/kubeshop/botkube/internal/command" "github.com/kubeshop/botkube/internal/plugin" @@ -58,7 +59,7 @@ type Executor interface { // AnalyticsReporter defines a reporter that collects analytics data. type AnalyticsReporter interface { // ReportCommand reports a new executed command. The command should be anonymized before using this method. - ReportCommand(platform config.CommPlatformIntegration, pluginName, command string, origin command.Origin, withFilter bool) error + ReportCommand(in analytics.ReportCommand) error } // CommandGuard is an interface that allows to check if a given command is allowed to be executed. diff --git a/pkg/sink/elasticsearch.go b/pkg/sink/elasticsearch.go index a83f3ce2e3..a3edc9bacd 100644 --- a/pkg/sink/elasticsearch.go +++ b/pkg/sink/elasticsearch.go @@ -55,7 +55,7 @@ type Elasticsearch struct { } // NewElasticsearch creates a new Elasticsearch instance. -func NewElasticsearch(log logrus.FieldLogger, c config.Elasticsearch, reporter AnalyticsReporter) (*Elasticsearch, error) { +func NewElasticsearch(log logrus.FieldLogger, commGroupIdx int, c config.Elasticsearch, reporter AnalyticsReporter) (*Elasticsearch, error) { var elsClient *elastic.Client var err error @@ -138,7 +138,7 @@ func NewElasticsearch(log logrus.FieldLogger, c config.Elasticsearch, reporter A failureReason: "", } - err = reporter.ReportSinkEnabled(esNotifier.IntegrationName()) + err = reporter.ReportSinkEnabled(esNotifier.IntegrationName(), commGroupIdx) if err != nil { return nil, fmt.Errorf("while reporting analytics: %w", err) } diff --git a/pkg/sink/types.go b/pkg/sink/types.go index a5efd3a32f..e7d9ae31b5 100644 --- a/pkg/sink/types.go +++ b/pkg/sink/types.go @@ -13,5 +13,5 @@ type Sink interface { // AnalyticsReporter defines a reporter that collects analytics data for sinks. type AnalyticsReporter interface { // ReportSinkEnabled reports an enabled sink. - ReportSinkEnabled(platform config.CommPlatformIntegration) error + ReportSinkEnabled(platform config.CommPlatformIntegration, commGroupIdx int) error } diff --git a/pkg/sink/webhook.go b/pkg/sink/webhook.go index 3464213363..b56a8e856d 100644 --- a/pkg/sink/webhook.go +++ b/pkg/sink/webhook.go @@ -37,7 +37,7 @@ type WebhookPayload struct { } // NewWebhook creates a new Webhook instance. -func NewWebhook(log logrus.FieldLogger, c config.Webhook, reporter AnalyticsReporter) (*Webhook, error) { +func NewWebhook(log logrus.FieldLogger, commGroupIdx int, c config.Webhook, reporter AnalyticsReporter) (*Webhook, error) { whNotifier := &Webhook{ log: log, reporter: reporter, @@ -47,7 +47,7 @@ func NewWebhook(log logrus.FieldLogger, c config.Webhook, reporter AnalyticsRepo failureReason: "", } - err := reporter.ReportSinkEnabled(whNotifier.IntegrationName()) + err := reporter.ReportSinkEnabled(whNotifier.IntegrationName(), commGroupIdx) if err != nil { return nil, fmt.Errorf("while reporting analytics: %w", err) }