Skip to content

Commit

Permalink
Set conv ref when "notifier start" command is executed
Browse files Browse the repository at this point in the history
Signed-off-by: Prasad Ghangal <prasad.ghangal@gmail.com>
  • Loading branch information
PrasadG193 committed Apr 13, 2020
1 parent f901912 commit f1de11a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 75 deletions.
112 changes: 44 additions & 68 deletions pkg/bot/teams.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,44 +51,36 @@ var _ Bot = (*Teams)(nil)

// Teams contains credentials to start Teams backend server
type Teams struct {
AppID string
AppPassword string
MessagePath string
Port string
AllowKubectl bool
RestrictAccess bool
ClusterName string
NotifType config.NotifType
Adapter core.Adapter
ProcessedConsents chan processedConsent
CleanupDone chan bool
AppID string
AppPassword string
MessagePath string
Port string
AllowKubectl bool
RestrictAccess bool
ClusterName string
NotifType config.NotifType
Adapter core.Adapter

ConversationRef *schema.ConversationReference
}

type processedConsent struct {
ID string
conversationRef schema.ConversationReference
}

type ConsentContext struct {
Command string
}

// NewTeamsBot returns Teams instance
func NewTeamsBot(c *config.Config) *Teams {
logging.Logger.Infof("Config:: %+v", c.Communications.Teams)
// Set notifier off by default
config.Notify = false
return &Teams{
AppID: c.Communications.Teams.AppID,
AppPassword: c.Communications.Teams.AppPassword,
NotifType: c.Communications.Teams.NotifType,
MessagePath: defaultMsgPath,
Port: defaultPort,
AllowKubectl: c.Settings.AllowKubectl,
RestrictAccess: c.Settings.RestrictAccess,
ClusterName: c.Settings.ClusterName,
ProcessedConsents: make(chan processedConsent, consentBufferSize),
CleanupDone: make(chan bool),
AppID: c.Communications.Teams.AppID,
AppPassword: c.Communications.Teams.AppPassword,
NotifType: c.Communications.Teams.NotifType,
MessagePath: defaultMsgPath,
Port: defaultPort,
AllowKubectl: c.Settings.AllowKubectl,
RestrictAccess: c.Settings.RestrictAccess,
ClusterName: c.Settings.ClusterName,
}
}

Expand All @@ -105,24 +97,15 @@ func (t *Teams) Start() {
return
}
// Start consent cleanup
go t.cleanupConsents()
http.HandleFunc(t.MessagePath, t.processActivity)
logging.Logger.Infof("Started MS Teams server on port %s", defaultPort)
logging.Logger.Errorf("Error in MS Teams server. %v", http.ListenAndServe(fmt.Sprintf(":%s", t.Port), nil))
t.CleanupDone <- true
}

