Skip to content

Commit

Permalink
Fix audit log reporting for source events and commands
Browse files Browse the repository at this point in the history
  • Loading branch information
pkosiec committed Mar 24, 2023
1 parent 2698787 commit 65b24ec
Show file tree
Hide file tree
Showing 24 changed files with 251 additions and 128 deletions.
6 changes: 6 additions & 0 deletions cmd/botkube/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -503,6 +504,11 @@ func getK8sClients(cfg *rest.Config) (discovery.DiscoveryInterface, error) {

func reportFatalErrFn(logger logrus.FieldLogger, reporter analytics.Reporter, status status.StatusReporter) func(ctx string, err error) error {
return func(ctx string, err error) error {
if errors.Is(err, context.Canceled) {
logger.Debugf("Context was cancelled. Skipping reporting error...")
return nil
}

// use separate ctx as parent ctx might be cancelled already
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
Expand Down
8 changes: 8 additions & 0 deletions internal/analytics/segment_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func (r *SegmentReporter) ReportHandledEventSuccess(event ReportEvent) error {
// ReportHandledEventError reports a failure while handling event using a given communication platform.
// The RegisterCurrentIdentity needs to be called first.
func (r *SegmentReporter) ReportHandledEventError(event ReportEvent, err error) error {
if err == nil {
return nil
}

return r.reportEvent("Event handled", map[string]interface{}{
"platform": event.Platform,
"type": event.IntegrationType,
Expand All @@ -123,6 +127,10 @@ func (r *SegmentReporter) ReportHandledEventError(event ReportEvent, err error)
// ReportFatalError reports a fatal app error.
// It doesn't need a registered identity.
func (r *SegmentReporter) ReportFatalError(err error) error {
if err == nil {
return nil
}

properties := map[string]interface{}{
"error": err.Error(),
}
Expand Down
7 changes: 5 additions & 2 deletions internal/audit/gql_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,11 @@ func (r *GraphQLAuditReporter) ReportSourceAuditEvent(ctx context.Context, e Sou
DeploymentID: r.gql.DeploymentID(),
Type: remoteapi.AuditEventTypeSourceEventEmitted,
SourceEventEmitted: &remoteapi.AuditEventSourceCreateInput{
Event: e.Event,
Bindings: e.Bindings,
Event: e.Event,
Source: remoteapi.AuditEventSourceDetailsInput{
Name: e.Source.Name,
DisplayName: e.Source.DisplayName,
},
},
},
}
Expand Down
9 changes: 7 additions & 2 deletions internal/audit/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type ExecutorAuditEvent struct {
CreatedAt string
PluginName string
PlatformUser string
BotPlatform remoteapi.BotPlatform
BotPlatform *remoteapi.BotPlatform
Command string
Channel string
}
Expand All @@ -29,7 +29,12 @@ type SourceAuditEvent struct {
CreatedAt string
PluginName string
Event string
Bindings []string
Source SourceDetails
}

type SourceDetails struct {
Name string
DisplayName string
}

// GetReporter creates new AuditReporter
Expand Down
5 changes: 0 additions & 5 deletions internal/config/remote/gql_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,6 @@ func NewGqlClient(options ...Option) *Gql {
opt(c)
}

// skip client creation when not requested
if c.endpoint == "" {
return nil
}

httpCli := &http.Client{
Transport: newAPIKeySecuredTransport(c.apiKey),
Timeout: defaultTimeout,
Expand Down
33 changes: 20 additions & 13 deletions internal/remote/api.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package remote

import (
"fmt"
"strings"
)

Expand All @@ -17,16 +16,21 @@ type AuditEventCreateInput struct {

// AuditEventCommandCreateInput contains create input specific to executor events
type AuditEventCommandCreateInput struct {
PlatformUser string `json:"platformUser"`
Channel string `json:"channel"`
BotPlatform BotPlatform `json:"botPlatform"`
Command string `json:"command"`
PlatformUser string `json:"platformUser"`
Channel string `json:"channel"`
BotPlatform *BotPlatform `json:"botPlatform"`
Command string `json:"command"`
}

// AuditEventSourceCreateInput contains create input specific to source events
type AuditEventSourceCreateInput struct {
Event string `json:"event"`
Bindings []string `json:"bindings"`
Event string `json:"event"`
Source AuditEventSourceDetailsInput `json:"source"`
}

type AuditEventSourceDetailsInput struct {
Name string `json:"name"`
DisplayName string `json:"displayName"`
}

// BotPlatform are the supported bot platforms
Expand All @@ -44,23 +48,26 @@ const (
)

// NewBotPlatform creates new BotPlatform from string
func NewBotPlatform(s string) (BotPlatform, error) {
func NewBotPlatform(s string) *BotPlatform {
var platform BotPlatform
switch strings.ToUpper(s) {
case "SLACK":
fallthrough
case "SOCKETSLACK":
return BotPlatformSlack, nil
platform = BotPlatformSlack
case "DISCORD":
return BotPlatformDiscord, nil
platform = BotPlatformDiscord
case "MATTERMOST":
return BotPlatformMattermost, nil
platform = BotPlatformMattermost
case "TEAMS":
fallthrough
case "MS_TEAMS":
return BotPlatformMsTeams, nil
platform = BotPlatformMsTeams
default:
return "", fmt.Errorf("given BotPlatform %s is not supported", s)
return nil
}

return &platform
}

// AuditEventType is the type of audit events
Expand Down
39 changes: 22 additions & 17 deletions internal/source/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package source

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -147,7 +148,7 @@ func (d *Dispatcher) dispatchMsg(ctx context.Context, event source.Event, dispat
}
err := n.SendMessage(ctx, msg, sources)
if err != nil {
reportErr := d.reportError(ctx, err, n, pluginName, event, dispatch.sourceName)
reportErr := d.reportError(err, n, pluginName, event)
if reportErr != nil {
err = multierror.Append(err, fmt.Errorf("while reporting error: %w", reportErr))
}
Expand All @@ -156,7 +157,7 @@ func (d *Dispatcher) dispatchMsg(ctx context.Context, event source.Event, dispat
return
}

reportErr := d.reportSuccess(ctx, n, pluginName, event, dispatch.sourceName)
reportErr := d.reportSuccess(n, pluginName, event)
if reportErr != nil {
d.log.Error(err)
}
Expand All @@ -168,7 +169,7 @@ func (d *Dispatcher) dispatchMsg(ctx context.Context, event source.Event, dispat
defer analytics.ReportPanicIfOccurs(d.log, d.reporter)
err := n.SendEvent(ctx, event.RawObject, sources)
if err != nil {
reportErr := d.reportError(ctx, err, n, pluginName, event, dispatch.sourceName)
reportErr := d.reportError(err, n, pluginName, event)
if reportErr != nil {
err = multierror.Append(err, fmt.Errorf("while reporting error: %w", reportErr))
}
Expand All @@ -177,13 +178,17 @@ func (d *Dispatcher) dispatchMsg(ctx context.Context, event source.Event, dispat
return
}

reportErr := d.reportSuccess(ctx, n, pluginName, event, dispatch.sourceName)
reportErr := d.reportSuccess(n, pluginName, event)
if reportErr != nil {
d.log.Error(err)
}
}(n)
}

if err := d.reportAuditEvent(ctx, pluginName, event.RawObject, dispatch.sourceName, dispatch.sourceDisplayName); err != nil {
d.log.Errorf("while reporting audit event for source %q: %s", dispatch.sourceName, err.Error())
}

// execute actions
actions, err := d.actionProvider.RenderedActions(event.RawObject, sources)
if err != nil {
Expand Down Expand Up @@ -219,12 +224,20 @@ func (d *Dispatcher) dispatch(ctx context.Context, event []byte, dispatch Plugin
}, dispatch)
}

func (d *Dispatcher) reportAudit(ctx context.Context, pluginName, event, source string) error {
func (d *Dispatcher) reportAuditEvent(ctx context.Context, pluginName string, event any, sourceName, sourceDisplayName string) error {
eventBytes, err := json.Marshal(event)
if err != nil {
return fmt.Errorf("while marshaling audit event: %w", err)
}

e := audit.SourceAuditEvent{
CreatedAt: time.Now().Format(time.RFC3339),
PluginName: pluginName,
Event: event,
Bindings: []string{source},
Event: string(eventBytes),
Source: audit.SourceDetails{
Name: sourceName,
DisplayName: sourceDisplayName,
},
}
return d.auditReporter.ReportSourceAuditEvent(ctx, e)
}
Expand All @@ -234,7 +247,7 @@ type genericNotifier interface {
Type() config.IntegrationType
}

func (d *Dispatcher) reportSuccess(ctx context.Context, n genericNotifier, pluginName string, event source.Event, sourceName string) error {
func (d *Dispatcher) reportSuccess(n genericNotifier, pluginName string, event source.Event) error {
errs := multierror.New()
reportErr := d.reporter.ReportHandledEventSuccess(analytics.ReportEvent{
IntegrationType: n.Type(),
Expand All @@ -245,14 +258,10 @@ func (d *Dispatcher) reportSuccess(ctx context.Context, n genericNotifier, plugi
if reportErr != nil {
errs = multierror.Append(errs, fmt.Errorf("while reporting %s analytics: %w", n.Type(), reportErr))
}
if err := d.reportAudit(ctx, pluginName, fmt.Sprintf("%v", event.RawObject), sourceName); err != nil {
errs = multierror.Append(errs, fmt.Errorf("while reporting %s audit event: %w", n.Type(), reportErr))
}

return errs.ErrorOrNil()
}

func (d *Dispatcher) reportError(ctx context.Context, err error, n genericNotifier, pluginName string, event source.Event, sourceName string) error {
func (d *Dispatcher) reportError(err error, n genericNotifier, pluginName string, event source.Event) error {
errs := multierror.New()
reportErr := d.reporter.ReportHandledEventError(analytics.ReportEvent{
IntegrationType: n.Type(),
Expand All @@ -263,10 +272,6 @@ func (d *Dispatcher) reportError(ctx context.Context, err error, n genericNotifi
if reportErr != nil {
errs = multierror.Append(errs, fmt.Errorf("while reporting %s analytics: %w", n.Type(), reportErr))
}
// TODO: add additional metadata about event failed to send
if err := d.reportAudit(ctx, pluginName, fmt.Sprintf("%v", event.RawObject), sourceName); err != nil {
errs = multierror.Append(errs, fmt.Errorf("while reporting %s audit event: %w", n.Type(), reportErr))
}

return errs.ErrorOrNil()
}
9 changes: 8 additions & 1 deletion internal/source/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type PluginDispatch struct {
pluginName string
pluginConfigs []*source.Config
sourceName string
sourceDisplayName string
isInteractivitySupported bool
cfg *config.Config
}
Expand Down Expand Up @@ -144,7 +145,12 @@ func (d *Scheduler) schedulePlugin(ctx context.Context, isInteractivitySupported
d.startProcesses[key] = struct{}{}

sourcePluginConfigs := map[string][]*source.Config{}
plugins := d.cfg.Sources[sourceName].Plugins
srcConfig, exists := d.cfg.Sources[sourceName]
if !exists {
return fmt.Errorf("source %q not found", sourceName)
}

plugins := srcConfig.Plugins
for pluginName, pluginCfg := range plugins {
if !pluginCfg.Enabled {
continue
Expand All @@ -168,6 +174,7 @@ func (d *Scheduler) schedulePlugin(ctx context.Context, isInteractivitySupported
pluginConfigs: configs,
isInteractivitySupported: isInteractivitySupported,
sourceName: sourceName,
sourceDisplayName: srcConfig.DisplayName,
cfg: d.cfg,
})
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions pkg/action/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

const (
// unknownValue defines an unknown string value.
unknownValue = "unknown"
unknownValue = "n/a"
)

// ExecutorFactory facilitates creation of execute.Executor instances.
Expand Down Expand Up @@ -80,6 +80,7 @@ func (p *Provider) RenderedActions(e any, sourceBindings []string) ([]event.Acti

// ExecuteAction executes action for given event.
func (p *Provider) ExecuteAction(ctx context.Context, action event.Action) interactive.CoreMessage {
userName := fmt.Sprintf("Automation %q", action.DisplayName)
e := p.executorFactory.NewDefault(execute.NewDefaultInput{
Conversation: execute.Conversation{
IsAuthenticated: true,
Expand All @@ -92,7 +93,10 @@ func (p *Provider) ExecuteAction(ctx context.Context, action event.Action) inter
Platform: unknownValue,
NotifierHandler: &universalNotifierHandler{},
Message: strings.TrimSpace(strings.TrimPrefix(action.Command, api.MessageBotNamePlaceholder)),
User: fmt.Sprintf("Automation %q", action.DisplayName),
User: execute.UserInput{
Mention: userName,
DisplayName: userName,
},
})
response := e.Execute(ctx)

Expand Down
14 changes: 9 additions & 5 deletions pkg/action/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,29 @@ func TestProvider_RenderedActionsForEvent(t *testing.T) {
func TestProvider_ExecuteEventAction(t *testing.T) {
// given
botName := "my-bot"
userName := `Automation "Test"`
executorBindings := []string{"executor-binding1", "executor-binding2"}
eventAction := event.Action{
Command: "kubectl get po foo",
ExecutorBindings: executorBindings,
DisplayName: "Test",
}
expectedExecutorInput := execute.NewDefaultInput{
CommGroupName: "unknown",
Platform: "unknown",
CommGroupName: "n/a",
Platform: "n/a",
NotifierHandler: nil, // won't check it
Conversation: execute.Conversation{
Alias: "unknown",
ID: "unknown",
Alias: "n/a",
ID: "n/a",
ExecutorBindings: executorBindings,
IsAuthenticated: true,
CommandOrigin: command.AutomationOrigin,
},
Message: "kubectl get po foo",
User: `Automation "Test"`,
User: execute.UserInput{
Mention: userName,
DisplayName: userName,
},
}

execFactory := &fakeFactory{t: t, expectedInput: expectedExecutorInput}
Expand Down
1 change: 1 addition & 0 deletions pkg/bot/bot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type channelConfigByID struct {

alias string
notify bool
name string
}

type channelConfigByName struct {
Expand Down
Loading

0 comments on commit 65b24ec

Please sign in to comment.