Skip to content

Commit

Permalink
Add support for dynamical ingest pipeline selection in ES
Browse files Browse the repository at this point in the history
- Add dynamic ingest pipeline selection based on outil.Select
- Add api ingest node integration test
- Update logstash integration tests
- Add output plugin -> ingest node integration tests
- Add default setting to full config
  • Loading branch information
urso committed Aug 3, 2016
1 parent ec4a9b4 commit 69443f7
Show file tree
Hide file tree
Showing 15 changed files with 562 additions and 125 deletions.
3 changes: 3 additions & 0 deletions filebeat/filebeat.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,9 @@ output.elasticsearch:
# and generates [filebeat-]YYYY.MM.DD keys.
#index: "filebeat-%{+yyyy.MM.dd}"

# Optional ingest node pipeline. By default no pipeline will be used.
#pipeline: ""

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
3 changes: 3 additions & 0 deletions libbeat/_meta/config.full.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ output.elasticsearch:
# and generates [beatname-]YYYY.MM.DD keys.
#index: "beatname-%{+yyyy.MM.dd}"

# Optional ingest node pipeline. By default no pipeline will be used.
#pipeline: ""

# Optional HTTP Path
#path: "/elasticsearch"

Expand Down
93 changes: 56 additions & 37 deletions libbeat/outputs/elasticsearch/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ import (
)

type QueryResult struct {
Ok bool `json:"ok"`
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Source json.RawMessage `json:"_source"`
Version int `json:"_version"`
Found bool `json:"found"`
Exists bool `json:"exists"`
Created bool `json:"created"`
Matches []string `json:"matches"`
Ok bool `json:"ok"`
Index string `json:"_index"`
Type string `json:"_type"`
ID string `json:"_id"`
Source json.RawMessage `json:"_source"`
Version int `json:"_version"`
Found bool `json:"found"`
Exists bool `json:"exists"`
Created bool `json:"created"`
Acknowledged bool `json:"acknowledged"`
Matches []string `json:"matches"`
}