func (t *Teams) cleanupConsents() {
for {
select {
case consent := <-t.ProcessedConsents:
fmt.Printf("Deleting activity %s\n", consent.ID)
if err := t.Adapter.DeleteActivity(context.Background(), consent.ID, consent.conversationRef); err != nil {
logging.Logger.Errorf("Failed to delete activity. %s", err.Error())
}
case <-t.CleanupDone:
return
}
func (t *Teams) deleteConsent(ID string, convRef schema.ConversationReference) {
logging.Logger.Debugf("Deleting activity %s\n", ID)
if err := t.Adapter.DeleteActivity(context.Background(), ID, convRef); err != nil {
logging.Logger.Errorf("Failed to delete activity. %s", err.Error())
}
}

Expand All @@ -137,8 +120,6 @@ func (t *Teams) processActivity(w http.ResponseWriter, req *http.Request) {

err = t.Adapter.ProcessActivity(ctx, activity, coreActivity.HandlerFuncs{
OnMessageFunc: func(turn *coreActivity.TurnContext) (schema.Activity, error) {
//actjson, _ := json.MarshalIndent(turn.Activity, "", " ")
//logging.Logger.Debugf("Received activity: %s", actjson)
resp := t.processMessage(turn.Activity)
if len(resp) >= maxMessageSize {
if turn.Activity.Conversation.ConversationType == convTypePersonal {
Expand Down Expand Up @@ -166,7 +147,7 @@ func (t *Teams) processActivity(w http.ResponseWriter, req *http.Request) {
// handle invoke events
// https://developer.microsoft.com/en-us/microsoft-teams/blogs/working-with-files-in-your-microsoft-teams-bot/
OnInvokeFunc: func(turn *coreActivity.TurnContext) (schema.Activity, error) {
t.pushProcessedConsent(turn.Activity.ReplyToID, coreActivity.GetCoversationReference(turn.Activity))
t.deleteConsent(turn.Activity.ReplyToID, coreActivity.GetCoversationReference(turn.Activity))
if err != nil {
return schema.Activity{}, fmt.Errorf("failed to read file: %s", err.Error())
}
Expand Down Expand Up @@ -204,8 +185,8 @@ func (t *Teams) processActivity(w http.ResponseWriter, req *http.Request) {
e := execute.NewDefaultExecutor(msg, t.AllowKubectl, t.RestrictAccess, t.ClusterName, true)
out := e.Execute()

aj, _ := json.MarshalIndent(turn.Activity, "", " ")
fmt.Printf("Incoming Activity:: \n%s\n", aj)
actJSON, _ := json.MarshalIndent(turn.Activity, "", " ")
logging.Logger.Debugf("Incoming MSTeams Activity: %s", actJSON)

// upload file
err = t.putRequest(uploadInfo.UploadURL, []byte(out))
Expand All @@ -225,7 +206,6 @@ func (t *Teams) processActivity(w http.ResponseWriter, req *http.Request) {
},
},
}

return turn.SendActivity(coreActivity.MsgOptionAttachments(fileAttach))
},
})
Expand All @@ -238,33 +218,29 @@ func (t *Teams) processMessage(activity schema.Activity) string {
// Trim @BotKube prefix
msg := strings.TrimSpace(strings.TrimPrefix(strings.TrimSpace(activity.Text), "<at>BotKube</at>"))

// Parse "set default channel" command and set conversation reference
if msg == channelSetCmd {
ref := coreActivity.GetCoversationReference(activity)
t.ConversationRef = &ref
// Remove messageID from the ChannelID
if ID, ok := activity.ChannelData["teamsChannelId"]; ok {
t.ConversationRef.ChannelID = ID.(string)
t.ConversationRef.Conversation.ID = ID.(string)
// User needs to execute "notifier start" cmd to enable notifications
// Parse "notifier" command and set conversation reference
args := strings.Fields(msg)
if activity.Conversation.ConversationType != convTypePersonal && len(args) > 0 && execute.ValidNotifierCommand[args[0]] {
if len(args) < 2 {
return execute.IncompleteCmdMsg
}
if execute.Start.String() == args[1] {
config.Notify = true
ref := coreActivity.GetCoversationReference(activity)
t.ConversationRef = &ref
// Remove messageID from the ChannelID
if ID, ok := activity.ChannelData["teamsChannelId"]; ok {
t.ConversationRef.ChannelID = ID.(string)
t.ConversationRef.Conversation.ID = ID.(string)
}
return fmt.Sprintf(execute.NotifierStartMsg, t.ClusterName)
}
return "Okay. I'll send notifications to this channel"
}

// Multicluster is not supported for Teams
e := execute.NewDefaultExecutor(msg, t.AllowKubectl, t.RestrictAccess, t.ClusterName, true)
out := e.Execute()
return fmt.Sprintf("```%s```", out)
}

func (t *Teams) pushProcessedConsent(ID string, ref schema.ConversationReference) {
select {
case t.ProcessedConsents <- processedConsent{ID: ID, conversationRef: ref}:
break
default:
// Remove older ID if buffer is full
<-t.ProcessedConsents
t.ProcessedConsents <- processedConsent{ID: ID, conversationRef: ref}
}
return fmt.Sprintf("```\n%s\n```", e.Execute())
}

func (t *Teams) putRequest(u string, data []byte) error {
Expand Down
14 changes: 7 additions & 7 deletions pkg/execute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var validKubectlCommands = map[string]bool{
"auth": true,
}

var validNotifierCommand = map[string]bool{
var ValidNotifierCommand = map[string]bool{
"notifier": true,
}

Expand All @@ -50,10 +50,10 @@ var validFilterCommand = map[string]bool{
var kubectlBinary = "/usr/local/bin/kubectl"

const (
notifierStartMsg = "Brace yourselves, notifications are coming from cluster '%s'."
NotifierStartMsg = "Brace yourselves, notifications are coming from cluster '%s'."
notifierStopMsg = "Sure! I won't send you notifications from cluster '%s' anymore."
unsupportedCmdMsg = "Command not supported. Please run /botkubehelp to see supported commands."
incompleteCmdMsg = "You missed to pass options for the command. Please run /botkubehelp to see command options."
IncompleteCmdMsg = "You missed to pass options for the command. Please run /botkubehelp to see command options."
kubectlDisabledMsg = "Sorry, the admin hasn't given me the permission to execute kubectl command on cluster '%s'."
filterNameMissing = "You forgot to pass filter name. Please pass one of the following valid filters:\n\n%s"
filterEnabled = "I have enabled '%s' filter on '%s' cluster."
Expand Down Expand Up @@ -153,7 +153,7 @@ func (e *DefaultExecutor) Execute() string {
}
return runKubectlCommand(args, e.ClusterName, e.IsAuthChannel)
}
if validNotifierCommand[args[0]] {
if ValidNotifierCommand[args[0]] {
return runNotifierCommand(args, e.ClusterName, e.IsAuthChannel)
}
if validPingCommand[args[0]] {
Expand Down Expand Up @@ -245,14 +245,14 @@ func runNotifierCommand(args []string, clusterName string, isAuthChannel bool) s
return ""
}
if len(args) < 2 {
return incompleteCmdMsg
return IncompleteCmdMsg
}

switch args[1] {
case Start.String():
config.Notify = true
log.Logger.Info("Notifier enabled")
return fmt.Sprintf(notifierStartMsg, clusterName)
return fmt.Sprintf(NotifierStartMsg, clusterName)
case Stop.String():
config.Notify = false
log.Logger.Info("Notifier disabled")
Expand All @@ -279,7 +279,7 @@ func runFilterCommand(args []string, clusterName string, isAuthChannel bool) str
return ""
}
if len(args) < 2 {
return incompleteCmdMsg
return IncompleteCmdMsg
}

switch args[1] {
Expand Down

0 comments on commit f1de11a

Please sign in to comment.