Skip to content

Commit

Permalink
Allow to configure docker input with paths (#10687)
Browse files Browse the repository at this point in the history
Add a paths option to the docker input so paths can be used
as an alternative to ids. This will allow some level of support
of other runtimes as CRI-O.
  • Loading branch information
vjsamuel authored and jsoriano committed Feb 21, 2019
1 parent 2b7d9fe commit 6e56b99
Show file tree
Hide file tree
Showing 5 changed files with 273 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...v7.0.0-beta1[Check the
- Added support for ingesting structured Elasticsearch server logs {pull}10428[10428]
- Populate more ECS fields in the Suricata module. {pull}10006[10006]
- Add module zeek. {issue}9931[9931] {pull}10034[10034]
- Add support for CRI-O based logs autodiscover {pull}10687[10687]

*Heartbeat*

Expand Down
6 changes: 4 additions & 2 deletions filebeat/autodiscover/builder/hints/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ func defaultConfig() config {
rawCfg := map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"ids": []string{
"${data.container.id}",
"paths": []string{
// To be able to use this builder with CRI-O replace paths with:
// /var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log
"/var/lib/docker/containers/${data.container.id}/*-json.log",
},
},
}
Expand Down
232 changes: 232 additions & 0 deletions filebeat/autodiscover/builder/hints/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,3 +399,235 @@ func TestGenerateHints(t *testing.T) {

}
}

func TestGenerateHintsWithPaths(t *testing.T) {
tests := []struct {
msg string
event bus.Event
path string
len int
result common.MapStr
}{
{
msg: "Empty event hints should return default config",
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"pod": common.MapStr{
"name": "pod",
"uid": "12345",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
},
path: "/var/lib/docker/containers/${data.kubernetes.container.id}/*-json.log",
len: 1,
result: common.MapStr{
"type": "docker",
"containers": map[string]interface{}{
"paths": []interface{}{"/var/lib/docker/containers/abc/*-json.log"},
},
"close_timeout": "true",
},
},
{
msg: "Hint with processors config must have a processors in the input config",
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"pod": common.MapStr{
"name": "pod",
"uid": "12345",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"hints": common.MapStr{
"logs": common.MapStr{
"processors": common.MapStr{
"1": common.MapStr{
"dissect": common.MapStr{
"tokenizer": "%{key1} %{key2}",
},
},
"drop_event": common.MapStr{},
},
},
},
},
len: 1,
path: "/var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log",
result: common.MapStr{
"type": "docker",
"containers": map[string]interface{}{
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
"processors": []interface{}{
map[string]interface{}{
"dissect": map[string]interface{}{
"tokenizer": "%{key1} %{key2}",
},
},
map[string]interface{}{
"drop_event": nil,
},
},
},
},
{
msg: "Hint with module should attach input to its filesets",
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"pod": common.MapStr{
"name": "pod",
"uid": "12345",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"hints": common.MapStr{
"logs": common.MapStr{
"module": "apache2",
},
},
},
len: 1,
path: "/var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log",
result: common.MapStr{
"module": "apache2",
"error": map[string]interface{}{
"enabled": true,
"input": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"stream": "all",
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
},
},
"access": map[string]interface{}{
"enabled": true,
"input": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"stream": "all",
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
},
},
},
},
{
msg: "Hint with module should honor defined filesets",
event: bus.Event{
"host": "1.2.3.4",
"kubernetes": common.MapStr{
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"pod": common.MapStr{
"name": "pod",
"uid": "12345",
},
},
"container": common.MapStr{
"name": "foobar",
"id": "abc",
},
"hints": common.MapStr{
"logs": common.MapStr{
"module": "apache2",
"fileset": "access",
},
},
},
len: 1,
path: "/var/log/pods/${data.kubernetes.pod.uid}/${data.kubernetes.container.name}/*.log",
result: common.MapStr{
"module": "apache2",
"access": map[string]interface{}{
"enabled": true,
"input": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"stream": "all",
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
},
},
"error": map[string]interface{}{
"enabled": false,
"input": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"stream": "all",
"paths": []interface{}{"/var/log/pods/12345/foobar/*.log"},
},
"close_timeout": "true",
},
},
},
},
}

for _, test := range tests {
cfg, _ := common.NewConfigFrom(map[string]interface{}{
"config": map[string]interface{}{
"type": "docker",
"containers": map[string]interface{}{
"paths": []string{
test.path,
},
},
"close_timeout": "true",
},
})

// Configure path for modules access
abs, _ := filepath.Abs("../../..")
err := paths.InitPaths(&paths.Path{
Home: abs,
})

l, err := NewLogHints(cfg)
if err != nil {
t.Fatal(err)
}

cfgs := l.CreateConfig(test.event)
assert.Equal(t, len(cfgs), test.len, test.msg)
if test.len != 0 {
config := common.MapStr{}
err := cfgs[0].Unpack(&config)
assert.Nil(t, err, test.msg)

assert.Equal(t, test.result, config, test.msg)
}

}
}
3 changes: 3 additions & 0 deletions filebeat/input/docker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var defaultConfig = config{
}

type config struct {
// List of containers' log files to tail
Containers containers `config:"containers"`

// Partial configures the input to join partial lines
Expand All @@ -43,6 +44,8 @@ type containers struct {
IDs []string `config:"ids"`
Path string `config:"path"`

Paths []string `config:"paths"`

// Stream can be all, stdout or stderr
Stream string `config:"stream"`
}
39 changes: 33 additions & 6 deletions filebeat/input/docker/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,19 @@ func NewInput(
return nil, errors.Wrap(err, "reading docker input config")
}

// Docker input should make sure that no callers should ever pass empty strings as container IDs
// Docker input should make sure that no callers should ever pass empty strings as container IDs or paths
// Hence we explicitly make sure that we catch such things and print stack traces in the event of
// an invocation so that it can be fixed.
var ids []string
var ids, paths []string
for _, p := range config.Containers.Paths {
if p != "" {
paths = append(paths, p)
} else {
logger.Error("Docker paths can't be empty for Docker input config")
logger.Debugw("Empty path for Docker logfile was received", logp.Stack("stacktrace"))
}
}

for _, containerID := range config.Containers.IDs {
if containerID != "" {
ids = append(ids, containerID)
Expand All @@ -64,12 +73,23 @@ func NewInput(
}
}

if len(ids) == 0 {
return nil, errors.New("Docker input requires at least one entry under 'containers.ids'")
if len(ids) == 0 && len(paths) == 0 {
return nil, errors.New("Docker input requires at least one entry under 'containers.ids' or 'containers.paths'")
}

for idx, containerID := range ids {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
// IDs + Path and Paths are mutually exclusive. Ensure that only one of them are set in a given configuration
if len(ids) != 0 && len(paths) != 0 {
return nil, errors.New("can not provide both 'containers.ids' and 'containers.paths' in the same input config")
}

if len(ids) != 0 {
for idx, containerID := range ids {
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}
} else {
for idx, p := range paths {
cfg.SetString("paths", idx, p)
}
}

if err := checkStream(config.Containers.Stream); err != nil {
Expand All @@ -92,6 +112,13 @@ func NewInput(
return nil, errors.Wrap(err, "update input config")
}

if len(paths) != 0 {
// Set symlinks to true as CRI-O paths could point to symlinks instead of the actual path.
if err := cfg.SetBool("symlinks", -1, true); err != nil {
return nil, errors.Wrap(err, "update input config")
}
}

// Add stream to meta to ensure different state per stream
if config.Containers.Stream != "all" {
if context.Meta == nil {
Expand Down

0 comments on commit 6e56b99

Please sign in to comment.