Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use local timezone for TZ conversion in the FB system module #5647

Merged
merged 6 commits into from
Nov 21, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ https://github.com/elastic/beats/compare/v6.0.0-beta2...master[Check the HEAD di
- Add Kubernetes manifests to deploy Filebeat. {pull}5349[5349]
- Add experimental Docker `json-file` prospector . {pull}5402[5402]
- Add experimental Docker autodiscover functionality. {pull}5245[5245]
- Add option to convert the timestamps to UTC in the system module. {pull}5647[5647]

*Heartbeat*

Expand Down
8 changes: 8 additions & 0 deletions filebeat/docs/include/var-convert-timezone.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
*`var.convert_timezone`*::

If this option is enabled, Filebeat reads the local timezone and uses it at log
parsing time to convert the timestamp to UTC. The local timezone is also added
in each event in a dedicated field (`beat.timezone`). The conversion is only
possible in Elasticsearch >= 6.1. If the Elasticsearch version is less than 6.1,
the `beat.timezone` field is added, but the conversion to UTC is not made. The
default is `false`.
14 changes: 12 additions & 2 deletions filebeat/docs/modules/system.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ image::./images/kibana-system.png[]
include::../include/configuring-intro.asciidoc[]

The following example shows how to set paths in the +modules.d/{modulename}.yml+
file to override the default paths for the syslog and authorization logs:
file to override the default paths for the syslog and authorization logs:

["source","yaml",subs="attributes"]
-----
Expand All @@ -55,7 +55,7 @@ To specify the same settings at the command line, you use:
-----


The command in the example assumes that you have already enabled the +{modulename}+ module.
The command in the example assumes that you have already enabled the +{modulename}+ module.

//set the fileset name used in the included example
:fileset_ex: syslog
Expand All @@ -68,6 +68,16 @@ include::../include/config-option-intro.asciidoc[]

include::../include/var-paths.asciidoc[]

include::../include/var-convert-timezone.asciidoc[]

[float]
==== `auth` fileset settings

include::../include/var-paths.asciidoc[]

include::../include/var-convert-timezone.asciidoc[]




[float]
Expand Down
6 changes: 6 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ filebeat.modules:
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Prospector configuration (advanced). Any prospector configuration option
# can be added under this section.
#prospector:
Expand All @@ -33,6 +36,9 @@ filebeat.modules:
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Prospector configuration (advanced). Any prospector configuration option
# can be added under this section.
#prospector:
Expand Down
87 changes: 74 additions & 13 deletions filebeat/fileset/fileset.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"text/template"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
mlimporter "github.com/elastic/beats/libbeat/ml-importer"
)

Expand Down Expand Up @@ -51,6 +52,11 @@ func New(
}, nil
}

// String returns the module and the name of the fileset.
func (fs *Fileset) String() string {
return fs.mcfg.Module + "/" + fs.name
}

// Read reads the manifest file and evaluates the variables.
func (fs *Fileset) Read(beatVersion string) error {
var err error
Expand Down Expand Up @@ -155,18 +161,57 @@ func (fs *Fileset) evaluateVars() (map[string]interface{}, error) {
return vars, nil
}

// turnOffElasticsearchVars re-evaluates the variables that have `min_elasticsearch_version`
// set.
func (fs *Fileset) turnOffElasticsearchVars(vars map[string]interface{}, esVersion string) (map[string]interface{}, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice feature.

retVars := map[string]interface{}{}
for key, val := range vars {
retVars[key] = val
}

haveVersion, err := common.NewVersion(esVersion)
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %v", esVersion, err)
}

for _, vals := range fs.manifest.Vars {
var ok bool
name, ok := vals["name"].(string)
if !ok {
return nil, fmt.Errorf("Variable doesn't have a string 'name' key")
}

minESVersion, ok := vals["min_elasticsearch_version"].(map[string]interface{})
if ok {
minVersion, err := common.NewVersion(minESVersion["version"].(string))
if err != nil {
return vars, fmt.Errorf("Error parsing version %s: %v", minESVersion["version"].(string), err)
}

logp.Debug("fileset", "Comparing ES version %s with requirement of %s", haveVersion, minVersion)

if haveVersion.LessThan(minVersion) {
retVars[name] = minESVersion["value"]
logp.Info("Setting var %s (%s) to %v because Elasticsearch version is %s", name, fs, minESVersion["value"], haveVersion)
}
}
}

return retVars, nil
}

