Skip to content

Commit

Permalink
Add support for elasticsearch interface
Browse files Browse the repository at this point in the history
  • Loading branch information
PrasadG193 committed Mar 24, 2019
1 parent 81553b8 commit 371ad86
Show file tree
Hide file tree
Showing 277 changed files with 50,446 additions and 33 deletions.
26 changes: 26 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,7 @@
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/olivere/elastic"
version = "6.0.0"
6 changes: 4 additions & 2 deletions cmd/botkube/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ func main() {
}
log.Logger.Info(fmt.Sprintf("Configuration:: %+v\n", Config))

sb := slack.NewSlackBot()
go sb.Start()
if Config.Communications.Slack.Enable {
sb := slack.NewSlackBot()
go sb.Start()
}

controller.RegisterInformers(Config)
}
15 changes: 15 additions & 0 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,25 @@ recommendations: true

# Channels configuration
communications:
# Settings for Slack
slack:
enable: false
channel: 'SLACK_CHANNEL'
token: 'SLACK_API_TOKEN'

# Settings for ELS
elasticsearch:
enable: false
server: 'ELASTICSEARCH_ADDRESS' # e.g https://example.com:9243
username: 'ELASTICSEARCH_USERNAME'
password: 'ELASTICSEARCH_PASSWORD'
# ELS index settings
index:
name: botkube
type: botkube-event
shards: 1
replicas: 0

# Setting to support multiple clusters
settings:
# Cluster name to differentiate incoming messages
Expand Down
20 changes: 18 additions & 2 deletions deploy-all-in-one.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ metadata:
app: botkube
data:
config.yaml: |
## Resources you want to watch
resources:
- name: pods # Name of the resources e.g pods, deployments, ingresses, etc. (Resource name must be in plural form)
namespaces: # List of namespaces, "all" will watch all the namespaces
Expand Down Expand Up @@ -124,9 +125,24 @@ data:
# Channels configuration
communications:
# Settings for Slack
slack:
channel: <SLACK_CHANNEL>
token: <SLACK_API_TOKEN>
enable: false
channel: 'SLACK_CHANNEL'
token: 'SLACK_API_TOKEN'
# Settings for ELS
elasticsearch:
enable: false
server: 'ELASTICSEARCH_ADDRESS' # e.g https://example.com:9243
username: 'ELASTICSEARCH_USERNAME'
password: 'ELASTICSEARCH_PASSWORD'
# ELS index settings
index:
name: botkube
type: botkube-event
shards: 1
replicas: 0
# Setting to support multiple clusters
settings:
Expand Down
15 changes: 15 additions & 0 deletions helm/botkube/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,25 @@ config:

# Channels configuration
communications:
# Settings for Slack
slack:
enable: false
channel: 'SLACK_CHANNEL'
token: 'SLACK_API_TOKEN'

# Settings for ELS
elasticsearch:
enable: false
server: 'ELASTICSEARCH_ADDRESS' # e.g https://example.com:9243
username: 'ELASTICSEARCH_USERNAME'
password: 'ELASTICSEARCH_PASSWORD'
# ELS index settings
index:
name: botkube
type: botkube-event
shards: 1
replicas: 0

