Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport template loading #1137

Merged
merged 4 commits into from
Mar 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]

*Affecting all Beats*
- Add ability to override configuration settings using environment variables {issue}114[114]
- Libbeat now always exits through a single exit method for proper cleanup and control {pull}736[736]
- Add ability to create Elasticsearch mapping on startup {pull}639[639]

*Packetbeat*

Expand Down
16 changes: 15 additions & 1 deletion filebeat/etc/filebeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ filebeat:
# Ignore files which were modified more then the defined timespan in the past.
# In case all files on your system must be read you can set this value very large.
# Time strings like 2h (2 hours), 5m (5 minutes) can be used.
#ignore_older: 72h
#ignore_older: 0

# Close older closes the file handler for which were not modified
# for longer then close_older
Expand Down Expand Up @@ -199,6 +199,20 @@ output:
# [filebeat-]YYYY.MM.DD keys.
#index: "filebeat"

# A template is used to set the mapping in Elasticsearch
# By default template loading is disabled and no template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
#template:

# Template name. By default the template name is filebeat.
#name: "filebeat"

# Path to template file
#path: "filebeat.template.json"

# Overwrite existing template
#overwrite: false

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
14 changes: 14 additions & 0 deletions libbeat/etc/libbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ output:
# [beatname-]YYYY.MM.DD keys.
#index: "beatname"

# A template is used to set the mapping in Elasticsearch
# By default template loading is disabled and no template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
#template:

# Template name. By default the template name is beatname.
#name: "beatname"

# Path to template file
#path: "beatname.template.json"

# Overwrite existing template
#overwrite: false

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
34 changes: 33 additions & 1 deletion libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,38 @@ func (client *Client) PublishEvent(event common.MapStr) error {
return nil
}

// LoadTemplate loads a template into Elasticsearch overwriting the existing
// template if it exists. If you wish to not overwrite an existing template
// then use CheckTemplate prior to calling this method.
func (client *Client) LoadTemplate(templateName string, reader *bytes.Reader) error {

status, _, err := client.execRequest("PUT", client.URL+"/_template/"+templateName, reader)

if err != nil {
return fmt.Errorf("Template could not be loaded. Error: %s", err)
}
if status != 200 {
return fmt.Errorf("Template could not be loaded. Status: %v", status)
}

logp.Info("Elasticsearch template with name '%s' loaded", templateName)

return nil
}

// CheckTemplate checks if a given template already exist. It returns true if
// and only if Elasticsearch returns with HTTP status code 200.
func (client *Client) CheckTemplate(templateName string) bool {

status, _, _ := client.request("HEAD", "/_template/"+templateName, nil, nil)

if status != 200 {
return false
}

return true
}

func (conn *Connection) Connect(timeout time.Duration) error {
var err error
conn.connected, err = conn.Ping(timeout)
Expand Down Expand Up @@ -290,7 +322,7 @@ func (conn *Connection) request(
body interface{},
) (int, []byte, error) {
url := makeURL(conn.URL, path, params)
logp.Debug("elasticsearch", "%s %s %s", method, url, body)
logp.Debug("elasticsearch", "%s %s %v", method, url, body)

var obj []byte
if body != nil {
Expand Down
76 changes: 76 additions & 0 deletions libbeat/outputs/elasticsearch/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"testing"
"time"

"bytes"
"github.com/stretchr/testify/assert"
"io/ioutil"
"path/filepath"
)

func TestClientConnect(t *testing.T) {
Expand All @@ -18,3 +21,76 @@ func TestClientConnect(t *testing.T) {
assert.Nil(t, err)
assert.True(t, client.IsConnected())
}

func TestCheckTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode because it requires ES")
}

client := GetTestingElasticsearch()
err := client.Connect(5 * time.Second)
assert.Nil(t, err)

// Check for non existant template
assert.False(t, client.CheckTemplate("libbeat"))
}

func TestLoadTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode because it requires ES")
}

// Load template
absPath, err := filepath.Abs("../../tests/files/")
assert.NotNil(t, absPath)
assert.Nil(t, err)

templatePath := absPath + "/template.json"
content, err := ioutil.ReadFile(templatePath)
reader := bytes.NewReader(content)
assert.Nil(t, err)

// Setup ES
client := GetTestingElasticsearch()
err = client.Connect(5 * time.Second)
assert.Nil(t, err)

templateName := "testbeat"

// Load template
err = client.LoadTemplate(templateName, reader)
assert.Nil(t, err)

// Make sure template was loaded
assert.True(t, client.CheckTemplate(templateName))

// Delete template again to clean up
client.request("DELETE", "/_template/"+templateName, nil, nil)