// resolveVariable considers the value as a template so it can refer to built-in variables
// as well as other variables defined before them.
func resolveVariable(vars map[string]interface{}, value interface{}) (interface{}, error) {
switch v := value.(type) {
case string:
return applyTemplate(vars, v)
return applyTemplate(vars, v, false)
case []interface{}:
transformed := []interface{}{}
for _, val := range v {
s, ok := val.(string)
if ok {
transf, err := applyTemplate(vars, s)
transf, err := applyTemplate(vars, s, false)
if err != nil {
return nil, fmt.Errorf("array: %v", err)
}
Expand All @@ -180,9 +225,15 @@ func resolveVariable(vars map[string]interface{}, value interface{}) (interface{
return value, nil
}

// applyTemplate applies a Golang text/template
func applyTemplate(vars map[string]interface{}, templateString string) (string, error) {
tpl, err := template.New("text").Parse(templateString)
// applyTemplate applies a Golang text/template. If specialDelims is set to true,
// the delimiters are set to `{%` and `%}` instead of `{{` and `}}`. These are easier to use
// in pipeline definitions.
func applyTemplate(vars map[string]interface{}, templateString string, specialDelims bool) (string, error) {
tpl := template.New("text")
if specialDelims {
tpl = tpl.Delims("{%", "%}")
}
tpl, err := tpl.Parse(templateString)
if err != nil {
return "", fmt.Errorf("Error parsing template %s: %v", templateString, err)
}
Expand Down Expand Up @@ -215,7 +266,7 @@ func (fs *Fileset) getBuiltinVars() (map[string]interface{}, error) {
}

func (fs *Fileset) getProspectorConfig() (*common.Config, error) {
path, err := applyTemplate(fs.vars, fs.manifest.Prospector)
path, err := applyTemplate(fs.vars, fs.manifest.Prospector, false)
if err != nil {
return nil, fmt.Errorf("Error expanding vars on the prospector path: %v", err)
}
Expand All @@ -224,7 +275,7 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) {
return nil, fmt.Errorf("Error reading prospector file %s: %v", path, err)
}

yaml, err := applyTemplate(fs.vars, string(contents))
yaml, err := applyTemplate(fs.vars, string(contents), false)
if err != nil {
return nil, fmt.Errorf("Error interpreting the template of the prospector: %v", err)
}
Expand Down Expand Up @@ -269,27 +320,37 @@ func (fs *Fileset) getProspectorConfig() (*common.Config, error) {

// getPipelineID returns the Ingest Node pipeline ID
func (fs *Fileset) getPipelineID(beatVersion string) (string, error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

return formatPipelineID(fs.mcfg.Module, fs.name, path, beatVersion), nil
}

func (fs *Fileset) GetPipeline() (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline)
// GetPipeline returns the JSON content of the Ingest Node pipeline that parses the logs.
func (fs *Fileset) GetPipeline(esVersion string) (pipelineID string, content map[string]interface{}, err error) {
path, err := applyTemplate(fs.vars, fs.manifest.IngestPipeline, false)
if err != nil {
return "", nil, fmt.Errorf("Error expanding vars on the ingest pipeline path: %v", err)
}

f, err := os.Open(filepath.Join(fs.modulePath, fs.name, path))
strContents, err := ioutil.ReadFile(filepath.Join(fs.modulePath, fs.name, path))
if err != nil {
return "", nil, fmt.Errorf("Error reading pipeline file %s: %v", path, err)
}

dec := json.NewDecoder(f)
err = dec.Decode(&content)
vars, err := fs.turnOffElasticsearchVars(fs.vars, esVersion)
if err != nil {
return "", nil, err
}

jsonString, err := applyTemplate(vars, string(strContents), true)
if err != nil {
return "", nil, fmt.Errorf("Error interpreting the template of the ingest pipeline: %v", err)
}

err = json.Unmarshal([]byte(jsonString), &content)
if err != nil {
return "", nil, fmt.Errorf("Error JSON decoding the pipeline file: %s: %v", path, err)
}
Expand Down
46 changes: 45 additions & 1 deletion filebeat/fileset/fileset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
package fileset

import (
"encoding/json"
"fmt"
"path/filepath"
"runtime"
"testing"

"github.com/stretchr/testify/assert"

"github.com/elastic/beats/libbeat/logp"
)

func getModuleForTesting(t *testing.T, module, fileset string) *Fileset {
Expand Down Expand Up @@ -193,9 +196,50 @@ func TestGetPipelineNginx(t *testing.T) {
fs := getModuleForTesting(t, "nginx", "access")
assert.NoError(t, fs.Read("5.2.0"))

pipelineID, content, err := fs.GetPipeline()
pipelineID, content, err := fs.GetPipeline("5.2.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-5.2.0-nginx-access-default", pipelineID)
assert.Contains(t, content, "description")
assert.Contains(t, content, "processors")
}

func TestGetPipelineConvertTS(t *testing.T) {
if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"fileset", "modules"})
}

// load system/syslog
modulesPath, err := filepath.Abs("../module")
assert.NoError(t, err)
fs, err := New(modulesPath, "syslog", &ModuleConfig{Module: "system"}, &FilesetConfig{
Var: map[string]interface{}{
"convert_timezone": true,
},
})
assert.NoError(t, err)
assert.NoError(t, fs.Read("6.1.0"))

// ES 6.0.0 should not have beat.timezone referenced
pipelineID, content, err := fs.GetPipeline("6.0.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err := json.Marshal(content)
assert.NoError(t, err)
assert.NotContains(t, string(marshaled), "beat.timezone")

// ES 6.1.0 should have beat.timezone referenced
pipelineID, content, err = fs.GetPipeline("6.1.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err = json.Marshal(content)
assert.NoError(t, err)
assert.Contains(t, string(marshaled), "beat.timezone")

// ES 6.2.0 should have beat.timezone referenced
pipelineID, content, err = fs.GetPipeline("6.2.0")
assert.NoError(t, err)
assert.Equal(t, "filebeat-6.1.0-system-syslog-pipeline", pipelineID)
marshaled, err = json.Marshal(content)
assert.NoError(t, err)
assert.Contains(t, string(marshaled), "beat.timezone")
}
2 changes: 1 addition & 1 deletion filebeat/fileset/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader) error {
}
}

pipelineID, content, err := fileset.GetPipeline()
pipelineID, content, err := fileset.GetPipeline(esClient.GetVersion())
if err != nil {
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
}
Expand Down
6 changes: 6 additions & 0 deletions filebeat/module/system/_meta/config.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Prospector configuration (advanced). Any prospector configuration option
# can be added under this section.
#prospector:
Expand All @@ -19,6 +22,9 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Prospector configuration (advanced). Any prospector configuration option
# can be added under this section.
#prospector:
6 changes: 6 additions & 0 deletions filebeat/module/system/_meta/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,16 @@
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false

# Authorization logs
auth:
enabled: true

# Set custom paths for the log files. If left empty,
# Filebeat will choose the paths depending on your OS.
#var.paths:

# Convert the timestamp to UTC. Requires Elasticsearch >= 6.1.
#convert_timezone: false
14 changes: 12 additions & 2 deletions filebeat/module/system/_meta/docs.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ image::./images/kibana-system.png[]
include::../include/configuring-intro.asciidoc[]

The following example shows how to set paths in the +modules.d/{modulename}.yml+
file to override the default paths for the syslog and authorization logs:
file to override the default paths for the syslog and authorization logs:

["source","yaml",subs="attributes"]
-----
Expand All @@ -50,7 +50,7 @@ To specify the same settings at the command line, you use:
-----


The command in the example assumes that you have already enabled the +{modulename}+ module.
The command in the example assumes that you have already enabled the +{modulename}+ module.

//set the fileset name used in the included example
:fileset_ex: syslog
Expand All @@ -63,3 +63,13 @@ include::../include/config-option-intro.asciidoc[]

include::../include/var-paths.asciidoc[]

include::../include/var-convert-timezone.asciidoc[]

[float]
==== `auth` fileset settings

include::../include/var-paths.asciidoc[]

include::../include/var-convert-timezone.asciidoc[]


4 changes: 4 additions & 0 deletions filebeat/module/system/auth/config/auth.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,7 @@ exclude_files: [".gz$"]
multiline:
pattern: "^\\s"
match: after
{{ if .convert_timezone }}
processors:
- add_locale: ~
{{ end }}
1 change: 1 addition & 0 deletions filebeat/module/system/auth/ingest/pipeline.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"MMM d HH:mm:ss",
"MMM dd HH:mm:ss"
],
{% if .convert_timezone %}"timezone": "{{ beat.timezone }}",{% end %}
"ignore_failure": true
}
},
Expand Down
Loading