# Setting to support multiple clusters
settings:
# Cluster name to differentiate incoming messages
Expand Down
20 changes: 20 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,34 @@ type Resource struct {
// Communications channels to send events to
type Communications struct {
Slack Slack

ElasticSearch ElasticSearch
}

// Slack configuration to authentication and send notifications
type Slack struct {
Enable bool
Channel string
Token string
}

// ElasticSearch config auth settings
type ElasticSearch struct {
Enable bool
Username string
Password string
Server string
Index Index
}

// Index settings for ELS
type Index struct {
Name string
Type string
Shards int
Replicas int
}

// Settings for multicluster support
type Settings struct {
ClusterName string
Expand Down
43 changes: 26 additions & 17 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func findNamespace(ns string) string {

// RegisterInformers creates new informer controllers to watch k8s resources
func RegisterInformers(c *config.Config) {
sendMessage(fmt.Sprintf(controllerStartMsg, c.Settings.ClusterName))
sendMessage(c, fmt.Sprintf(controllerStartMsg, c.Settings.ClusterName))
startTime = time.Now().Local()

// Get resync period
Expand Down Expand Up @@ -75,7 +75,7 @@ func RegisterInformers(c *config.Config) {
watchlist,
object,
time.Duration(rsyncTime)*time.Minute,
registerEventHandlers(r.Name, r.Events),
registerEventHandlers(c, r.Name, r.Events),
)
stopCh := make(chan struct{})
defer close(stopCh)
Expand Down Expand Up @@ -113,7 +113,7 @@ func RegisterInformers(c *config.Config) {
if (utils.AllowedEventKindsMap[utils.EventKind{kind, "all"}] ||
utils.AllowedEventKindsMap[utils.EventKind{kind, ns}]) && (utils.AllowedEventTypesMap[eType]) {
log.Logger.Infof("Processing add to events: %s. Invoked Object: %s:%s", key, eventObj.InvolvedObject.Kind, eventObj.InvolvedObject.Namespace)
sendEvent(obj, "events", "create", err)
sendEvent(obj, c, "events", "create", err)
}
},
},
Expand All @@ -128,39 +128,39 @@ func RegisterInformers(c *config.Config) {
signal.Notify(sigterm, syscall.SIGTERM)
signal.Notify(sigterm, syscall.SIGINT)
<-sigterm
sendMessage(fmt.Sprintf(controllerStopMsg, c.Settings.ClusterName))
sendMessage(c, fmt.Sprintf(controllerStopMsg, c.Settings.ClusterName))
}

func registerEventHandlers(resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) {
func registerEventHandlers(c *config.Config, resourceType string, events []string) (handlerFns cache.ResourceEventHandlerFuncs) {
for _, event := range events {
if event == "all" || event == "create" {
handlerFns.AddFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
log.Logger.Debugf("Processing add to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "create", err)
sendEvent(obj, c, resourceType, "create", err)
}
}

if event == "all" || event == "update" {
handlerFns.UpdateFunc = func(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
log.Logger.Debugf("Processing update to %v: %s", resourceType, key)
sendEvent(new, resourceType, "update", err)
sendEvent(new, c, resourceType, "update", err)
}
}

if event == "all" || event == "delete" {
handlerFns.DeleteFunc = func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
log.Logger.Debugf("Processing delete to %v: %s", resourceType, key)
sendEvent(obj, resourceType, "delete", err)
sendEvent(obj, c, resourceType, "delete", err)
}
}
}
return handlerFns
}

func sendEvent(obj interface{}, kind, eventType string, err error) {
func sendEvent(obj interface{}, c *config.Config, kind, eventType string, err error) {
if err != nil {
log.Logger.Error("Error while receiving event: ", err.Error())
return
Expand All @@ -178,7 +178,7 @@ func sendEvent(obj interface{}, kind, eventType string, err error) {
// Skip older events
if eventType == "delete" {
objectMeta := utils.GetObjectMetaData(obj)
if objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 {
if objectMeta.DeletionTimestamp != nil && objectMeta.DeletionTimestamp.Sub(startTime).Seconds() <= 0 {
log.Logger.Debug("Skipping older events")
return
}
Expand All @@ -199,17 +199,26 @@ func sendEvent(obj interface{}, kind, eventType string, err error) {
return
}

// Send notification to communication chennel
notifier := notify.NewSlack()
notifier.SendEvent(event)
var notifier notify.Notifier
// Send notification to communication channel
if c.Communications.Slack.Enable {
notifier = notify.NewSlack(c)
go notifier.SendEvent(event)
}

if c.Communications.ElasticSearch.Enable {
notifier = notify.NewElasticSearch(c)
go notifier.SendEvent(event)
}
}

func sendMessage(msg string) {
func sendMessage(c *config.Config, msg string) {
if len(msg) <= 0 {
log.Logger.Warn("sendMessage received string with length 0. Hence skipping.")
return
}

notifier := notify.NewSlack()
notifier.SendMessage(msg)
if c.Communications.Slack.Enable {
notifier := notify.NewSlack(c)
go notifier.SendMessage(msg)
}
}
4 changes: 3 additions & 1 deletion pkg/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,9 @@ func New(object interface{}, eventType string, kind string) Event {
}

if eventType == "delete" {
event.TimeStamp = objectMeta.DeletionTimestamp.Time
if objectMeta.DeletionTimestamp != nil {
event.TimeStamp = objectMeta.DeletionTimestamp.Time
}
}

if kind != "events" {
Expand Down
2 changes: 1 addition & 1 deletion pkg/filterengine/filters/image_tag_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ func (f *ImageTagChecker) Run(object interface{}, event *events.Event) {
event.Recommendations = append(event.Recommendations, ":latest tag used in image '"+c.Image+"' of Container '"+c.Name+"' should be avoided.\n")
}
}
log.Logger.Info("Image tag filter successful!")
log.Logger.Debug("Image tag filter successful!")
}
Loading

0 comments on commit 371ad86

Please sign in to comment.