Skip to content

Commit

Permalink
Adding tests for multiple pipeline loading, including rollback
Browse files Browse the repository at this point in the history
  • Loading branch information
ycombinator committed Dec 27, 2018
1 parent 0a10ae2 commit c5cb4d7
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 0 deletions.
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/_meta/config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
- module: foo
# Fileset with multiple pipelines
multi:
enabled: true

# Fileset with multiple pipelines with the last one being bad
multibad:
enabled: true
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multi/config/multi.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: log
paths:
- /tmp
exclude_files: [".gz$"]

fields:
service.name: "foo"
fields_under_root: true
10 changes: 10 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/json_logs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"processors": [
{
"rename": {
"field": "json",
"target_field": "log.meta"
}
}
]
}
27 changes: 27 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/pipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"^%{CHAR:first_char}"
],
"pattern_definitions": {
"CHAR": "."
}
}
},
{
"pipeline": {
"if": "ctx.first_char == '{'",
"name": "{< IngestPipeline "json_logs" >}"
}
},
{
"pipeline": {
"if": "ctx.first_char != '{'",
"name": "{< IngestPipeline "plain_logs" >}"
}
}
]
}
12 changes: 12 additions & 0 deletions filebeat/_meta/test/module/foo/multi/ingest/plain_logs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"^%{DATA:some_data}"
]
}
}
]
}
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multi/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module_version: 1.0

ingest_pipeline:
- ingest/pipeline.json
- ingest/json_logs.json
- ingest/plain_logs.json

input: config/multi.yml
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/config/multi.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
type: log
paths:
- /tmp
exclude_files: [".gz$"]

fields:
service.name: "foo"
fields_under_root: true
10 changes: 10 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/json_logs.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"processors": [
{
"rename": {
"field": "json",
"target_field": "log.meta"
}
}
]
}
27 changes: 27 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/pipeline.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"processors": [
{
"grok": {
"field": "message",
"patterns": [
"^%{CHAR:first_char}"
],
"pattern_definitions": {
"CHAR": "."
}
}
},
{
"pipeline": {
"if": "ctx.first_char == '{'",
"name": "{< IngestPipeline "json_logs" >}"
}
},
{
"pipeline": {
"if": "ctx.first_char != '{'",
"name": "{< IngestPipeline "plain_logs" >}"
}
}
]
}
12 changes: 12 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/ingest/plain_logs_bad.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"processors": [
{
"invalid_processor": {
"field": "message",
"patterns": [
"^%{DATA:some_data}"
]
}
}
]
}
8 changes: 8 additions & 0 deletions filebeat/_meta/test/module/foo/multibad/manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module_version: 1.0

ingest_pipeline:
- ingest/pipeline.json
- ingest/json_logs.json
- ingest/plain_logs_bad.json

input: config/multi.yml
94 changes: 94 additions & 0 deletions filebeat/fileset/modules_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,97 @@ func hasIngest(client *elasticsearch.Client) bool {
v := client.GetVersion()
return v.Major >= 5
}

func hasIngestPipelineProcessor(client *elasticsearch.Client) bool {
v := client.GetVersion()
return v.Major > 6 || (v.Major == 6 && v.Minor >= 5)
}

func TestLoadMultiplePipelines(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}

if !hasIngestPipelineProcessor(client) {
t.Skip("Skip tests because ingest is missing the pipeline processor: ", client.GetVersion())
}

client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-pipeline", "", nil, nil)
client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-json_logs", "", nil, nil)
client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-plain_logs", "", nil, nil)

modulesPath, err := filepath.Abs("../_meta/test/module")
assert.NoError(t, err)

enabled := true
disabled := false
filesetConfigs := map[string]*FilesetConfig{
"multi": &FilesetConfig{Enabled: &enabled},
"multibad": &FilesetConfig{Enabled: &disabled},
}
configs := []*ModuleConfig{
&ModuleConfig{"foo", &enabled, filesetConfigs},
}

reg, err := newModuleRegistry(modulesPath, configs, nil, "6.6.0")
if err != nil {
t.Fatal(err)
}

err = reg.LoadPipelines(client, false)
if err != nil {
t.Fatal(err)
}

status, _, _ := client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-pipeline", "", nil, nil)
assert.Equal(t, 200, status)
status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-json_logs", "", nil, nil)
assert.Equal(t, 200, status)
status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multi-plain_logs", "", nil, nil)
assert.Equal(t, 200, status)
}

func TestLoadMultiplePipelinesWithRollback(t *testing.T) {
client := estest.GetTestingElasticsearch(t)
if !hasIngest(client) {
t.Skip("Skip tests because ingest is missing in this elasticsearch version: ", client.GetVersion())
}

if !hasIngestPipelineProcessor(client) {
t.Skip("Skip tests because ingest is missing the pipeline processor: ", client.GetVersion())
}

client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-pipeline", "", nil, nil)
client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-json_logs", "", nil, nil)
client.Request("DELETE", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil)

modulesPath, err := filepath.Abs("../_meta/test/module")
assert.NoError(t, err)

enabled := true
disabled := false
filesetConfigs := map[string]*FilesetConfig{
"multi": &FilesetConfig{Enabled: &disabled},
"multibad": &FilesetConfig{Enabled: &enabled},
}
configs := []*ModuleConfig{
&ModuleConfig{"foo", &enabled, filesetConfigs},
}

reg, err := newModuleRegistry(modulesPath, configs, nil, "6.6.0")
if err != nil {
t.Fatal(err)
}

err = reg.LoadPipelines(client, false)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "invalid_processor")

status, _, _ := client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-pipeline", "", nil, nil)
assert.Equal(t, 404, status)
status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-json_logs", "", nil, nil)
assert.Equal(t, 404, status)
status, _, _ = client.Request("GET", "/_ingest/pipeline/filebeat-6.6.0-foo-multibad-plain_logs_bad", "", nil, nil)
assert.Equal(t, 404, status)
}

0 comments on commit c5cb4d7

Please sign in to comment.