-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add experimental Docker json-file prospector #5402
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
package reader | ||
|
||
import ( | ||
"bytes" | ||
"encoding/json" | ||
"time" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
|
||
"github.com/pkg/errors" | ||
) | ||
|
||
// DockerJSON processor renames a given field | ||
type DockerJSON struct { | ||
reader Reader | ||
} | ||
|
||
type dockerLog struct { | ||
Timestamp string `json:"time"` | ||
Log string `json:"log"` | ||
Stream string `json:"stream"` | ||
} | ||
|
||
// NewDockerJSON creates a new reader renaming a field | ||
func NewDockerJSON(r Reader) *DockerJSON { | ||
return &DockerJSON{reader: r} | ||
} | ||
|
||
// Next returns the next line. | ||
func (p *DockerJSON) Next() (Message, error) { | ||
message, err := p.reader.Next() | ||
if err != nil { | ||
return message, err | ||
} | ||
|
||
var line dockerLog | ||
dec := json.NewDecoder(bytes.NewReader(message.Content)) | ||
if err = dec.Decode(&line); err != nil { | ||
return message, errors.Wrap(err, "decoding docker JSON") | ||
} | ||
|
||
// Parse timestamp | ||
ts, err := time.Parse(time.RFC3339, line.Timestamp) | ||
if err != nil { | ||
return message, errors.Wrap(err, "parsing docker timestamp") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Perhaps worth adding the given timestamp to the error. Probably already part of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. parse error looks pretty verbose: https://golang.org/src/time/format.go?s=23626:23672#L669 |
||
} | ||
|
||
message.AddFields(common.MapStr{ | ||
"stream": line.Stream, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could you give some details on what values There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see below this is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just pushed it, thanks! |
||
}) | ||
message.Content = []byte(line.Log) | ||
message.Ts = ts | ||
return message, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,60 @@ | ||
package reader | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/elastic/beats/libbeat/common" | ||
|
||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestDockerJSON(t *testing.T) { | ||
tests := []struct { | ||
input []byte | ||
expectedError bool | ||
expectedMessage Message | ||
}{ | ||
// Common log message | ||
{ | ||
input: []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout","time":"2017-11-09T13:27:36.277747246Z"}`), | ||
expectedMessage: Message{ | ||
Content: []byte("1:M 09 Nov 13:27:36.276 # User requested shutdown...\n"), | ||
Fields: common.MapStr{"stream": "stdout"}, | ||
Ts: time.Date(2017, 11, 9, 13, 27, 36, 277747246, time.UTC), | ||
}, | ||
}, | ||
// Wrong JSON | ||
{ | ||
input: []byte(`this is not JSON`), | ||
expectedError: true, | ||
}, | ||
// Missing time | ||
{ | ||
input: []byte(`{"log":"1:M 09 Nov 13:27:36.276 # User requested shutdown...\n","stream":"stdout"}`), | ||
expectedError: true, | ||
}, | ||
} | ||
|
||
for _, test := range tests { | ||
r := mockReader{message: test.input} | ||
json := NewDockerJSON(r) | ||
message, err := json.Next() | ||
|
||
assert.Equal(t, test.expectedError, err != nil) | ||
|
||
if !test.expectedError { | ||
assert.EqualValues(t, test.expectedMessage, message) | ||
} | ||
} | ||
} | ||
|
||
type mockReader struct { | ||
message []byte | ||
} | ||
|
||
func (m mockReader) Next() (Message, error) { | ||
return Message{ | ||
Content: m.message, | ||
}, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,10 +4,11 @@ import "github.com/elastic/beats/libbeat/common/match" | |
|
||
// Contains available prospector types | ||
const ( | ||
LogType = "log" | ||
StdinType = "stdin" | ||
RedisType = "redis" | ||
UdpType = "udp" | ||
LogType = "log" | ||
StdinType = "stdin" | ||
RedisType = "redis" | ||
UdpType = "udp" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. const UdpType should be UDPType |
||
DockerType = "docker" | ||
) | ||
|
||
// MatchAny checks if the text matches any of the regular expressions | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
package docker | ||
|
||
var defaultConfig = config{ | ||
Containers: containers{ | ||
IDs: []string{}, | ||
Path: "/var/lib/docker/containers", | ||
}, | ||
} | ||
|
||
type config struct { | ||
Containers containers `config:"containers"` | ||
} | ||
|
||
type containers struct { | ||
IDs []string `config:"ids"` | ||
Path string `config:"path"` | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package docker | ||
|
||
import ( | ||
"path" | ||
|
||
"github.com/elastic/beats/filebeat/channel" | ||
"github.com/elastic/beats/filebeat/prospector" | ||
"github.com/elastic/beats/filebeat/prospector/log" | ||
"github.com/elastic/beats/libbeat/common" | ||
"github.com/elastic/beats/libbeat/common/cfgwarn" | ||
|
||
"github.com/pkg/errors" | ||
) | ||
|
||
func init() { | ||
err := prospector.Register("docker", NewProspector) | ||
if err != nil { | ||
panic(err) | ||
} | ||
} | ||
|
||
// NewProspector creates a new docker prospector | ||
func NewProspector(cfg *common.Config, outletFactory channel.Factory, context prospector.Context) (prospector.Prospectorer, error) { | ||
cfgwarn.Experimental("Docker prospector is enabled.") | ||
|
||
config := defaultConfig | ||
if err := cfg.Unpack(&config); err != nil { | ||
return nil, errors.Wrap(err, "reading docker prospector config") | ||
} | ||
|
||
// Wrap log prospector with custom docker settings | ||
if len(config.Containers.IDs) > 0 { | ||
for idx, containerID := range config.Containers.IDs { | ||
cfg.SetString("paths", idx, path.Join(config.Containers.Path, containerID, "*.log")) | ||
} | ||
} | ||
|
||
if err := cfg.SetBool("docker-json", -1, true); err != nil { | ||
return nil, errors.Wrap(err, "update prospector config") | ||
} | ||
return log.NewProspector(cfg, outletFactory, context) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -83,6 +83,9 @@ type config struct { | |
MaxBytes int `config:"max_bytes" validate:"min=0,nonzero"` | ||
Multiline *reader.MultilineConfig `config:"multiline"` | ||
JSON *reader.JSONConfig `config:"json"` | ||
|
||
// Hidden on purpose, used by the docker prospector: | ||
DockerJSON bool `config:"docker-json"` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Kind of hacky but don't have a better solution. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @urso recommended something like this to avoid repeating code or a major refactor, works for me 😇 |
||
} | ||
|
||
type LogConfig struct { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -126,6 +126,8 @@ func (h *Harvester) open() error { | |
return h.openStdin() | ||
case harvester.LogType: | ||
return h.openFile() | ||
case harvester.DockerType: | ||
return h.openFile() | ||
default: | ||
return fmt.Errorf("Invalid harvester type: %+v", h.config) | ||
} | ||
|
@@ -515,6 +517,11 @@ func (h *Harvester) newLogFileReader() (reader.Reader, error) { | |
return nil, err | ||
} | ||
|
||
if h.config.DockerJSON { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's interesting that form an implementation point of view it just comes down to adding a reader in the harvester (simplified). Seems like the log harvester is pretty generic and perhaps should be extracted. Prospectors then could configure "readers" on the harvester. (just thinking out loud for later and @urso was also talking about similar stuff in the past). |
||
// Docker json-file format, add custom parsing to the pipeline | ||
r = reader.NewDockerJSON(r) | ||
} | ||
|
||
if h.config.JSON != nil { | ||
r = reader.NewJSON(r, h.config.JSON) | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we call this
log.stream
or something similar?log.source
? Source is already overloaded.@weltenwort has some ideas for a better name?