From bc3bfef104cc3f9d7f81ea907971594be3ba0454 Mon Sep 17 00:00:00 2001 From: Martin Chodur Date: Fri, 28 Jan 2022 02:56:55 +0100 Subject: [PATCH] feat: add elastic search ingester module Signed-off-by: Martin Chodur --- CHANGELOG.md | 2 + cmd/slo_exporter.go | 3 + docs/configuration.md | 1 + docs/modules/elasticsearch_ingester.md | 82 +++++++ go.mod | 1 + go.sum | 13 +- pkg/elasticsearch_client/elastic_client.go | 52 +++++ pkg/elasticsearch_client/mock.go | 25 +++ pkg/elasticsearch_client/v7.go | 97 ++++++++ pkg/elasticsearch_ingester/elastic_tailer.go | 192 ++++++++++++++++ .../elastic_tailer_test.go | 174 +++++++++++++++ .../elasticsearch_ingester.go | 209 ++++++++++++++++++ pkg/tailer/tailer.go | 6 +- pkg/tailer/tailer_test.go | 4 +- 14 files changed, 855 insertions(+), 6 deletions(-) create mode 100644 docs/modules/elasticsearch_ingester.md create mode 100644 pkg/elasticsearch_client/elastic_client.go create mode 100644 pkg/elasticsearch_client/mock.go create mode 100644 pkg/elasticsearch_client/v7.go create mode 100644 pkg/elasticsearch_ingester/elastic_tailer.go create mode 100644 pkg/elasticsearch_ingester/elastic_tailer_test.go create mode 100644 pkg/elasticsearch_ingester/elasticsearch_ingester.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 4af9ef1..1697dc1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## Unreleased +### Added +- [#82](https://github.com/seznam/slo-exporter/pull/82) New module `elasticSerachIngester`, for more info see [the docs](./docs/modules/elasticsearch_ingester.md) ## [v6.11.0] 2022-01-19 ### Added diff --git a/cmd/slo_exporter.go b/cmd/slo_exporter.go index 8925059..ed9200b 100644 --- a/cmd/slo_exporter.go +++ b/cmd/slo_exporter.go @@ -3,6 +3,7 @@ package main import ( "context" "fmt" + "github.com/seznam/slo-exporter/pkg/elasticsearch_ingester" "runtime" "github.com/gorilla/mux" @@ -71,6 +72,8 @@ func moduleFactory(moduleName string, logger logrus.FieldLogger, conf *viper.Vip return prometheus_ingester.NewFromViper(conf, logger) case "kafkaIngester": return kafka_ingester.NewFromViper(conf, logger) + case "elasticSearchIngester": + return elasticsearch_ingester.NewFromViper(conf, logger) case "envoyAccessLogServer": return envoy_access_log_server.NewFromViper(conf, logger) case "eventMetadataRenamer": diff --git a/docs/configuration.md b/docs/configuration.md index fe4f612..cc3816b 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -58,6 +58,7 @@ Only produces new events from the specified data source. - [`prometheusIngester`](modules/prometheus_ingester.md) - [`envoyAccessLogServer`](modules/envoy_access_log_server.md) - [`kafkaIngester`](modules/kafka_ingester.md) + - [`elasticSearchIngester`](modules/elasticsearch_ingester.md) ##### Processors: Reads input events, does some processing based in the module type and produces modified event. diff --git a/docs/modules/elasticsearch_ingester.md b/docs/modules/elasticsearch_ingester.md new file mode 100644 index 0000000..0999f94 --- /dev/null +++ b/docs/modules/elasticsearch_ingester.md @@ -0,0 +1,82 @@ +# Elasticsearch ingester + +| | | +|----------------|-------------------------| +| `moduleName` | `elasticSearchIngester` | +| Module type | `producer` | +| Output event | `raw` | + +This module allows you to real time follow all new documents using Elasticsearch query and compute SLO based on those. + +Most common use case would be, if running in Kubernetes for example and already collecting logs using the ELK stack. You +can simply hook to those data and compute SLO based on those application logs. + +### Elastic search versions and support + +Currently, only v7 is supported. + +### How does it work + +The module periodically(interval is configurable) queries(you can pass in custom Lucene query) +the Elasticsearch index(needs to be specified) and for every hit creates a new event from the document. All the +documents needs to have a field with a timestamp(field name and format configurable), so the module can sort them and +store the last queried document timestamp. In next iteration it will use this timestamp as lower limit for the range +query, so it does not miss any entries. Each query is limited by maximum batch size(configurable) co the requests are +not huge. + +In case you do not use structured logging and your logs are not indexed, you can specify name of the field with the raw +log entry and regular expression with named groups which, if matched, will be propagated to the event metadata. + +### moduleConfig + +```yaml +# OPTIONAL Debug logging +debug: false +# REQUIRED Version of the Elasticsearch API to be used, possible values: 7 +apiVersion: "v7" +# REQUIRED List of addresses pointing to the Elasticsearch API endpoint to query +addresses: + - "https://foo.bar:4433" +# OPTIONAL Skip verification of the server certificate +insecureSkipVerify: false +# OPTIONAL Timeout for the Elasticsearch API call +timeout: "5s" +# Enable/disable sniffing, autodiscovery of other nodes in Elasticsearch cluster +sniffing: true +# Enable/disable healtchecking of the Elasticsearch nodes +healthchecks: true + +# OPTIONAL username to use for authentication +username: "foo" +# OPTIONAL password to use for authentication +password: "bar" +# OPTIONAL Client certificate to be used for authentication +clientCertFile: "./client.pem" +# OPTIONAL Client certificate key to be used for authentication +clientKeyFile: "./client-key.pem" +# OPTIONAL Custom CA certificate to verify the server certificate +clientCaCertFile: "./ca.cert" + +# OPTIONAL Interval how often to check for new documents from the Elasticsearch API. +# If the module was falling behind fo the amount of documents in the Elaseticsearch, it will +# query it more often. +interval: 5s +# REQUIRED Name of the index to be queried +index: "my-index" +# OPTIONAL Additional Lucene query to filter the results +query: "app_name: nginx AND namespace: test" +# OPTIONAL Maximum number of documents to be read in one batch +maxBatchSize: 100 + +# REQUIRED Document filed name containing a timestamp of the event +timestampField: "@timestamp" +# REQUIRED Golang time parse format to parse the timestamp from the timestampField +timestampFormat: "2006-01-02T15:04:05Z07:00" # See # https://www.geeksforgeeks.org/time-formatting-in-golang/ for common examples +# OPTIONAL Name of the field in document containing the raw log message you want to parse +rawLogField: "log" +# OPTIONAL Regular expression to be used to parse the raw log message, +# each matched named group will be propagated to the new event metadata +rawLogParseRegexp: '^(?P[A-Fa-f0-9.:]{4,50}) \S+ \S+ \[(?P