// Make sure it was removed
assert.False(t, client.CheckTemplate(templateName))

}

func TestLoadInvalidTemplate(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test in short mode because it requires ES")
}

// Invalid Template
reader := bytes.NewReader([]byte("{json:invalid}"))

// Setup ES
client := GetTestingElasticsearch()
err := client.Connect(5 * time.Second)
assert.Nil(t, err)

templateName := "invalidtemplate"

// Try to load invalid template
err = client.LoadTemplate(templateName, reader)
assert.Error(t, err)

// Make sure template was not loaded
assert.False(t, client.CheckTemplate(templateName))
}
45 changes: 45 additions & 0 deletions libbeat/outputs/elasticsearch/output.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ import (
"strings"
"time"

"bytes"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/outputs/mode"
"io/ioutil"
)

var debug = logp.MakeDebug("elasticsearch")
Expand Down Expand Up @@ -77,6 +79,7 @@ func (out *elasticsearchOutput) init(
}

clients, err := mode.MakeClients(config, makeClientFactory(tlsConfig, config))

if err != nil {
return err
}
Expand Down Expand Up @@ -117,6 +120,8 @@ func (out *elasticsearchOutput) init(
return err
}

loadTemplate(config.Template, clients)

if config.Save_topology {
err := out.EnableTTL()
if err != nil {
Expand Down Expand Up @@ -146,6 +151,46 @@ func (out *elasticsearchOutput) init(
return nil
}

// loadTemplate checks if the index mapping template should be loaded
// In case template loading is enabled, template is written to index
func loadTemplate(config outputs.Template, clients []mode.ProtocolClient) {
// Check if template should be loaded
// Not being able to load the template will output an error but will not stop execution
if config.Name != "" && len(clients) > 0 {

// Always takes the first client
esClient := clients[0].(*Client)

logp.Info("Loading template enabled. Trying to load template: %v", config.Path)

exists := esClient.CheckTemplate(config.Name)

// Check if template already exist or should be overwritten
if !exists || config.Overwrite {

if config.Overwrite {
logp.Info("Existing template will be overwritten, as overwrite is enabled.")
}

// Load template from file
content, err := ioutil.ReadFile(config.Path)
if err != nil {
logp.Err("Could not load template from file path: %s; Error: %s", config.Path, err)
} else {
reader := bytes.NewReader(content)
err = esClient.LoadTemplate(config.Name, reader)

if err != nil {
logp.Err("Could not load template: %v", err)
}
}
} else {
logp.Info("Template already exists and will not be overwritten.")
}

}
}

func makeClientFactory(
tls *tls.Config,
config outputs.MothershipConfig,
Expand Down
7 changes: 7 additions & 0 deletions libbeat/outputs/outputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type MothershipConfig struct {
ProxyURL string `yaml:"proxy_url"`
Index string
Path string
Template Template
Db int
Db_topology int
Timeout int
Expand All @@ -34,6 +35,12 @@ type MothershipConfig struct {
CompressionLevel *int `yaml:"compression_level"`
}

type Template struct {
Name string
Path string
Overwrite bool
}

type Options struct {
Guaranteed bool
}
Expand Down
42 changes: 42 additions & 0 deletions libbeat/tests/files/template.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"mappings": {
"_default_": {
"_all": {
"enabled": true,
"norms": {
"enabled": false
}
},
"dynamic_templates": [
{
"template1": {
"mapping": {
"doc_values": true,
"ignore_above": 1024,
"index": "not_analyzed",
"type": "{dynamic_type}"
},
"match": "*"
}
}
],
"properties": {
"@timestamp": {
"type": "date"
},
"message": {
"type": "string",
"index": "analyzed"
},
"offset": {
"type": "long",
"doc_values": "true"
}
}
}
},
"settings": {
"index.refresh_interval": "5s"
},
"template": "mockbeat-*"
}
1 change: 0 additions & 1 deletion libbeat/tests/system/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ def test_base(self):
"""

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*"
)
os.mkdir(self.working_dir + "/log/")

Expand Down
14 changes: 14 additions & 0 deletions packetbeat/etc/packetbeat.yml
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ output:
# [packetbeat-]YYYY.MM.DD keys.
#index: "packetbeat"

# A template is used to set the mapping in Elasticsearch
# By default template loading is disabled and no template is loaded.
# These settings can be adjusted to load your own template or overwrite existing ones
#template:

# Template name. By default the template name is packetbeat.
#name: "packetbeat"

# Path to template file
#path: "packetbeat.template.json"

# Overwrite existing template
#overwrite: false

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
Loading