diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc
index f997f507de3..7742c6d24de 100644
--- a/CHANGELOG.next.asciidoc
+++ b/CHANGELOG.next.asciidoc
@@ -178,6 +178,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Add support to new MongoDB additional diagnostic information {pull}11952[11952]
- New module `palo_alto` for Palo Alto Networks PAN-OS logs. {pull}11999[11999]
- Add RabbitMQ module. {pull}12032[12032]
+- Add new `container` input. {pull}12162[12162]
*Heartbeat*
@@ -218,6 +219,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Filebeat*
+- `docker` input is deprecated in favour `container`. {pull}12162[12162]
+
*Heartbeat*
*Journalbeat*
diff --git a/filebeat/autodiscover/builder/hints/config.go b/filebeat/autodiscover/builder/hints/config.go
index cae7aa2192c..aaf8901022d 100644
--- a/filebeat/autodiscover/builder/hints/config.go
+++ b/filebeat/autodiscover/builder/hints/config.go
@@ -26,7 +26,7 @@ type config struct {
func defaultConfig() config {
rawCfg := map[string]interface{}{
- "type": "docker",
+ "type": "container",
"containers": map[string]interface{}{
"paths": []string{
// To be able to use this builder with CRI-O replace paths with:
diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc
index c87e68fe05a..b7adb5d1d6f 100644
--- a/filebeat/docs/filebeat-options.asciidoc
+++ b/filebeat/docs/filebeat-options.asciidoc
@@ -44,6 +44,7 @@ You can configure {beatname_uc} to use the following inputs:
* <<{beatname_lc}-input-log>>
* <<{beatname_lc}-input-stdin>>
+* <<{beatname_lc}-input-container>>
* <<{beatname_lc}-input-redis>>
* <<{beatname_lc}-input-udp>>
* <<{beatname_lc}-input-docker>>
@@ -57,6 +58,8 @@ include::inputs/input-log.asciidoc[]
include::inputs/input-stdin.asciidoc[]
+include::inputs/input-container.asciidoc[]
+
include::inputs/input-redis.asciidoc[]
include::inputs/input-udp.asciidoc[]
diff --git a/filebeat/docs/inputs/input-container.asciidoc b/filebeat/docs/inputs/input-container.asciidoc
new file mode 100644
index 00000000000..fcd3051f1ae
--- /dev/null
+++ b/filebeat/docs/inputs/input-container.asciidoc
@@ -0,0 +1,64 @@
+:type: container
+
+[id="{beatname_lc}-input-{type}"]
+=== Container input
+
+++++
+Container
+++++
+
+Use the `container` input to read containers log files.
+
+This input searches for container logs under the given path, and parse them into
+common message lines, extracting timestamps too. Everything happens before line
+filtering, multiline, and JSON decoding, so this input can be used in
+combination with those settings.
+
+Example configuration:
+
+["source","yaml",subs="attributes"]
+----
+{beatname_lc}.inputs:
+- type: container
+ paths: <1>
+ - '/var/lib/docker/containers/*/*.log'
+----
+
+<1> `paths` is required. All other settings are optional.
+
+==== Configuration options
+
+The `container` input supports the following configuration options plus the
+<<{beatname_lc}-input-{type}-common-options>> described later.
+
+===== `stream`
+
+Reads from the specified streams only: `all`, `stdout` or `stderr`. The default
+is `all`.
+
+===== `format`
+
+Use the given format when reading the log file: `auto`, `docker` or `cri`. The
+default is `auto`, it will automatically detect the format. To disable
+autodetection set any of the other options.
+
+
+The following input configures {beatname_uc} to read the `stdout` stream from
+all containers under the default Kubernetes logs path:
+
+[source,yaml]
+----
+- type: container
+ stream: stdout
+ paths:
+ - "/var/log/containers/*.log"
+----
+
+include::../inputs/input-common-harvester-options.asciidoc[]
+
+include::../inputs/input-common-file-options.asciidoc[]
+
+[id="{beatname_lc}-input-{type}-common-options"]
+include::../inputs/input-common-options.asciidoc[]
+
+:type!:
diff --git a/filebeat/docs/inputs/input-docker.asciidoc b/filebeat/docs/inputs/input-docker.asciidoc
index c801fbb6d85..7949ce90db6 100644
--- a/filebeat/docs/inputs/input-docker.asciidoc
+++ b/filebeat/docs/inputs/input-docker.asciidoc
@@ -7,6 +7,8 @@
Docker
++++
+deprecated[7.2.0, Use `container` input instead.]
+
Use the `docker` input to read logs from Docker containers.
This input searches for container logs under its path, and parse them into
@@ -103,4 +105,4 @@ include::../inputs/input-common-file-options.asciidoc[]
[id="{beatname_lc}-input-{type}-common-options"]
include::../inputs/input-common-options.asciidoc[]
-:type!:
+:type!:
\ No newline at end of file
diff --git a/filebeat/harvester/util.go b/filebeat/harvester/util.go
index cd22f7f71fd..e885aab3881 100644
--- a/filebeat/harvester/util.go
+++ b/filebeat/harvester/util.go
@@ -21,11 +21,12 @@ import "github.com/elastic/beats/libbeat/common/match"
// Contains available input types
const (
- LogType = "log"
- StdinType = "stdin"
- RedisType = "redis"
- UdpType = "udp"
- DockerType = "docker"
+ LogType = "log"
+ StdinType = "stdin"
+ RedisType = "redis"
+ UdpType = "udp"
+ DockerType = "docker"
+ ContainerType = "container"
)
// MatchAny checks if the text matches any of the regular expressions
diff --git a/filebeat/include/list.go b/filebeat/include/list.go
index 4f2f6892930..6876c7a1a62 100644
--- a/filebeat/include/list.go
+++ b/filebeat/include/list.go
@@ -21,6 +21,7 @@ package include
import (
// Import packages that need to register themselves.
+ _ "github.com/elastic/beats/filebeat/input/container"
_ "github.com/elastic/beats/filebeat/input/docker"
_ "github.com/elastic/beats/filebeat/input/log"
_ "github.com/elastic/beats/filebeat/input/redis"
diff --git a/filebeat/input/container/config.go b/filebeat/input/container/config.go
new file mode 100644
index 00000000000..b6a23f52a13
--- /dev/null
+++ b/filebeat/input/container/config.go
@@ -0,0 +1,58 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package container
+
+import (
+ "fmt"
+ "strings"
+)
+
+var defaultConfig = config{
+ Stream: "all",
+ Format: "auto",
+}
+
+type config struct {
+ // Stream can be all, stdout or stderr
+ Stream string `config:"stream"`
+
+ // Format can be auto, cri, json-file
+ Format string `config:"format"`
+}
+
+// Validate validates the config.
+func (c *config) Validate() error {
+ if !stringInSlice(c.Stream, []string{"all", "stdout", "stderr"}) {
+ return fmt.Errorf("invalid value for stream: %s, supported values are: all, stdout, stderr", c.Stream)
+ }
+
+ if !stringInSlice(strings.ToLower(c.Format), []string{"auto", "docker", "cri"}) {
+ return fmt.Errorf("invalid value for format: %s, supported values are: auto, docker, cri", c.Format)
+ }
+
+ return nil
+}
+
+func stringInSlice(str string, list []string) bool {
+ for _, v := range list {
+ if v == str {
+ return true
+ }
+ }
+ return false
+}
diff --git a/filebeat/input/container/input.go b/filebeat/input/container/input.go
new file mode 100644
index 00000000000..656b93b47fd
--- /dev/null
+++ b/filebeat/input/container/input.go
@@ -0,0 +1,74 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package container
+
+import (
+ "github.com/elastic/beats/filebeat/channel"
+ "github.com/elastic/beats/filebeat/input"
+ "github.com/elastic/beats/filebeat/input/log"
+ "github.com/elastic/beats/libbeat/common"
+
+ "github.com/pkg/errors"
+)
+
+func init() {
+ err := input.Register("container", NewInput)
+ if err != nil {
+ panic(err)
+ }
+}
+
+// NewInput creates a new container input
+func NewInput(
+ cfg *common.Config,
+ outletFactory channel.Connector,
+ context input.Context,
+) (input.Input, error) {
+ // Wrap log input with custom docker settings
+ config := defaultConfig
+ if err := cfg.Unpack(&config); err != nil {
+ return nil, errors.Wrap(err, "reading container input config")
+ }
+
+ err := cfg.Merge(common.MapStr{
+ "docker-json.partial": true,
+ "docker-json.cri_flags": true,
+
+ // Allow stream selection (stdout/stderr/all)
+ "docker-json.stream": config.Stream,
+
+ // Select file format (auto/cri/docker)
+ "docker-json.format": config.Format,
+
+ // Set symlinks to true as CRI-O paths could point to symlinks instead of the actual path.
+ "symlinks": true,
+ })
+ if err != nil {
+ return nil, errors.Wrap(err, "update input config")
+ }
+
+ // Add stream to meta to ensure different state per stream
+ if config.Stream != "all" {
+ if context.Meta == nil {
+ context.Meta = map[string]string{}
+ }
+ context.Meta["stream"] = config.Stream
+ }
+
+ return log.NewInput(cfg, outletFactory, context)
+}
diff --git a/filebeat/input/docker/config.go b/filebeat/input/docker/config.go
index 2f0b1323c7e..8a79f57f68b 100644
--- a/filebeat/input/docker/config.go
+++ b/filebeat/input/docker/config.go
@@ -44,8 +44,6 @@ type containers struct {
IDs []string `config:"ids"`
Path string `config:"path"`
- Paths []string `config:"paths"`
-
// Stream can be all, stdout or stderr
Stream string `config:"stream"`
}
diff --git a/filebeat/input/docker/input.go b/filebeat/input/docker/input.go
index 458745e66dd..c174e5aa1f0 100644
--- a/filebeat/input/docker/input.go
+++ b/filebeat/input/docker/input.go
@@ -25,6 +25,7 @@ import (
"github.com/elastic/beats/filebeat/input"
"github.com/elastic/beats/filebeat/input/log"
"github.com/elastic/beats/libbeat/common"
+ "github.com/elastic/beats/libbeat/common/cfgwarn"
"github.com/elastic/beats/libbeat/logp"
"github.com/pkg/errors"
@@ -45,25 +46,18 @@ func NewInput(
) (input.Input, error) {
logger := logp.NewLogger("docker")
+ cfgwarn.Deprecate("8.0.0", "'docker' input deprecated. Use 'container' input instead.")
+
// Wrap log input with custom docker settings
config := defaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, errors.Wrap(err, "reading docker input config")
}
- // Docker input should make sure that no callers should ever pass empty strings as container IDs or paths
+ // Docker input should make sure that no callers should ever pass empty strings as container IDs
// Hence we explicitly make sure that we catch such things and print stack traces in the event of
// an invocation so that it can be fixed.
- var ids, paths []string
- for _, p := range config.Containers.Paths {
- if p != "" {
- paths = append(paths, p)
- } else {
- logger.Error("Docker paths can't be empty for Docker input config")
- logger.Debugw("Empty path for Docker logfile was received", logp.Stack("stacktrace"))
- }
- }
-
+ var ids []string
for _, containerID := range config.Containers.IDs {
if containerID != "" {
ids = append(ids, containerID)
@@ -73,23 +67,12 @@ func NewInput(
}
}
- if len(ids) == 0 && len(paths) == 0 {
- return nil, errors.New("Docker input requires at least one entry under 'containers.ids' or 'containers.paths'")
- }
-
- // IDs + Path and Paths are mutually exclusive. Ensure that only one of them are set in a given configuration
- if len(ids) != 0 && len(paths) != 0 {
- return nil, errors.New("can not provide both 'containers.ids' and 'containers.paths' in the same input config")
+ if len(ids) == 0 {
+ return nil, errors.New("Docker input requires at least one entry under 'containers.ids''")
}
- if len(ids) != 0 {
- for idx, containerID := range ids {
- cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
- }
- } else {
- for idx, p := range paths {
- cfg.SetString("paths", idx, p)
- }
+ for idx, containerID := range ids {
+ cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log"))
}
if err := checkStream(config.Containers.Stream); err != nil {
@@ -108,13 +91,8 @@ func NewInput(
return nil, errors.Wrap(err, "update input config")
}
- if err := cfg.SetBool("docker-json.force_cri_logs", -1, config.CRIForce); err != nil {
- return nil, errors.Wrap(err, "update input config")
- }
-
- if len(paths) != 0 {
- // Set symlinks to true as CRI-O paths could point to symlinks instead of the actual path.
- if err := cfg.SetBool("symlinks", -1, true); err != nil {
+ if config.CRIForce {
+ if err := cfg.SetString("docker-json.format", -1, "cri"); err != nil {
return nil, errors.Wrap(err, "update input config")
}
}
diff --git a/filebeat/input/log/config.go b/filebeat/input/log/config.go
index 9a0f0d8dcb7..78669b4c107 100644
--- a/filebeat/input/log/config.go
+++ b/filebeat/input/log/config.go
@@ -109,7 +109,7 @@ type config struct {
DockerJSON *struct {
Stream string `config:"stream"`
Partial bool `config:"partial"`
- ForceCRI bool `config:"force_cri_logs"`
+ Format string `config:"format"`
CRIFlags bool `config:"cri_flags"`
} `config:"docker-json"`
}
diff --git a/filebeat/input/log/harvester.go b/filebeat/input/log/harvester.go
index eb85420ec44..23cf62d26e4 100644
--- a/filebeat/input/log/harvester.go
+++ b/filebeat/input/log/harvester.go
@@ -154,9 +154,7 @@ func (h *Harvester) open() error {
switch h.config.Type {
case harvester.StdinType:
return h.openStdin()
- case harvester.LogType:
- return h.openFile()
- case harvester.DockerType:
+ case harvester.LogType, harvester.DockerType, harvester.ContainerType:
return h.openFile()
default:
return fmt.Errorf("Invalid harvester type: %+v", h.config)
@@ -577,7 +575,7 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) {
if h.config.DockerJSON != nil {
// Docker json-file format, add custom parsing to the pipeline
- r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.ForceCRI, h.config.DockerJSON.CRIFlags)
+ r = readjson.New(r, h.config.DockerJSON.Stream, h.config.DockerJSON.Partial, h.config.DockerJSON.Format, h.config.DockerJSON.CRIFlags)
}
if h.config.JSON != nil {
diff --git a/filebeat/tests/files/logs/cri.log b/filebeat/tests/files/logs/cri.log
new file mode 100644
index 00000000000..b469e1b8e3d
--- /dev/null
+++ b/filebeat/tests/files/logs/cri.log
@@ -0,0 +1 @@
+2017-09-12T22:32:21.212861448Z stdout F 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache
diff --git a/filebeat/tests/system/test_container.py b/filebeat/tests/system/test_container.py
new file mode 100644
index 00000000000..deb5cb12aab
--- /dev/null
+++ b/filebeat/tests/system/test_container.py
@@ -0,0 +1,68 @@
+from filebeat import BaseTest
+import socket
+import os
+
+
+class Test(BaseTest):
+ """
+ Test filebeat with the container input
+ """
+
+ def test_container_input(self):
+ """
+ Test container input
+ """
+ input_raw = """
+- type: container
+ paths:
+ - {}/logs/*.log
+"""
+ self.render_config_template(
+ input_raw=input_raw.format(os.path.abspath(self.working_dir)),
+ inputs=False,
+ )
+
+ os.mkdir(self.working_dir + "/logs/")
+ self.copy_files(["logs/docker.log"],
+ target_dir="logs")
+
+ filebeat = self.start_beat()
+
+ self.wait_until(lambda: self.output_has(lines=21))
+
+ filebeat.check_kill_and_wait()
+
+ output = self.read_output()
+ assert len(output) == 21
+ assert output[0]["message"] == "Fetching main repository github.com/elastic/beats..."
+ for o in output:
+ assert o["stream"] == "stdout"
+
+ def test_container_input_cri(self):
+ """
+ Test container input with CRI format
+ """
+ input_raw = """
+- type: container
+ paths:
+ - {}/logs/*.log
+"""
+ self.render_config_template(
+ input_raw=input_raw.format(os.path.abspath(self.working_dir)),
+ inputs=False,
+ )
+
+ os.mkdir(self.working_dir + "/logs/")
+ self.copy_files(["logs/cri.log"],
+ target_dir="logs")
+
+ filebeat = self.start_beat()
+
+ self.wait_until(lambda: self.output_count(lambda x: x >= 1))
+ self.wait_until(lambda: self.log_contains("End of file reached"))
+
+ filebeat.check_kill_and_wait()
+
+ output = self.read_output()
+ assert len(output) == 1
+ assert output[0]["stream"] == "stdout"
diff --git a/libbeat/reader/readjson/docker_json.go b/libbeat/reader/readjson/docker_json.go
index bdea5177fd9..d104ad7cf21 100644
--- a/libbeat/reader/readjson/docker_json.go
+++ b/libbeat/reader/readjson/docker_json.go
@@ -21,6 +21,7 @@ import (
"bytes"
"encoding/json"
"runtime"
+ "strings"
"time"
"github.com/pkg/errors"
@@ -38,12 +39,11 @@ type DockerJSONReader struct {
// join partial lines
partial bool
- // Force log format: json-file | cri
- forceCRI bool
-
// parse CRI flags
criflags bool
+ parseLine func(message *reader.Message, msg *logLine) error
+
stripNewLine func(msg *reader.Message)
}
@@ -56,15 +56,23 @@ type logLine struct {
}
// New creates a new reader renaming a field
-func New(r reader.Reader, stream string, partial bool, forceCRI bool, CRIFlags bool) *DockerJSONReader {
+func New(r reader.Reader, stream string, partial bool, format string, CRIFlags bool) *DockerJSONReader {
reader := DockerJSONReader{
stream: stream,
partial: partial,
reader: r,
- forceCRI: forceCRI,
criflags: CRIFlags,
}
+ switch strings.ToLower(format) {
+ case "docker", "json-file":
+ reader.parseLine = reader.parseDockerJSONLog
+ case "cri":
+ reader.parseLine = reader.parseCRILog
+ default:
+ reader.parseLine = reader.parseAuto
+ }
+
if runtime.GOOS == "windows" {
reader.stripNewLine = stripNewLineWin
} else {
@@ -92,7 +100,7 @@ func (p *DockerJSONReader) parseCRILog(message *reader.Message, msg *logLine) er
if len(log) < split {
return errors.New("invalid CRI log format")
}
- ts, err := time.Parse(time.RFC3339, string(log[i]))
+ ts, err := time.Parse(time.RFC3339Nano, string(log[i]))
if err != nil {
return errors.Wrap(err, "parsing CRI timestamp")
}
@@ -155,12 +163,7 @@ func (p *DockerJSONReader) parseDockerJSONLog(message *reader.Message, msg *logL
return nil
}
-func (p *DockerJSONReader) parseLine(message *reader.Message, msg *logLine) error {
- if p.forceCRI {
- return p.parseCRILog(message, msg)
- }
-
- // If froceCRI isn't set, autodetect file type
+func (p *DockerJSONReader) parseAuto(message *reader.Message, msg *logLine) error {
if len(message.Content) > 0 && message.Content[0] == '{' {
return p.parseDockerJSONLog(message, msg)
}
diff --git a/libbeat/reader/readjson/docker_json_test.go b/libbeat/reader/readjson/docker_json_test.go
index a89c8341991..f83d9b38b9f 100644
--- a/libbeat/reader/readjson/docker_json_test.go
+++ b/libbeat/reader/readjson/docker_json_test.go
@@ -33,7 +33,7 @@ func TestDockerJSON(t *testing.T) {
input [][]byte
stream string
partial bool
- forceCRI bool
+ format string
criflags bool
expectedError bool
expectedMessage reader.Message
@@ -206,12 +206,22 @@ func TestDockerJSON(t *testing.T) {
name: "Force CRI with JSON logs",
input: [][]byte{[]byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`)},
stream: "all",
- forceCRI: true,
+ format: "cri",
expectedError: true,
expectedMessage: reader.Message{
Bytes: 82,
},
},
+ {
+ name: "Force JSON with CRI logs",
+ input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
+ stream: "all",
+ format: "docker",
+ expectedError: true,
+ expectedMessage: reader.Message{
+ Bytes: 115,
+ },
+ },
{
name: "Force CRI log no tags",
input: [][]byte{[]byte(`2017-09-12T22:32:21.212861448Z stdout 2017-09-12 22:32:21.212 [INFO][88] table.go 710: Invalidating dataplane cache`)},
@@ -222,7 +232,7 @@ func TestDockerJSON(t *testing.T) {
Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC),
Bytes: 115,
},
- forceCRI: true,
+ format: "cri",
criflags: false,
},
{
@@ -235,7 +245,7 @@ func TestDockerJSON(t *testing.T) {
Ts: time.Date(2017, 9, 12, 22, 32, 21, 212861448, time.UTC),
Bytes: 117,
},
- forceCRI: true,
+ format: "cri",
criflags: true,
},
{
@@ -252,7 +262,7 @@ func TestDockerJSON(t *testing.T) {
Ts: time.Date(2017, 10, 12, 13, 32, 21, 232861448, time.UTC),
Bytes: 163,
},
- forceCRI: true,
+ format: "cri",
criflags: true,
},
{
@@ -269,7 +279,7 @@ func TestDockerJSON(t *testing.T) {
Bytes: 164,
},
partial: true,
- forceCRI: true,
+ format: "cri",
criflags: true,
},
{
@@ -290,7 +300,7 @@ func TestDockerJSON(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
r := &mockReader{messages: test.input}
- json := New(r, test.stream, test.partial, test.forceCRI, test.criflags)
+ json := New(r, test.stream, test.partial, test.format, test.criflags)
message, err := json.Next()
if test.expectedError {