Skip to content

Commit

Permalink
variable for ES Index naming
Browse files Browse the repository at this point in the history
  • Loading branch information
kmoolya committed Nov 9, 2020
1 parent f14200e commit 9012170
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions pkg/notify/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (

const (
// indexSuffixFormat is the date format that would be appended to the index name
indexSuffixFormat = "02-01-2006"
indexSuffixFormat = "2006-01-02" // YYYY-MM-DD
// awsService for the AWS client to authenticate against
awsService = "es"
)
Expand Down Expand Up @@ -69,11 +69,17 @@ func NewElasticSearch(c config.ElasticSearch) (Notifier, error) {

signer := v4.NewSigner(creds)
awsClient, err := aws_signing_client.New(signer, nil, awsService, c.AWSSigning.AWSRegion)

if err != nil {
return nil, err
}
elsClient, err = elastic.NewClient(elastic.SetURL(c.Server), elastic.SetScheme("https"), elastic.SetHttpClient(awsClient), elastic.SetSniff(false), elastic.SetHealthcheck(false), elastic.SetGzip(false))
elsClient, err = elastic.NewClient(
elastic.SetURL(c.Server),
elastic.SetScheme("https"),
elastic.SetHttpClient(awsClient),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetGzip(false),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -112,8 +118,10 @@ type index struct {
}

func (e *ElasticSearch) flushIndex(ctx context.Context, event interface{}) error {
// Construct elasticsearch Index with timestamp
indexName := e.Index + "-" + time.Now().Format(indexSuffixFormat)
// Create index if not exists
exists, err := e.ELSClient.IndexExists(e.Index + "-" + time.Now().Format(indexSuffixFormat)).Do(ctx)
exists, err := e.ELSClient.IndexExists(indexName).Do(ctx)
if err != nil {
log.Error(fmt.Sprintf("Failed to get index. Error:%s", err.Error()))
return err
Expand All @@ -128,20 +136,20 @@ func (e *ElasticSearch) flushIndex(ctx context.Context, event interface{}) error
},
},
}
_, err := e.ELSClient.CreateIndex(e.Index + "-" + time.Now().Format(indexSuffixFormat)).BodyJson(mapping).Do(ctx)
_, err := e.ELSClient.CreateIndex(indexName).BodyJson(mapping).Do(ctx)
if err != nil {
log.Error(fmt.Sprintf("Failed to create index. Error:%s", err.Error()))
return err
}
}

// Send event to els
_, err = e.ELSClient.Index().Index(e.Index + "-" + time.Now().Format(indexSuffixFormat)).Type(e.Type).BodyJson(event).Do(ctx)
_, err = e.ELSClient.Index().Index(indexName).Type(e.Type).BodyJson(event).Do(ctx)
if err != nil {
log.Error(fmt.Sprintf("Failed to post data to els. Error:%s", err.Error()))
return err
}
_, err = e.ELSClient.Flush().Index(e.Index + "-" + time.Now().Format(indexSuffixFormat)).Do(ctx)
_, err = e.ELSClient.Flush().Index(indexName).Do(ctx)
if err != nil {
log.Error(fmt.Sprintf("Failed to flush data to els. Error:%s", err.Error()))
return err
Expand Down

0 comments on commit 9012170

Please sign in to comment.