From 422013314a3f1e332f6349df46767afbf511631a Mon Sep 17 00:00:00 2001 From: Dilan Bellinghoven Date: Wed, 5 Dec 2018 17:55:27 -0500 Subject: [PATCH] chore: Added more commenting to exported assets and formatting slack attachments better --- README.md | 40 +++++++------ command/alert/alert.go | 7 ++- command/alert/email/email.go | 6 +- command/alert/file/file.go | 4 ++ command/alert/slack/attachment.go | 45 ++++++++++---- command/alert/slack/slack.go | 29 +++++---- command/command.go | 8 +-- command/controller.go | 14 ++++- command/handlers.go | 17 +++++- command/query/job.go | 97 +++++++++++++++++++++++++------ command/query/job_test.go | 18 +++--- command/query/transform.go | 11 +++- config/client.go | 46 ++++++++++++--- config/client_test.go | 18 +++--- config/parse.go | 93 +++++++++++++++++++++-------- config/parse_test.go | 12 ++-- main.go | 2 +- utils/lock/lock.go | 20 +++++++ 18 files changed, 358 insertions(+), 129 deletions(-) diff --git a/README.md b/README.md index 66fd479..6b5d4cd 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,8 @@ # go-elasticsearch-alerts -Elasticsearch Alerting Daemon +[![Build Status](https://ci.morningconsultintelligence.com/api/v1/teams/oss/pipelines/go-elasticsearch-alerts/jobs/build-release/badge)](https://ci.morningconsultintelligence.com/teams/oss/pipelines/docker-credential-vault-login) + +A daemon for generating alerts on Elasticsearch data in real-time. ## Installation @@ -30,13 +32,13 @@ The binary will be output to `bin` in the local directory. # Setup -This application requires several configuration files: a [main configuration file](#main-configuration-file) and one or more [rule configuration files](#rule-configuration-files). The main configuration file is used to configure general behavior of the application. The rule files are used to specify what queries are executed, when they are executed, and where the results shall be sent. +This application requires several configuration files: a [main configuration file](#main-configuration-file) and one or more [rule configuration files](#rule-configuration-files). The main configuration file is used to configure general behavior of the application. The rule files are used to define your alerts (e.g. what queries are executed, when they are executed, where the results shall be sent, etc.). ## Main Configuration File The main configuration file is used to specify: -* Information pertaining to your ElasticSearch instance; -* How the application will interact with your ElasticSearch instance; +* Information pertaining to your Elasticsearch instance; +* How the application will interact with your Elasticsearch instance; * Whether it is to be run in a distributed fashion; and * If distributed, how the application will communicate with your Consul instance (used for synchronization). @@ -70,46 +72,46 @@ This example shows a sample main configuration file. } } ``` -* `elasticsearch` ([ElasticSearch](#elasticsearch-parameters): ``) - Configures the ElasticSearch client and specifies server parameters. See the [ElasticSearch](#elasticsearch-parameters) section for more details. This field is required. -* `distributed` (bool: `false`) - Whether this application should will be distributed across multiple processes. If this is set to `true`, the `consul` field is also required since this application uses the [Consul lock](https://www.consul.io/docs/commands/lock.html) for synchronization. This field is optional. 
+* `elasticsearch` ([Elasticsearch](#elasticsearch-parameters): ``) - Configures the Elasticsearch client and specifies server parameters. See the [Elasticsearch](#elasticsearch-parameters) section for more details. This field is required. +* `distributed` (bool: `false`) - Whether this application should will be distributed across multiple processes. If this is set to `true`, the `consul` field is also required since this application uses the [Consul lock](https://www.consul.io/docs/commands/lock.html) for synchronization between nodes. This field is optional. * `consul` ([Consul](#consul-parameters): ``) - Configures the Consul client if this application is distributed. This field is only required when `distributed` is set to `true`. ### `elasticsearch` parameters -* `server` ([Server](#server-parameters): ``) - Specifies ElasticSearch server information. See the [Server](#server-parameters) section for more information. This field is always required. +* `server` ([Server](#server-parameters): ``) - Specifies Elasticsearch server information. See the [Server](#server-parameters) section for more information. This field is always required. * `client` ([Client](#client-parameters): ``) - Configures the HTTP client with which the process will communicate with Elasticsearch. See the [Client](#client-parameters) section for more informiation. This field is always required. ### `consul` parameters Note: All values should be strings. For example, even if the value is technically a Boolean value such as `true`, you should provide a string (e.g. `"true"`) * `consul_lock_key` (string: `""`) - The name of the key to be assigned to the Consul lock. This field is always required. -* `consul_http_address` (string: `""`) - The URL of your Consul server. This field is always required. +* `consul_http_addr` (string: `""`) - The URL of your Consul server. This field is always required. * `consul_http_token` (string: `""`) - The API access token required when access control lists (ACLs) are enabled. This field is optional.**\*** * `consul_http_ssl` (string: `"false"`) - A boolean value (default is false) that enables the HTTPS URI scheme and SSL connections to the HTTP API. This field is optional.**\*** * `consul_http_ssl_verify` (string: `""`) - A boolean value (default true) to specify SSL certificate verification; setting this value to false is not recommended for production use. This field is optional.**\*** * `consul_cacert` (string: `""`) - Path to a CA file to use for TLS when communicating with Consul. This field is optional.**\*** * `consul_capath` (string: `""`) - Path to a directory of CA certificates to use for TLS when communicating with Consul. This field is optional.**\*** -* `consul_client_cert` (string: `""`) - Path to a client cert file to use for TLS when verify_incoming is enabled. This field is optional.**\*** -* `consul_client_key` (string: `""`) - Path to a client key file to use for TLS when verify_incoming is enabled. This field is optional.**\*** +* `consul_client_cert` (string: `""`) - Path to a client cert file to use for TLS when `verify_incoming` is enabled. This field is optional.**\*** +* `consul_client_key` (string: `""`) - Path to a client key file to use for TLS when `verify_incoming` is enabled. This field is optional.**\*** * `consul_tls_server_name` (string: `""`) - The server name to use as the SNI host when connecting via TLS. 
This field is optional.**\*** **\*** This field can be specified using its corresponding [environment variable](https://www.consul.io/docs/commands/index.html#environment-variables) instead. The environment variable takes precedence. ### `server` parameters -* `url` (string: `""`) - The URL of your ElasticSearch instance. This field is always required. +* `url` (string: `""`) - The URL of your Elasticsearch instance. This field is always required. ### `client` parameters -* `tls_enabled` (bool: `false`) - Whether the application should use TLS when communicating with your ElasticSearch instance. This field is optional. -* `ca_cert` (string: `""`) - Path to a PEM-encoded CA certificate file on the local disk. This file is used to verify the ElasticSearch server's SSL certificate. -* `client_cert` (string: `""`) - Path to a PEM-encoded client certificate on the local disk. This file is used for TLS communication with the ElasticSearch server. +* `tls_enabled` (bool: `false`) - Whether the application should use TLS when communicating with your Elasticsearch instance. This field is optional. +* `ca_cert` (string: `""`) - Path to a PEM-encoded CA certificate file on the local disk. This file is used to verify the Elasticsearch server's SSL certificate. +* `client_cert` (string: `""`) - Path to a PEM-encoded client certificate on the local disk. This file is used for TLS communication with the Elasticsearch server. * `client_key` (string: `""`) - Path to an unencrypted, PEM-encoded private key on disk which corresponds to the matching client certificate. * `server_name` (string: `""`) - Name to use as the SNI host when connecting via TLS. ### Rule Configuration Files -The rule configuration files are used to configure what ElasticSearch queries will be run, how often they will be run, how the data will be transformed, and how the transformed data will be output. These files should be JSON format. The application will look for the rule files at `/etc/go-elasticsearch-alerts/rules` by default, but if you wish to keep them elsewhere you can specify this directory using the `GO_ELASTICSEARCH_ALERTS_RULES_DIR` environment variable. +The rule configuration files are used to configure what Elasticsearch queries will be run, how often they will be run, how the data will be transformed, and how the transformed data will be output. These files should be JSON format. The application will look for the rule files at `/etc/go-elasticsearch-alerts/rules` by default, but if you wish to keep them elsewhere you can specify this directory using the `GO_ELASTICSEARCH_ALERTS_RULES_DIR` environment variable. ### Example @@ -165,7 +167,7 @@ The rule configuration files are used to configure what ElasticSearch queries wi } ``` -In the example above, the application would execute the following query (illustrated by the `cURL` request below) to ElasticSearch every ten minutes, group by `aggregations.service_name.buckets` and `aggregations.service_name.buckets.program.buckets`, and write the results to Slack and local disk. +In the example above, the application would execute the following query (illustrated by the `cURL` request below) to Elasticsearch every ten minutes, group by `aggregations.service_name.buckets` and `aggregations.service_name.buckets.program.buckets`, and write the results to Slack and local disk. ```shell $ curl http:///filebeat-*/_search \ @@ -199,14 +201,14 @@ $ curl http:///filebeat-*/_search \ * `name` (string: `""`) - The name of the rule (e.g. "Filebeat Errors"). This field is required. 
* `index` (string: `""`) - The index to be queried. This field is required. * `schedule` (string: `""`) - The schedule of when the query will be executed in [cron syntax](https://en.wikipedia.org/wiki/Cron). This application uses [this cron scheduler](https://godoc.org/github.com/robfig/cron#hdr-CRON_Expression_Format) so please refer to it for more information on the exact syntax of the cron schedule. -* `body` (JSON object: ``) - The body of the [search query](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html) request. This should be exactly what you would include in an ElasticSearch `_search` request to the index specified above. This value will dictate the layout of the data that your ElasticSearch instance sends to this application; therefore, the subsequent `filters` section is dictated by this section. It is recommended that you manually run this query and understand the structure of the response data before writing the `filters` section. +* `body` (JSON object: ``) - The body of the [search query](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-body.html) request. This should be exactly what you would include in an Elasticsearch `_search` request to the index specified above. This value will dictate the layout of the data that your Elasticsearch instance sends to this application; therefore, the subsequent `filters` section is dictated by this section. It is recommended that you manually run this query and understand the structure of the response data before writing the `filters` section. * `filters` (\[\]string: `[]`) - How the response to this query should be grouped. More information on this field is provided in the [filters](#filters) section. This field is optional. If no filters are provided, only elements of the `hits.hits._source` field of the response will be recorded. -* `body_field` (string: `"hits.hits._source"`) - The field of the JSON response to collected and sent to outputs. If not specified, the application will group by the field `hits.hits._source` by default. +* `body_field` (string: `"hits.hits._source"`) - The field on which to group the response. The grouped results will be sent to the specified outputs. This field is optional. If not specified, the application will group by the field `hits.hits._source` by default. * `outputs` (\[\][Output](#outputs-parameter): `[]`) - Specifies the outputs to which the results of the query should be written. See the [Output](#output-parameter) section for more details. At least one output must be specified. ### Filters -The application will group the response to the ElasticSearch query by each element of the `filters` field and include each result of the filters as a separate record. For example, given the [rule file above](#example) let's assume that ElasticSearch returns the following in response to the query: +The application will group the response to the Elasticsearch query by each element of the `filters` field and include each result of the filters as a separate record. 
For example, given the [rule file above](#example) let's assume that Elasticsearch returns the following in response to the query: ```json { "hits": { diff --git a/command/alert/alert.go b/command/alert/alert.go index 244f915..5439eca 100644 --- a/command/alert/alert.go +++ b/command/alert/alert.go @@ -28,9 +28,10 @@ type Field struct { } type Record struct { - Title string `json:"filter,omitempty"` - Text string `json:"text,omitempty"` - Fields []*Field `json:"fields,omitempty"` + Title string `json:"filter,omitempty"` + Text string `json:"text,omitempty"` + BodyField bool `json:"-"` + Fields []*Field `json:"fields,omitempty"` } type Alert struct { diff --git a/command/alert/email/email.go b/command/alert/email/email.go index e5a96ef..26bed67 100644 --- a/command/alert/email/email.go +++ b/command/alert/email/email.go @@ -42,6 +42,10 @@ type EmailAlertMethodConfig struct { } func NewEmailAlertMethod(config *EmailAlertMethodConfig) (*EmailAlertMethod, error) { + if config == nil { + config = &EmailAlertMethodConfig{} + } + errors := []string{} if config.Host == "" { errors = append(errors, "no SMTP host provided") @@ -120,7 +124,7 @@ func (e *EmailAlertMethod) buildMessage(rule string, records []*alert.Record) (s } tpl := `Content-Type: text/html -Subject: Go ElasticSearch Alerts: {{ .Name }} +Subject: Go Elasticsearch Alerts: {{ .Name }} diff --git a/command/alert/file/file.go b/command/alert/file/file.go index 7170324..ab053ca 100644 --- a/command/alert/file/file.go +++ b/command/alert/file/file.go @@ -44,6 +44,10 @@ type FileAlertMethod struct { } func NewFileAlertMethod(config *FileAlertMethodConfig) (*FileAlertMethod, error) { + if config == nil { + config = &FileAlertMethodConfig{} + } + if config.OutputFilepath == "" { return nil, errors.New("no file path provided") } diff --git a/command/alert/slack/attachment.go b/command/alert/slack/attachment.go index 7c3b2e7..9d73d2f 100644 --- a/command/alert/slack/attachment.go +++ b/command/alert/slack/attachment.go @@ -13,10 +13,13 @@ package slack +import "time" + const ( - defaultAttachmentColor string = "#36a64f" - defaultAttachmentShort bool = true - defaultAttachmentFooter string = "#data" + defaultAttachmentColor = "#36a64f" + defaultAttachmentShort = true + defaultAttachmentFooter = "Go Elasticsearch Alerts" + defaultAttachmentFooterIcon = "https://www.elastic.co/static/images/elastic-logo-200.png" ) type Field struct { @@ -32,19 +35,27 @@ type AttachmentConfig struct { Pretext string Fields []*Field Text string + AuthorName string + AuthorLink string Footer string + FooterIcon string + Timestamp int64 MarkdownIn []string } type Attachment struct { - Fallback string `json:"fallback"` - Color string `json:"color,omitempty"` - Title string `json:"title,omitempty"` - Pretext string `json:"pretext,omitempty"` - Fields []*Field `json:"fields,omitempty"` - Text string `json:"text,omitempty"` - Footer string `json:"footer,omitempty"` - MarkdownIn []string `json:"mrkdwn_in,omitempty"` + Fallback string `json:"fallback"` + Color string `json:"color,omitempty"` + Title string `json:"title,omitempty"` + Pretext string `json:"pretext,omitempty"` + Fields []*Field `json:"fields,omitempty"` + Text string `json:"text,omitempty"` + AuthorName string `json:"author_name,omitempty"` + AuthorLink string `json:"author_link,omitempty"` + Footer string `json:"footer,omitempty"` + FooterIcon string `json:"footer_icon,omitempty"` + Timestamp int64 `json:"ts,omitempty"` + MarkdownIn []string `json:"mrkdwn_in,omitempty"` } func NewAttachment(config 
*AttachmentConfig) *Attachment { @@ -56,6 +67,14 @@ func NewAttachment(config *AttachmentConfig) *Attachment { config.Footer = defaultAttachmentFooter } + if config.FooterIcon == "" { + config.FooterIcon = defaultAttachmentFooterIcon + } + + if config.Timestamp == 0 { + config.Timestamp = time.Now().Unix() + } + return &Attachment{ Fallback: config.Fallback, Color: config.Color, @@ -63,7 +82,11 @@ func NewAttachment(config *AttachmentConfig) *Attachment { Pretext: config.Pretext, Fields: config.Fields, Text: config.Text, + AuthorName: config.AuthorName, + AuthorLink: config.AuthorLink, Footer: config.Footer, + FooterIcon: config.FooterIcon, + Timestamp: config.Timestamp, MarkdownIn: config.MarkdownIn, } } diff --git a/command/alert/slack/slack.go b/command/alert/slack/slack.go index 91e3cc1..a6a3c16 100644 --- a/command/alert/slack/slack.go +++ b/command/alert/slack/slack.go @@ -18,13 +18,14 @@ import ( "context" "fmt" "net/http" + // "time" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/vault/helper/jsonutil" "github.com/morningconsult/go-elasticsearch-alerts/command/alert" ) -const defaultTextLimit = 50 +const defaultTextLimit = 6000 // Ensure SlackAlertMethod adheres to the alert.AlertMethod interface var _ alert.AlertMethod = (*SlackAlertMethod)(nil) @@ -58,6 +59,10 @@ type Payload struct { } func NewSlackAlertMethod(config *SlackAlertMethodConfig) (*SlackAlertMethod, error) { + if config == nil { + config = &SlackAlertMethodConfig{} + } + if config.WebhookURL == "" { return nil, fmt.Errorf("field 'output.config.webhook' must not be empty when using the Slack output method") } @@ -99,12 +104,12 @@ func (s *SlackAlertMethod) BuildPayload(rule string, records []*alert.Record) *P for _, record := range records { config := &AttachmentConfig{ - Fallback: rule, - Pretext: record.Title, - Text: record.Text, + Title: rule, + Text: record.Title, + MarkdownIn: []string{"text"}, } - if config.Text != "" { - config.Text = "```\n"+config.Text+"\n```" + if record.BodyField { + config.Text = config.Text+"\n```\n"+record.Text+"\n```" config.Color = "#ff0000" config.MarkdownIn = []string{"text"} } @@ -150,6 +155,8 @@ func (s *SlackAlertMethod) post(ctx context.Context, payload *Payload) error { return err } +// preprocess breaks attachments with text greater than s.textLimit +// into multiple attachments in order to prevent trucation func (s *SlackAlertMethod) preprocess(records []*alert.Record) []*alert.Record { var output []*alert.Record for _, record := range records { @@ -162,15 +169,17 @@ func (s *SlackAlertMethod) preprocess(records []*alert.Record) []*alert.Record { for i = 0; i < n; i++ { chopped := fmt.Sprintf("(part %d of %d)\n\n%s\n\n(continued)", i+1, n+1, record.Text[s.textLimit*i:s.textLimit*(i+1)]) record := &alert.Record{ - Title: fmt.Sprintf("%s (%d of %d)", record.Title, i+1, n+1), - Text: chopped, + Title: fmt.Sprintf("%s (%d of %d)", record.Title, i+1, n+1), + Text: chopped, + BodyField: record.BodyField, } output = append(output, record) } chopped := fmt.Sprintf("(part %d of %d)\n\n%s", i+1, n+1, record.Text[s.textLimit*i:]) record := &alert.Record{ - Title: fmt.Sprintf("%s (%d of %d)", record.Title, i+1, n+1), - Text: chopped, + Title: fmt.Sprintf("%s (%d of %d)", record.Title, i+1, n+1), + Text: chopped, + BodyField: record.BodyField, } output = append(output, record) } diff --git a/command/command.go b/command/command.go index e986e6b..8ede15f 100644 --- a/command/command.go +++ b/command/command.go @@ -43,11 +43,11 @@ func Run() int { esClient, err := 
cfg.NewESClient() if err != nil { - logger.Error("Error creating new ElasticSearch HTTP client", "error", err) + logger.Error("Error creating new Elasticsearch HTTP client", "error", err) return 1 } - qhs, err := buildQueryHandlers(cfg.Rules, cfg.ElasticSearch.Server.ElasticSearchURL, esClient, logger) + qhs, err := buildQueryHandlers(cfg.Rules, cfg.Elasticsearch.Server.ElasticsearchURL, esClient, logger) if err != nil { logger.Error("Error creating query handlers from rules", "error", err) return 1 @@ -150,14 +150,13 @@ func Run() int { return 0 case <-reloadCh: logger.Info("SIGHUP received. Updating rules.") - rules, err := config.ParseRules() if err != nil { logger.Error("Error parsing rules. Exiting", "error", err) cancel() return 1 } - qhs, err := buildQueryHandlers(rules, cfg.ElasticSearch.Server.ElasticSearchURL, esClient, logger) + qhs, err := buildQueryHandlers(rules, cfg.Elasticsearch.Server.ElasticsearchURL, esClient, logger) if err != nil { logger.Error("Error creating query handlers from rules. Exiting", "error", err) cancel() @@ -226,6 +225,5 @@ func newConsulClient(config map[string]string) (*api.Client, error) { defer os.Unsetenv(env) } } - return api.NewClient(&api.Config{}) } diff --git a/command/controller.go b/command/controller.go index 51f32f5..004dc86 100644 --- a/command/controller.go +++ b/command/controller.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Morning Consult, LLC or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. + package command import ( @@ -5,7 +18,6 @@ import ( "errors" "sync" - // "github.com/morningconsult/go-elasticsearch-alerts/config" "github.com/morningconsult/go-elasticsearch-alerts/utils/lock" "github.com/morningconsult/go-elasticsearch-alerts/command/query" "github.com/morningconsult/go-elasticsearch-alerts/command/alert" diff --git a/command/handlers.go b/command/handlers.go index ef9c516..09daf9f 100644 --- a/command/handlers.go +++ b/command/handlers.go @@ -1,3 +1,16 @@ +// Copyright 2018 The Morning Consult, LLC or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"). You may +// not use this file except in compliance with the License. A copy of the +// License is located at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// or in the "license" file accompanying this file. This file is distributed +// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +// express or implied. See the License for the specific language governing +// permissions and limitations under the License. 
+ package command import ( @@ -45,8 +58,8 @@ func buildQueryHandlers(rules []*config.RuleConfig, esURL string, esClient *http AlertMethods: methods, Client: esClient, ESUrl: esURL, - QueryData: rule.ElasticSearchBody, - QueryIndex: rule.ElasticSearchIndex, + QueryData: rule.ElasticsearchBody, + QueryIndex: rule.ElasticsearchIndex, Schedule: rule.CronSchedule, BodyField: rule.BodyField, Filters: rule.Filters, diff --git a/command/query/job.go b/command/query/job.go index 11ce725..69fc6d2 100644 --- a/command/query/job.go +++ b/command/query/job.go @@ -44,20 +44,58 @@ const ( defaultBodyField string = "hits.hits._source" ) +// QueryHandlerConfig is passed as an argument to NewQueryHandler() type QueryHandlerConfig struct { - Name string - Logger hclog.Logger + // Name is the name of the rule. This should come from + // the 'name' field of the rule configuration file + Name string + + // AlertMethods will be passed along with any results returned + // by a query to the alert handler via the outputCh AlertMethods []alert.AlertMethod - Client *http.Client - ESUrl string - QueryData map[string]interface{} - QueryIndex string - Schedule string - BodyField string - Filters []string + + // Client is an *http.Client instance that will be used to + // query Elasticsearch + Client *http.Client + + // ESUrl is the URL of the Elasticsearch instance. This should + // come from the 'elasticsearch.server.url' field of the main + // configuration file + ESUrl string + + // QueryData is the payload to be included in the query. This + // should come from the 'body' field of the rule configuration + // file + QueryData map[string]interface{} + + // QueryIndex is the Elasticsearch index to be queried. This + // should come from the 'index' field of the rule configuration + // file + QueryIndex string + + // Schedule is the interval at which the defined Elasticsearch + // query should executed (in cron syntax) + Schedule string + + // BodyField is the field of the JSON response returned by + // Elasticsearch to be grouped on and subsequently sent to + // the specified outputs. This should come from the 'body_field' + // field of the rule configuration file + BodyField string + + // Filters are the additional fields to be grouped on. These + // should come from the 'filters' field of the rule configuration + // file + Filters []string + + Logger hclog.Logger } +// QueryHandler performs the defined Elasticsearch query at the +// specified interval and sends results to the AlertHandler if +// there are any. 
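+//
+// A minimal usage sketch (illustrative only; the variables body, methods,
+// esClient, and logger are hypothetical placeholders, not code from this
+// repository) showing how the rule-file fields map onto a
+// QueryHandlerConfig:
+//
+//	qh, err := NewQueryHandler(&QueryHandlerConfig{
+//		Name:         "Filebeat Errors",       // rule 'name'
+//		ESUrl:        "http://127.0.0.1:9200", // 'elasticsearch.server.url'
+//		QueryIndex:   "filebeat-*",            // rule 'index'
+//		Schedule:     "@every 10m",            // rule 'schedule'
+//		QueryData:    body,                    // rule 'body' (map[string]interface{})
+//		BodyField:    "hits.hits._source",     // rule 'body_field'
+//		AlertMethods: methods,                 // built from the rule 'outputs'
+//		Client:       esClient,                // HTTP client, e.g. from Config.NewESClient()
+//		Logger:       logger,
+//	})
+//	if err != nil {
+//		// handle invalid rule configuration
+//	}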
type QueryHandler struct { + // StopCh terminates the Run() method when closed StopCh chan struct{} name string @@ -73,7 +111,12 @@ type QueryHandler struct { filters []string } +// NewQueryHandler creates a new *QueryHandler instance func NewQueryHandler(config *QueryHandlerConfig) (*QueryHandler, error) { + if config == nil { + config = &QueryHandlerConfig{} + } + if config.Name == "" { return nil, errors.New("no rule name provided") } @@ -81,11 +124,11 @@ func NewQueryHandler(config *QueryHandlerConfig) (*QueryHandler, error) { config.ESUrl = strings.TrimRight(config.ESUrl, "/") if config.ESUrl == "" { - return nil, errors.New("no ElasticSearch URL provided") + return nil, errors.New("no Elasticsearch URL provided") } if config.QueryIndex == "" { - return nil, errors.New("no ElasticSearch index provided") + return nil, errors.New("no Elasticsearch index provided") } if len(config.AlertMethods) < 1 { @@ -135,6 +178,15 @@ func NewQueryHandler(config *QueryHandlerConfig) (*QueryHandler, error) { }, nil } +// Run starts the QueryHandler. It first attempts to get the "state" +// document for this rule from Elasticsearch in order to schedule +// the next execution at the last scheduled time. If it does not find +// such a document, or if the next scheduled query is in the past, it +// will execute the query immediately. Afterwards, it will attempt to +// write a new state document to Elasticsearch in which the 'next_query' +// equals the next time the query shall be executed per the provided +// cron schedule. It will only execute the query if distLock.Acquired() +// is true. func (q *QueryHandler) Run(ctx context.Context, outputCh chan *alert.Alert, wg *sync.WaitGroup, distLock *lock.Lock) { var ( now = time.Now() @@ -148,7 +200,7 @@ func (q *QueryHandler) Run(ctx context.Context, outputCh chan *alert.Alert, wg * t, err := q.getNextQuery(ctx) if err != nil { - q.logger.Error(fmt.Sprintf("[Rule: %q] error looking up next scheduled query in ElasticSearch, running query now instead", q.name), + q.logger.Error(fmt.Sprintf("[Rule: %q] error looking up next scheduled query in Elasticsearch, running query now instead", q.name), "error", err) select { case <-ctx.Done(): @@ -175,7 +227,7 @@ func (q *QueryHandler) Run(ctx context.Context, outputCh chan *alert.Alert, wg * if distLock.Acquired() { data, err := q.query(ctx) if err != nil { - q.logger.Error(fmt.Sprintf("[Rule: %q] error querying ElasticSearch", q.name), "error", err) + q.logger.Error(fmt.Sprintf("[Rule: %q] error querying Elasticsearch", q.name), "error", err) break } @@ -207,14 +259,18 @@ func (q *QueryHandler) Run(ctx context.Context, outputCh chan *alert.Alert, wg * next = q.schedule.Next(now) if maintainState { if err := q.setNextQuery(ctx, next, hits); err != nil { - q.logger.Error(fmt.Sprintf("[Rule: %q] error creating next query document in ElasticSearch", q.name), "error", err) - q.logger.Info(fmt.Sprintf("[Rule: %q] continuing without maintaining job state in ElasticSearch", q.name)) + q.logger.Error(fmt.Sprintf("[Rule: %q] error creating next query document in Elasticsearch", q.name), "error", err) + q.logger.Info(fmt.Sprintf("[Rule: %q] continuing without maintaining job state in Elasticsearch", q.name)) maintainState = false } } } } +// PutTemplate attempts to create a template in Elasticsearch which +// will serve as an alias for the state indices. 
The state indices +// will be named 'go-es-alerts-status-{date}'; therefore, this template +// enables searching all state indices via this alias func (q *QueryHandler) PutTemplate(ctx context.Context) error { payload := fmt.Sprintf(`{"index_patterns":["%s-status-%s-*"],"order":0,"aliases":{%q:{}},"settings":{"index":{"number_of_shards":5,"number_of_replicas":1,"auto_expand_replicas":"0-2","translog":{"flush_threshold_size":"752mb"},"sort":{"field":["next_query","rule_name","hostname"],"order":["desc","desc","desc"]}}},"mappings":{"_doc":{"dynamic_templates":[{"strings_as_keywords":{"match_mapping_type":"string","mapping":{"type":"keyword"}}}],"properties":{"@timestamp":{"type":"date"},"rule_name":{"type":"keyword"},"next_query":{"type":"date"},"hostname":{"type":"keyword"},"hits_count":{"type":"long","null_value":0},"hits":{"enabled":false}}}}}`, defaultStateIndexAlias, templateVersion, q.TemplateName()) @@ -243,11 +299,15 @@ func (q *QueryHandler) PutTemplate(ctx context.Context) error { return errors.New("value of 'acknowledged' field of JSON response cannot be cast to boolean") } if !ack { - return errors.New("ElasticSearch did not acknowledge creation of new template") + return errors.New("Elasticsearch did not acknowledge creation of new template") } return nil } +// getNextQuery queries the state indices for the most recently- +// created document belonging to this rule. It then attempts to +// parse the 'next_query' field in order to inform the Run() loop +// when to next execute the query. func (q *QueryHandler) getNextQuery(ctx context.Context) (*time.Time, error) { payload := fmt.Sprintf(`{"query":{"bool":{"must":[{"term":{"rule_name":{"value":%q}}}]}},"sort":[{"next_query":{"order":"desc"}}],"size":1}`, q.cleanedName()) @@ -295,6 +355,9 @@ func (q *QueryHandler) getNextQuery(ctx context.Context) (*time.Time, error) { return &t, nil } +// setNextQuery creates a new document in a state index to +// inform the Run() loop when to next execute the query if +// the process gets restarted func (q *QueryHandler) setNextQuery(ctx context.Context, ts time.Time, hits []map[string]interface{}) error { status := struct { Time string `json:"@timestamp"` @@ -333,7 +396,7 @@ func (q *QueryHandler) setNextQuery(ctx context.Context, ts time.Time, hits []ma func (q *QueryHandler) query(ctx context.Context) (map[string]interface{}, error) { queryData, err := jsonutil.EncodeJSON(q.queryData) if err != nil { - return nil, fmt.Errorf("error JSON-encoding ElasticSearch query body: %v", err) + return nil, fmt.Errorf("error JSON-encoding Elasticsearch query body: %v", err) } resp, err := q.makeRequest(ctx, "GET", fmt.Sprintf("%s/%s/_search", q.esURL, q.queryIndex), queryData) diff --git a/command/query/job_test.go b/command/query/job_test.go index 341d98b..c2546bc 100644 --- a/command/query/job_test.go +++ b/command/query/job_test.go @@ -35,10 +35,10 @@ import ( "github.com/morningconsult/go-elasticsearch-alerts/command/alert/file" ) -var SkipElasticSearchTests bool = false +var SkipElasticsearchTests bool = false const ( - ElasticSearchURL string = "http://127.0.0.1:9200" + ElasticsearchURL string = "http://127.0.0.1:9200" ConsulURL string = "http://127.0.0.1:8500" ) @@ -53,7 +53,7 @@ func TestNewQueryHandler(t *testing.T) { "success", &QueryHandlerConfig{ Name: "Test Errors", - ESUrl: ElasticSearchURL, + ESUrl: ElasticsearchURL, QueryIndex: "test-*", AlertMethods: []alert.AlertMethod{&file.FileAlertMethod{}}, QueryData: map[string]interface{}{ @@ -67,7 +67,7 @@ func TestNewQueryHandler(t *testing.T) { 
{ "no-name", &QueryHandlerConfig{ - ESUrl: ElasticSearchURL, + ESUrl: ElasticsearchURL, QueryIndex: "test-*", AlertMethods: []alert.AlertMethod{&file.FileAlertMethod{}}, QueryData: map[string]interface{}{ @@ -90,13 +90,13 @@ func TestNewQueryHandler(t *testing.T) { Schedule: "@every 10m", }, true, - "no ElasticSearch URL provided", + "no Elasticsearch URL provided", }, { "no-query-index", &QueryHandlerConfig{ Name: "Test Errors", - ESUrl: ElasticSearchURL, + ESUrl: ElasticsearchURL, AlertMethods: []alert.AlertMethod{&file.FileAlertMethod{}}, QueryData: map[string]interface{}{ "ayy": "lmao", @@ -104,13 +104,13 @@ func TestNewQueryHandler(t *testing.T) { Schedule: "@every 10m", }, true, - "no ElasticSearch index provided", + "no Elasticsearch index provided", }, { "no-alert-methods", &QueryHandlerConfig{ Name: "Test Errors", - ESUrl: ElasticSearchURL, + ESUrl: ElasticsearchURL, QueryIndex: "test-*", AlertMethods: []alert.AlertMethod{}, QueryData: map[string]interface{}{ @@ -125,7 +125,7 @@ func TestNewQueryHandler(t *testing.T) { "cron-parse-error", &QueryHandlerConfig{ Name: "Test Errors", - ESUrl: ElasticSearchURL, + ESUrl: ElasticsearchURL, QueryIndex: "test-*", AlertMethods: []alert.AlertMethod{&file.FileAlertMethod{}}, QueryData: map[string]interface{}{ diff --git a/command/query/transform.go b/command/query/transform.go index 8aca6ab..eb927da 100644 --- a/command/query/transform.go +++ b/command/query/transform.go @@ -25,6 +25,12 @@ import ( const hitsDelimiter = "\n----------------------------------------\n" +// Transform converts the raw response returned from Elasticsearch into a +// []*github.com/morningconsult/go-elasticsearch-alerts/command/alert.Record +// array and returns that array, the response fields grouped by +// *QueryHandler.bodyField (if any), and an error if there was an error. +// If Transform returns a non-nil error, the other returned values will +// be nil. func (q *QueryHandler) Transform(respData map[string]interface{}) ([]*alert.Record, []map[string]interface{}, error) { var records []*alert.Record for _, filter := range q.filters { @@ -87,8 +93,9 @@ func (q *QueryHandler) Transform(respData map[string]interface{}) ([]*alert.Reco if len(stringifiedHits) > 0 { record := &alert.Record{ - Title: q.bodyField, - Text: strings.Join(stringifiedHits, hitsDelimiter), + Title: q.bodyField, + Text: strings.Join(stringifiedHits, hitsDelimiter), + BodyField: true, } records = append(records, record) } diff --git a/config/client.go b/config/client.go index 2167e2a..adcbd97 100644 --- a/config/client.go +++ b/config/client.go @@ -23,38 +23,66 @@ import ( "github.com/hashicorp/go-cleanhttp" ) +// ClientConfig is used to create new configured new +// *http.Client instances type ClientConfig struct { + // TLSEnabled is used to inform NewESClient whether to + // communicate with Elasticsearch via TLS. This value + // should come from the 'elasticsearch.client.tls_enabled' + // field of the main configuration file TLSEnabled bool `json:"tls_enabled"` - CACert string `json:"ca_cert"` + + // CACert is the path to a PEM-encoded CA certificate file. + // This value should come from the 'elasticsearch.client.ca_cert' + // field of the main configuration file + CACert string `json:"ca_cert"` + + // ClientCert is the path to a PEM-encoded client + // certificate file when connecting via TLS. 
This value + // should come from the 'elasticsearch.client.client_cert' + // field of the main configuration file ClientCert string `json:"client_cert"` - ClientKey string `json:"client_key"` + + // ClientKey is the path to a PEM-encoded client key file + // when connecting via TLS. This value should come from the + // 'elasticsearch.client.client_key' field of the main + // configuration file + ClientKey string `json:"client_key"` + + // ServerName is the server name to use as the SNI host when + // connecting via TLS. This value should come from the + // 'elasticsearch.client.server_name' field of the main + // configuration file ServerName string `json:"server_name"` } +// NewESClient creates a new HTTP client based on the +// values of ClientConfig's fields. This client should +// be used to communicate with Elasticsearch func (c *Config) NewESClient() (*http.Client, error) { client := cleanhttp.DefaultClient() - if c.ElasticSearch.Client == nil || !c.ElasticSearch.Client.TLSEnabled { + if c.Elasticsearch.Client == nil || !c.Elasticsearch.Client.TLSEnabled { return client, nil } - if c.ElasticSearch.Client.CACert == "" { + if c.Elasticsearch.Client.CACert == "" { return nil, fmt.Errorf("no path to CA certificate") } - if c.ElasticSearch.Client.ClientCert == "" { + if c.Elasticsearch.Client.ClientCert == "" { return nil, fmt.Errorf("no path to client certificate") } - if c.ElasticSearch.Client.ClientKey == "" { + if c.Elasticsearch.Client.ClientKey == "" { return nil, fmt.Errorf("no path to client key") } // Load client certificate - cert, err := tls.LoadX509KeyPair(c.ElasticSearch.Client.ClientCert, c.ElasticSearch.Client.ClientKey) + cert, err := tls.LoadX509KeyPair(c.Elasticsearch.Client.ClientCert, c.Elasticsearch.Client.ClientKey) if err != nil { return nil, fmt.Errorf("error loading X509 key pair: %v", err) } // Load CA certificate - caCert, err := ioutil.ReadFile(c.ElasticSearch.Client.CACert) + caCert, err := ioutil.ReadFile(c.Elasticsearch.Client.CACert) if err != nil { return nil, fmt.Errorf("error reading CA certificate file: %v", err) } @@ -64,7 +92,7 @@ func (c *Config) NewESClient() (*http.Client, error) { tlsConfig := &tls.Config{ Certificates: []tls.Certificate{cert}, RootCAs: caCertPool, - ServerName: c.ElasticSearch.Client.ServerName, + ServerName: c.Elasticsearch.Client.ServerName, } tlsConfig.BuildNameToCertificate() client.Transport.(*http.Transport).TLSClientConfig = tlsConfig diff --git a/config/client_test.go b/config/client_test.go index 5f11f08..2d43ff7 100644 --- a/config/client_test.go +++ b/config/client_test.go @@ -13,9 +13,7 @@ package config -import ( - "testing" -) +import "testing" func TestNewESClient(t *testing.T) { cases := []struct { @@ -26,7 +24,7 @@ func TestNewESClient(t *testing.T) { { "tls-disabled", &Config{ - ElasticSearch: &ESConfig{ + Elasticsearch: &ESConfig{ Client: &ClientConfig{ TLSEnabled: false, }, @@ -37,7 +35,7 @@ func TestNewESClient(t *testing.T) { { "no-ca-cert", &Config{ - ElasticSearch: &ESConfig{ + Elasticsearch: &ESConfig{ Client: &ClientConfig{ TLSEnabled: true, }, @@ -48,7 +46,7 @@ func TestNewESClient(t *testing.T) { { "no-client-cert", &Config{ - ElasticSearch: &ESConfig{ + Elasticsearch: &ESConfig{ Client: &ClientConfig{ TLSEnabled: true, CACert: "testdata/certs/cacert.pem", @@ -60,7 +58,7 @@ func TestNewESClient(t *testing.T) { { "no-client-key", &Config{ - ElasticSearch: &ESConfig{ + Elasticsearch: &ESConfig{ Client: &ClientConfig{ TLSEnabled: true, CACert: "testdata/certs/cacert.pem", @@ -73,7 +71,7 @@ func 
TestNewESClient(t *testing.T) { { "error-loading-pair", &Config{ - ElasticSearch: &ESConfig{ + Elasticsearch: &ESConfig{ Client: &ClientConfig{ TLSEnabled: true, CACert: "testdata/certs/cacert.pem", @@ -87,7 +85,7 @@ func TestNewESClient(t *testing.T) { { "error-reading-ca-cert", &Config{ - ElasticSearch: &ESConfig{ + Elasticsearch: &ESConfig{ Client: &ClientConfig{ TLSEnabled: true, CACert: "testdata/certs/i-dont-exist.pem", @@ -101,7 +99,7 @@ func TestNewESClient(t *testing.T) { { "success", &Config{ - ElasticSearch: &ESConfig{ + Elasticsearch: &ESConfig{ Client: &ClientConfig{ TLSEnabled: true, CACert: "testdata/certs/cacert.pem", diff --git a/config/parse.go b/config/parse.go index 012fcfc..012f792 100644 --- a/config/parse.go +++ b/config/parse.go @@ -30,43 +30,88 @@ const ( defaultRulesDir string = "/etc/go-elasticsearch-alerts/rules" ) + +// OutputConfig maps to each element of 'output' field of +// a rule configuration file type OutputConfig struct { Type string `json:"type"` Config map[string]interface{} `json:"config"` } +// RuleConfig represents a rule configuration file type RuleConfig struct { - Name string `json:"name"` - ElasticSearchIndex string `json:"index"` - CronSchedule string `json:"schedule"` - BodyField string `json:"body_field"` - ElasticSearchBodyRaw interface{} `json:"body"` - ElasticSearchBody map[string]interface{} `json:"-"` - Filters []string `json:"filters"` - Outputs []*OutputConfig `json:"outputs"` -} - -type DistributedConfig struct { - ConsulAddr string `json:"consul_address"` - ConsulLockKey string `json:"consul_lock_key"` + // Name is the name of the rule. This value should come + // from the 'name' field of the rule configuration file + Name string `json:"name"` + + // ElasticsearchIndex is the index that this rule should + // query. This value should come from the 'index' field + // of the rule configuration file + ElasticsearchIndex string `json:"index"` + + // CronSchedule is the interval at which the + // *github.com/morningconsult/go-elasticsearch-alerts/command/query.QueryHandler + // will execute the query. This value should come from + // the 'schedule' field of the rule configuration file + CronSchedule string `json:"schedule"` + + // BodyField is the field on which the application should + // group query responses before sending alerts. This value + // should come from the 'body_field' field of the rule + // configuration file + BodyField string `json:"body_field"` + + // ElasticsearchBodyRaw is the untyped query that this + // alert should send when querying Elasticsearch. This + // value should come from the 'body' field of the + // rule configuration file + ElasticsearchBodyRaw interface{} `json:"body"` + + // ElasticsearchBody is the typed query that this alert + // will send when querying Elasticsearch + ElasticsearchBody map[string]interface{} `json:"-"` + + // Filters are the additional fields on which the application + // should group query responses before sending alerts. This + // value should come from the 'filters' field of the rule + // configuration file + Filters []string `json:"filters"` + + // Outputs are the methods by which alerts should be sent + Outputs []*OutputConfig `json:"outputs"` } +// ServerConfig represents the 'elasticsearch.server' +// field of the main configuration file type ServerConfig struct { - ElasticSearchURL string `json:"url"` + // ElasticsearchURL is the URL of your Elasticsearch instance. 
+ // This value should come from the 'elasticsearch.server.url' + // field of the main configuration file + ElasticsearchURL string `json:"url"` } +// ESConfig represents the 'elasticsearch' field of the +// main configuration file type ESConfig struct { + // Server represents the 'elasticsearch.server' field + // of the main configuration file Server *ServerConfig `json:"server"` + + // Client represents the 'elasticsearch.client' field + // of the main configuration file Client *ClientConfig `json:"client"` } +// Config represents the main configuration file type Config struct { - ElasticSearch *ESConfig `json:"elasticsearch"` + Elasticsearch *ESConfig `json:"elasticsearch"` Distributed bool `json:"distributed"` Consul map[string]string `json:"consul"` Rules []*RuleConfig `json:"-"` } +// ParseConfig parses the main configuration file and returns a +// *Config instance or an error if there was an error func ParseConfig() (*Config, error) { configFile := defaultConfigFile if v := os.Getenv(envConfigFile); v != "" { @@ -89,15 +134,15 @@ func ParseConfig() (*Config, error) { } file.Close() - if cfg.ElasticSearch == nil { + if cfg.Elasticsearch == nil { return nil, errors.New("no 'elasticsearch' field found in main configuration file") } - if cfg.ElasticSearch.Server == nil { + if cfg.Elasticsearch.Server == nil { return nil, errors.New("no 'elasticsearch.server' field found in main configuration file") } - if cfg.ElasticSearch.Server.ElasticSearchURL == "" { + if cfg.Elasticsearch.Server.ElasticsearchURL == "" { return nil, errors.New("field 'elasticsearch.server.url' of main configuration file is empty") } @@ -128,6 +173,8 @@ func ParseConfig() (*Config, error) { return cfg, nil } +// ParseRules parses the rule configuration files and returns +// an array of *RuleConfig or an error if there was an error func ParseRules() ([]*RuleConfig, error) { rulesDir := defaultRulesDir if v := os.Getenv(envRulesDir); v != "" { @@ -160,25 +207,25 @@ func ParseRules() ([]*RuleConfig, error) { } file.Close() - switch b := rule.ElasticSearchBodyRaw.(type) { + switch b := rule.ElasticsearchBodyRaw.(type) { case map[string]interface{}: - rule.ElasticSearchBody = b + rule.ElasticsearchBody = b case string: var body map[string]interface{} if err = jsonutil.DecodeJSON([]byte(b), &body); err != nil { return nil, fmt.Errorf("error JSON-decoding 'body' field of file %s: %v", file.Name(), err) } - rule.ElasticSearchBody = body + rule.ElasticsearchBody = body default: return nil, fmt.Errorf("'body' field of file %s must be valid JSON", file.Name()) } - rule.ElasticSearchBodyRaw = nil + rule.ElasticsearchBodyRaw = nil if rule.Name == "" { return nil, fmt.Errorf("no 'name' field found in rule file %s", file.Name()) } - if rule.ElasticSearchIndex == "" { + if rule.ElasticsearchIndex == "" { return nil, fmt.Errorf("no 'index' field found in rule file %s", file.Name()) } diff --git a/config/parse_test.go b/config/parse_test.go index 9d087a2..15388ac 100644 --- a/config/parse_test.go +++ b/config/parse_test.go @@ -167,8 +167,8 @@ func TestParseConfig_MainConfig(t *testing.T) { t.Fatal(err) } - if cfg.ElasticSearch.Server.ElasticSearchURL != "http://127.0.0.1:9200" { - t.Fatalf("got %q, expected \"http://127.0.0.1:9200\"", cfg.ElasticSearch.Server.ElasticSearchURL) + if cfg.Elasticsearch.Server.ElasticsearchURL != "http://127.0.0.1:9200" { + t.Fatalf("got %q, expected \"http://127.0.0.1:9200\"", cfg.Elasticsearch.Server.ElasticsearchURL) } if !cfg.Distributed { @@ -575,9 +575,9 @@ func TestParseConfig_Rules(t *testing.T) { 
 			continue
 		}
 
-		if rules[i].ElasticSearchIndex != index {
+		if rules[i].ElasticsearchIndex != index {
 			t.Fatalf("unexpected index value (got %q, expected %q)",
-				rules[i].ElasticSearchIndex, index)
+				rules[i].ElasticsearchIndex, index)
 		}
 
 		schedule, ok := contents["schedule"].(string)
@@ -612,9 +612,9 @@ func TestParseConfig_Rules(t *testing.T) {
 			continue
 		}
 
-		if !reflect.DeepEqual(body, rules[i].ElasticSearchBody) {
+		if !reflect.DeepEqual(body, rules[i].ElasticsearchBody) {
 			t.Fatalf("rule 'body' is unexpected:\nGot:\n\t%+v\nExpected:\n\t%+v",
-				rules[i].ElasticSearchBody, body)
+				rules[i].ElasticsearchBody, body)
 		}
 
 		outputs, ok := contents["outputs"].([]interface{})
diff --git a/main.go b/main.go
index dcec917..ece0918 100644
--- a/main.go
+++ b/main.go
@@ -22,7 +22,7 @@ import (
 	"github.com/morningconsult/go-elasticsearch-alerts/version"
 )
 
-const banner = "Go ElasticSearch Alerts version %v, commit %v, built %v\n"
+const banner = "Go Elasticsearch Alerts version %v, commit %v, built %v\n"
 
 func main() {
 	var versionFlag bool
diff --git a/utils/lock/lock.go b/utils/lock/lock.go
index 0868e84..2886d0a 100644
--- a/utils/lock/lock.go
+++ b/utils/lock/lock.go
@@ -1,12 +1,28 @@
+// Copyright 2018 The Morning Consult, LLC or its affiliates. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License"). You may
+// not use this file except in compliance with the License. A copy of the
+// License is located at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// or in the "license" file accompanying this file. This file is distributed
+// on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+// express or implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
 package lock
 
 import "sync"
 
+// Lock is used as a mutex to synchronize between
+// nodes running this process
 type Lock struct {
 	mutex *sync.RWMutex
 	have  *bool
 }
 
+// NewLock creates a new *Lock instance
 func NewLock() *Lock {
 	return &Lock{
 		mutex: new(sync.RWMutex),
@@ -14,12 +30,16 @@
 	}
 }
 
+// Acquired returns true if the lock has been acquired and
+// false otherwise
 func (l *Lock) Acquired() bool {
 	l.mutex.RLock()
 	defer l.mutex.RUnlock()
 	return *l.have
 }
 
+// Set is used to set whether or not the lock
+// has been acquired
 func (l *Lock) Set(b bool) {
 	l.mutex.Lock()
 	*l.have = b
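
The `Lock` type added above is the shared flag the rest of the daemon consults before running queries; the `QueryHandler.Run` comment earlier in this patch notes that a query only executes when `distLock.Acquired()` is true. The sketch below is illustrative only: it assumes that in distributed mode some watcher goroutine (presumably the Consul lock holder logic) calls `Set(true)` when this node wins the lock and `Set(false)` when it loses it, and here that call is made by hand.

```go
package main

import (
	"fmt"

	"github.com/morningconsult/go-elasticsearch-alerts/utils/lock"
)

func main() {
	distLock := lock.NewLock()

	// In the daemon, a lock-watcher goroutine would flip this flag as the
	// Consul lock is won or lost; we set it by hand for illustration.
	distLock.Set(true)

	// Query handlers check Acquired() before each scheduled query so that
	// only the lock-holding node queries Elasticsearch.
	if distLock.Acquired() {
		fmt.Println("lock held: running scheduled queries")
	} else {
		fmt.Println("lock not held: skipping this run")
	}
}
```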