Skip to content

Commit

Permalink
[ES] indices to be created with daywise timestamp (#430)
Browse files Browse the repository at this point in the history
##### ISSUE TYPE
<!--- Pick one below and delete the rest: -->
 - Feature Pull Request

##### SUMMARY
<!--- Describe the change, including rationale and design decisions -->
Elasticsearch Indices will be created with timestamp suffix
<!---
If you are fixing an existing issue, please include "Fixes #nnn" in your
PR comment; and describe briefly what the change does.
-->

<!--- Please list dependencies added with your change also -->
  • Loading branch information
kartik-moolya authored Nov 9, 2020
1 parent cb8ed93 commit 5ad31ed
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions pkg/notify/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package notify
import (
"context"
"fmt"
"time"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
Expand All @@ -37,7 +38,7 @@ import (

const (
// indexSuffixFormat is the date format that would be appended to the index name
indexSuffixFormat = "02-01-2006"
indexSuffixFormat = "2006-01-02" // YYYY-MM-DD
// awsService for the AWS client to authenticate against
awsService = "es"
)
Expand Down Expand Up @@ -68,11 +69,17 @@ func NewElasticSearch(c config.ElasticSearch) (Notifier, error) {

signer := v4.NewSigner(creds)
awsClient, err := aws_signing_client.New(signer, nil, awsService, c.AWSSigning.AWSRegion)

if err != nil {
return nil, err
}
elsClient, err = elastic.NewClient(elastic.SetURL(c.Server), elastic.SetScheme("https"), elastic.SetHttpClient(awsClient), elastic.SetSniff(false), elastic.SetHealthcheck(false), elastic.SetGzip(false))
elsClient, err = elastic.NewClient(
elastic.SetURL(c.Server),
elastic.SetScheme("https"),
elastic.SetHttpClient(awsClient),
elastic.SetSniff(false),
elastic.SetHealthcheck(false),
elastic.SetGzip(false),
)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -111,8 +118,10 @@ type index struct {
}

func (e *ElasticSearch) flushIndex(ctx context.Context, event interface{}) error {
// Construct the ELS Index Name with timestamp suffix
indexName := e.Index + "-" + time.Now().Format(indexSuffixFormat)
// Create index if not exists
exists, err := e.ELSClient.IndexExists(e.Index).Do(ctx)
exists, err := e.ELSClient.IndexExists(indexName).Do(ctx)
if err != nil {
log.Error(fmt.Sprintf("Failed to get index. Error:%s", err.Error()))
return err
Expand All @@ -127,25 +136,25 @@ func (e *ElasticSearch) flushIndex(ctx context.Context, event interface{}) error
},
},
}
_, err := e.ELSClient.CreateIndex(e.Index).BodyJson(mapping).Do(ctx)
_, err := e.ELSClient.CreateIndex(indexName).BodyJson(mapping).Do(ctx)
if err != nil {
log.Error(fmt.Sprintf("Failed to create index. Error:%s", err.Error()))
return err
}
}

// Send event to els
_, err = e.ELSClient.Index().Index(e.Index).Type(e.Type).BodyJson(event).Do(ctx)
_, err = e.ELSClient.Index().Index(indexName).Type(e.Type).BodyJson(event).Do(ctx)
if err != nil {
log.Error(fmt.Sprintf("Failed to post data to els. Error:%s", err.Error()))
return err
}
_, err = e.ELSClient.Flush().Index(e.Index).Do(ctx)
_, err = e.ELSClient.Flush().Index(indexName).Do(ctx)
if err != nil {
log.Error(fmt.Sprintf("Failed to flush data to els. Error:%s", err.Error()))
return err
}
log.Debugf("Event successfully sent to ElasticSearch index %s", e.Index)
log.Debugf("Event successfully sent to ElasticSearch index %s", indexName)
return nil
}

Expand All @@ -158,7 +167,6 @@ func (e *ElasticSearch) SendEvent(event events.Event) (err error) {
if err := e.flushIndex(ctx, event); err != nil {
return err
}

return nil
}

Expand Down

0 comments on commit 5ad31ed

Please sign in to comment.