type SearchResults struct {
Expand Down Expand Up @@ -45,6 +46,14 @@ func (r QueryResult) String() string {
return string(out)
}

func withQueryResult(status int, resp []byte, err error) (int, *QueryResult, error) {
if err != nil {
return status, nil, err
}
result, err := readQueryResult(resp)
return status, result, err
}

func readQueryResult(obj []byte) (*QueryResult, error) {
var result QueryResult
if obj == nil {
Expand Down Expand Up @@ -97,54 +106,64 @@ func (es *Connection) Index(
if id == "" {
method = "POST"
}
return withQueryResult(es.apiCall(method, index, docType, id, "", params, body))
}

status, resp, err := es.apiCall(method, index, docType, id, params, body)
if err != nil {
return status, nil, err
func (es *Connection) Ingest(
index, docType, pipeline, id string,
params map[string]string,
body interface{},
) (int, *QueryResult, error) {
method := "PUT"
if id == "" {
method = "POST"
}
result, err := readQueryResult(resp)
return status, result, err
return withQueryResult(es.apiCall(method, index, docType, id, pipeline, params, body))
}

// Refresh an index. Call this after doing inserts or creating/deleting
// indexes in unit tests.
func (es *Connection) Refresh(index string) (int, *QueryResult, error) {
status, resp, err := es.apiCall("POST", index, "", "_refresh", nil, nil)
if err != nil {
return status, nil, err
}
result, err := readQueryResult(resp)
return status, result, err
return withQueryResult(es.apiCall("POST", index, "", "_refresh", "", nil, nil))
}

// CreateIndex creates a new index, optionally with settings and mappings passed in
// the body.
// Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html
//
func (es *Connection) CreateIndex(index string, body interface{}) (int, *QueryResult, error) {
status, resp, err := es.apiCall("PUT", index, "", "", nil, body)
if err != nil {
return status, nil, err
}
result, err := readQueryResult(resp)
return status, result, err
return withQueryResult(es.apiCall("PUT", index, "", "", "", nil, body))
}

// Delete deletes a typed JSON document from a specific index based on its id.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete.html
func (es *Connection) Delete(index string, docType string, id string, params map[string]string) (int, *QueryResult, error) {
status, resp, err := es.apiCall("DELETE", index, docType, id, params, nil)
if err != nil {
return status, nil, err
}
result, err := readQueryResult(resp)
return status, result, err
return withQueryResult(es.apiCall("DELETE", index, docType, id, "", params, nil))
}

// CreatePipeline create a new ingest pipeline with name id.
// Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html
func (es *Connection) CreatePipeline(
id string,
params map[string]string,
body interface{},
) (int, *QueryResult, error) {
return withQueryResult(es.apiCall("PUT", "_ingest", "pipeline", id, "", params, body))
}

// DeletePipeline deletes an ingest pipeline by id.
// Implements: https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html
func (es *Connection) DeletePipeline(
id string,
params map[string]string,
) (int, *QueryResult, error) {
return withQueryResult(es.apiCall("DELETE", "_ingest", "pipeline", id, "", params, nil))
}

// A search request can be executed purely using a URI by providing request parameters.
// Implements: http://www.elastic.co/guide/en/elasticsearch/reference/current/search-uri-request.html
func (es *Connection) SearchURI(index string, docType string, params map[string]string) (int, *SearchResults, error) {
status, resp, err := es.apiCall("GET", index, docType, "_search", params, nil)
status, resp, err := es.apiCall("GET", index, docType, "_search", "", params, nil)
if err != nil {
return status, nil, err
}
Expand All @@ -156,7 +175,7 @@ func (es *Connection) CountSearchURI(
index string, docType string,
params map[string]string,
) (int, *CountResults, error) {
status, resp, err := es.apiCall("GET", index, docType, "_count", params, nil)
status, resp, err := es.apiCall("GET", index, docType, "_count", "", params, nil)
if err != nil {
return status, nil, err
}
Expand All @@ -165,13 +184,13 @@ func (es *Connection) CountSearchURI(
}

func (es *Connection) apiCall(
method, index, docType, id string,
method, index, docType, id, pipeline string,
params map[string]string,
body interface{},
) (int, []byte, error) {
path, err := makePath(index, docType, id)
if err != nil {
return 0, nil, err
}
return es.request(method, path, params, body)
return es.request(method, path, pipeline, params, body)
}
67 changes: 64 additions & 3 deletions libbeat/outputs/elasticsearch/api_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,23 @@
package elasticsearch

import (
"encoding/json"
"fmt"
"os"
"testing"

"github.com/elastic/beats/libbeat/logp"
"github.com/stretchr/testify/assert"
)

func TestIndex(t *testing.T) {

if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"})
}

client := GetTestingElasticsearch()
index := fmt.Sprintf("beats-test-index-%d", os.Getpid())

index := fmt.Sprintf("packetbeat-unittest-%d", os.Getpid())
client := GetTestingElasticsearch()

body := map[string]interface{}{
"user": "test",
Expand Down Expand Up @@ -52,3 +53,63 @@ func TestIndex(t *testing.T) {
t.Errorf("Delete() returns error: %s", err)
}
}

func TestIngest(t *testing.T) {
type obj map[string]interface{}

if testing.Verbose() {
logp.LogInit(logp.LOG_DEBUG, "", false, true, []string{"elasticsearch"})
}

index := fmt.Sprintf("beats-test-ingest-%d", os.Getpid())
pipeline := fmt.Sprintf("beats-test-pipeline-%d", os.Getpid())

pipelineBody := obj{
"description": "Test pipeline",
"processors": []obj{
{
"lowercase": obj{
"field": "testfield",
},
},
},
}

client := GetTestingElasticsearch()
_, _, err := client.DeletePipeline(pipeline, nil)

_, resp, err := client.CreatePipeline(pipeline, nil, pipelineBody)
if err != nil {
t.Fatal(err)
}
if !resp.Acknowledged {
t.Fatalf("Test pipeline %v not created", pipeline)
}

params := map[string]string{"refresh": "true"}
_, resp, err = client.Ingest(index, "test", pipeline, "1", params, obj{
"testfield": "TEST",
})
if err != nil {
t.Fatalf("Ingest() returns error: %s", err)
}
if !resp.Created {
t.Errorf("Ingest() fails: %s", resp)
}

// get _source field from indexed document
_, docBody, err := client.apiCall("GET", index, "test", "1/_source", "", nil, nil)
if err != nil {
t.Fatal(err)
}

doc := struct {
Field string `json:"testfield"`
}{}
err = json.Unmarshal(docBody, &doc)
if err != nil {
t.Fatal(err)
}

assert.Equal(t, "test", doc.Field)
}
10 changes: 8 additions & 2 deletions libbeat/outputs/elasticsearch/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,14 @@ func newTestClient(url string) *Client {
}

func newTestClientAuth(url, user, pass string) *Client {
index := outil.MakeSelector()
client, err := NewClient(url, index, nil, nil, user, pass, nil, 60*time.Second, 3, nil)
client, err := NewClient(ClientSettings{
URL: url,
Index: outil.MakeSelector(),
Username: user,
Password: pass,
Timeout: 60 * time.Second,
CompressionLevel: 3,
}, nil)
if err != nil {
panic(err)
}
Expand Down
11 changes: 1 addition & 10 deletions libbeat/outputs/elasticsearch/bulkapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,6 @@ import (
"strings"
)

type bulkMeta struct {
Index bulkMetaIndex `json:"index"`
}

type bulkMetaIndex struct {
Index string `json:"_index"`
DocType string `json:"_type"`
}

// MetaBuilder creates meta data for bulk requests
type MetaBuilder func(interface{}) interface{}

Expand Down Expand Up @@ -80,7 +71,7 @@ func newBulkRequest(
return nil, err
}

url := makeURL(urlStr, path, params)
url := makeURL(urlStr, path, "", params)

var reader io.Reader
if body != nil {
Expand Down
Loading

0 comments on commit 69443f7

Please sign in to comment.